Druid 使用 Kafka 将数据载入到 Kafka

2021/8/7 6:06:04

本文主要是介绍Druid 使用 Kafka 将数据载入到 Kafka,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

将数据载入到 Kafka

现在让我们为我们的主题运行一个生成器(producer),然后向主题中发送一些数据!

在你的 Druid 目录中,运行下面的命令:

cd quickstart/tutorial
gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json

在你的 Kafka 的安装目录中,运行下面的命令。请将 {PATH_TO_DRUID} 替换为 Druid 的安装目录:

export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json

上面的控制台命令将会把示例消息载入到 Kafka 的 wikipedia 主题。 现在我们将会使用 Druid 的 Kafka 索引服务(indexing service)来将我们加载到 Kafka 中的消息导入到 Druid 中。

使用数据加载器(data loader)来加载数据

在 URL 中导航到 localhost:8888 页面,然后在控制台的顶部单击Load data

 

Data loader init

 

选择 Apache Kafka 然后单击 Connect data

 

Data loader sample

 

输入 Kafka 的服务器地址为 localhost:9092 然后选择 wikipedia 为主题。

然后单击 Apply。请确定你在界面中看到的数据只正确的。

一旦数据被载入后,你可以单击按钮 “Next: Parse data” 来进行下一步的操作。

 

Data loader parse data

 

Druid 的数据加载器将会为需要加载的数据确定正确的处理器。 在本用例中,我们成功的确定了需要处理的数据格式为 json 格式。 你可以在本页面中选择不同的数据处理器,通过选择不同的数据处理器,能够帮你更好的了解 Druid 是如何帮助你处理数据的。

当 json 格式的数据处理器被选择后,单击 Next: Parse time 来进行入下一个界面,在这个界面中你需要确定 timestamp 主键字段的的列。

 

Data loader parse time

 

Druid 要求所有数据必须有一个 timestamp 的主键字段(这个主键字段被定义和存储在 __time)中。 如果你需要导入的数据没有时间字段的话,那么请选择 Constant value。 在我们现在的示例中,数据载入器确定 time 字段是唯一可以被用来作为数据时间字段的数据。

单击 Next: ... 2 次,来跳过 Transform 和 Filter 步骤。 针对本教程来说,你并不需要对导入时间进行换行,所以你不需要调整 转换(Transform) 和 过滤器(Filter) 的配置。

 

Data loader schema

 

配置摘要(schema) 是你对 dimensions 和 metrics 在导入数据的时候配置的地方。 这个界面显示的是当我们对数据在 Druid 中进行导入的时候,数据是如何在 Druid 中进行存储和表现的。 因为我们提交的数据集非常小,因此我们可以关闭 回滚(rollup) ,Rollup 的开关将不会在这个时候显示来供你选择。

如果你对当前的配置满意的话,单击 Next 来进入 Partition 步骤。在这个步骤中你可以定义数据是如何在段中进行分区的。

 

Data loader partition

 

在这一步中,你可以调整你的数据是如何在段中进行分配的。 因为当前的数据集是一个非常小的数据库,我们在这一步不需要进行调制。

单击 Next: Tune 来进入性能配置页面。

 

Data loader tune

 

Tune 这一步中一个 非常重要 的参数是 Use earliest offset 设置为 True。 因为我们希望从流的开始来读取数据。 针对其他的配置,我们不需要进行修改,单击 Next: Publish 来进入 Publish 步骤。

 

Data loader publish

 

让我们将数据源命名为 wikipedia-kafka

最后,单击 Next 来查看你的配置。

 

Data loader spec

 

等到这一步的时候,你就可以看到如何使用数据导入来创建一个数据导入规范。 你可以随意的通过页面中的导航返回到前面的页面中对配置进行调整。 简单来说你可以对特性目录进行编辑,来查看编辑后的配置是如何对前面的步骤产生影响的。

当你对所有的配置都满意并且觉得没有问题的时候,单击 提交(Submit).

 

Tasks view

 

现在你需要到界面下半部分的任务视图(task view)中来查看通过 supervisor 创建的任务。

任务视图(task view)是被设置为自动刷新的,请等候 supervisor 来运行一个任务。

当一个任务启动运行后,这个任务将会对数据进行处理后导入到 Druid 中。

在页面的顶部,请导航到 Datasources 视图。

 

Datasource view

 

当 wikipedia-kafka 数据源成功显示,这个数据源中的数据就可以进行查询了。

请注意: 如果数据源在经过一段时间的等待后还是没有数据的话,那么很有可能是你的 supervisor 没有设置从 Kafka 的开头读取流数据(Tune 步骤中的配置)。

在数据源完成所有的数据导入后,你可以进入 Query 视图,来针对导入的数据源来运行 SQL 查询。

因为我们当前导入的数据库很小,你可以直接运行SELECT * FROM "wikipedia-kafka" 查询来查看数据导入的结果。

 

Query view

 

请访问 query tutorial 页面中的内容来了解如何针对一个新载入的数据如何运行查询。

 

https://www.ossez.com/t/druid-kafka-kafka/13654



这篇关于Druid 使用 Kafka 将数据载入到 Kafka的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程