Data Collection Configuration
2021/6/10 10:36:07
This article walks through a data collection configuration (a Flume + Kafka log-ingestion pipeline); the steps below should be a useful reference for anyone setting up something similar.
Contents
1. Copy the generated jar to /opt/module on CentOS0 and distribute it
2. Run the jar on CentOS0
3. Make login shells source the environment variables immediately
4. Create the lg.sh script under /bin
5. Create the cluster time-sync script dt.sh under /bin
6. Script to check all processes across the cluster
7. Create file-flume-kafka.conf under /opt/module/flume/conf
8. After packaging the interceptor, upload only the interceptor jar itself and place it in Flume's lib directory (YSJ-1.0-SNAPSHOT.jar)
9. Distribute Flume to the other hosts
10. Start/stop script for the log-collection Flume agents
11. Start/stop script for the Kafka cluster
12. List all Kafka topics
13. Create Kafka topics
14. Produce messages
15. Consume messages
16. Install Kafka Manager
17. Go to /opt/module/kafka-manager-1.3.3.22/conf
18. Start/stop script for Kafka Manager
19. Create kafka-flume-hdfs.conf under /opt/module/flume/conf on CentOS0
20. Add the following to /opt/module/flume/conf/flume-env.sh on CentOS0
1. Copy the generated jar to /opt/module on CentOS0 and distribute it
2. Run the jar on CentOS0
[root@CentOS0 module]$ java -classpath <jar-name> com.Charlie.Guo.AppMain > /opt/module/test.log
3. Make login shells source the environment variables immediately
[root@CentOS0/1/2 ~]$ echo "source /etc/profile" >> ~/.bashrc
4. Create the lg.sh script under /bin
[root@CentOS0 bin]$ vim lg.sh

#! /bin/bash
for i in CentOS0 CentOS1
do
    ssh $i "java -classpath /opt/module/YSJ-1.0-SNAPSHOT-jar-with-dependencies.jar com.Charlie.Guo.AppMain >/opt/module/test.log"
done

[root@CentOS0 bin]$ chmod 777 lg.sh
[root@CentOS0 module]$ lg.sh
[root@CentOS0 module]$ cd data, then ls to confirm the log files were generated
5. Create the cluster time-sync script dt.sh under /bin
[root@CentOS0 bin]$ vim dt.sh

#!/bin/bash
log_date=$1" $2"
echo $log_date
for i in centos0 centos1 centos2
do
    ssh -t $i "sudo date -s '$log_date'"
done

[root@CentOS0 bin]$ chmod 777 dt.sh
[root@CentOS0 bin]$ dt.sh 2021-06-02
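dt.sh joins its first two positional arguments into a single timestamp before passing it to `date -s`, so a time can be supplied as a second argument. A minimal local sketch of just that argument handling (no ssh or sudo involved):

```shell
# Simulate calling: dt.sh 2021-06-02 11:11:11
set -- 2021-06-02 11:11:11
# Same assignment dt.sh uses: date and time joined with a space
log_date="$1 $2"
echo "$log_date"   # → 2021-06-02 11:11:11
```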
6. Script to check all processes across the cluster
[root@CentOS0 bin]$ vim xcall.sh

#! /bin/bash
for i in centos0 centos1 centos2
do
    echo --------- $i ----------
    ssh $i "$*"
done

Grant execute permission and run it, for example:
[root@CentOS0 bin]$ chmod 777 xcall.sh
[root@CentOS0 bin]$ xcall.sh jps
7. Create file-flume-kafka.conf under /opt/module/flume/conf
[root@CentOS0 conf]$ vim file-flume-kafka.conf

a1.sources=r1
a1.channels=c1 c2

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

# interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.Charlie.Guo.ETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.Charlie.Guo.TypeInterceptor$Builder

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = centos0:9092,centos1:9092,centos2:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = centos0:9092,centos1:9092,centos2:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
8. After packaging the interceptor, upload only the interceptor jar itself, not its dependency jars; place it in Flume's lib directory (YSJ-1.0-SNAPSHOT.jar)
9. Distribute Flume to the other hosts
[root@CentOS0 module]$ xsync flume/
[root@CentOS0 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
10. Start/stop script for the log-collection Flume agents
[root@CentOS0 bin]$ vim f1.sh

#! /bin/bash
case $1 in
"start"){
    for i in centos0 centos1
    do
        echo " -------- starting collection Flume on $i -------"
        ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/dev/null 2>&1 &"
    done
};;
"stop"){
    for i in centos0 centos1
    do
        echo " -------- stopping collection Flume on $i -------"
        ssh $i "ps -ef | grep file-flume-kafka | grep -v grep | awk '{print \$2}' | xargs kill"
    done
};;
esac

[root@CentOS0 bin]$ vim f2.sh

#! /bin/bash
case $1 in
"start"){
    for i in CentOS2
    do
        echo " -------- starting consumer Flume on $i -------"
        ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt 2>&1 &"
    done
};;
"stop"){
    for i in CentOS2
    do
        echo " -------- stopping consumer Flume on $i -------"
        ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep | awk '{print \$2}' | xargs kill"
    done
};;
esac
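Both stop branches rely on the same `ps -ef | grep | grep -v grep | awk | xargs kill` pipeline. A local sketch of how it isolates the PID, using a fabricated `ps -ef` line standing in for the real agent process (process details are made up for illustration, and `kill` is left off so nothing is harmed):

```shell
# Fabricated ps -ef output: one agent process plus the grep searching for it
sample='root 12345     1  0 10:00 ?     00:00:10 java flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf
root 23456 20000  0 10:05 pts/0 00:00:00 grep file-flume-kafka'

# Same filter as the stop branch: drop the grep process itself,
# then print column 2 (the PID) — this is what xargs would pass to kill
echo "$sample" | grep file-flume-kafka | grep -v grep | awk '{print $2}'   # → 12345
```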
11. Start/stop script for the Kafka cluster
[root@CentOS0 bin]$ vim kf.sh

#! /bin/bash
case $1 in
"start"){
    for i in centos0 centos1 centos2
    do
        echo " -------- starting Kafka on $i -------"
        # assumes Kafka is installed under /opt/module/kafka on every broker
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in centos0 centos1 centos2
    do
        echo " -------- stopping Kafka on $i -------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
    done
};;
esac
12. List all Kafka topics
[root@CentOS0 kafka]$ bin/kafka-topics.sh --zookeeper CentOS0:2181 --list
13. Create Kafka topics
[root@CentOS0 kafka]$ bin/kafka-topics.sh --zookeeper CentOS0:2181,CentOS1:2181,CentOS2:2181 --create --replication-factor 1 --partitions 1 --topic topic_start
[root@CentOS0 kafka]$ bin/kafka-topics.sh --zookeeper CentOS0:2181,CentOS1:2181,CentOS2:2181 --create --replication-factor 1 --partitions 1 --topic topic_event
Note: the HDFS paths for these two topics must be created in advance:
hadoop fs -mkdir -p <path>
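The concrete paths come from the `hdfs.path` settings in kafka-flume-hdfs.conf (step 19). A small loop can print the mkdir command for each topic; on a node with HDFS access, drop the `echo` to actually run them:

```shell
# Build the mkdir command for each topic path used by the HDFS sinks
# (paths taken from the a1.sinks.kN.hdfs.path values in step 19)
for topic in topic_start topic_event; do
    echo "hadoop fs -mkdir -p /origin_data/gmall/log/$topic"
done
```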
14. Produce messages
[root@CentOS0 kafka]$ bin/kafka-console-producer.sh --broker-list CentOS0:9092 --topic topic_start
>hello world
>root root
15. Consume messages
[root@CentOS0 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server CentOS0:9092 --from-beginning --topic topic_start
To view the topic's details instead:
[root@CentOS0 kafka]$ bin/kafka-topics.sh --zookeeper CentOS0:2181 --describe --topic topic_start
16. Install Kafka Manager
[root@CentOS0 module]$ unzip kafka-manager-1.3.3.22.zip
17. Go to /opt/module/kafka-manager-1.3.3.22/conf
[root@CentOS0 conf]$ vim application.conf
Change the ZooKeeper hosts setting to:
kafka-manager.zkhosts="CentOS0:2181,CentOS1:2181,CentOS2:2181"
18. Start/stop script for Kafka Manager
[root@CentOS0 bin]$ vim km.sh

#! /bin/bash
case $1 in
"start"){
    echo " -------- starting KafkaManager -------"
    nohup /opt/module/kafka-manager-1.3.3.22/bin/kafka-manager -Dhttp.port=7456 >start.log 2>&1 &
};;
"stop"){
    echo " -------- stopping KafkaManager -------"
    ps -ef | grep ProdServerStart | grep -v grep | awk '{print $2}' | xargs kill
};;
esac
19. Create kafka-flume-hdfs.conf under /opt/module/flume/conf on CentOS0
vim kafka-flume-hdfs.conf

## components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = CentOS0:9092,CentOS1:9092,CentOS2:9092
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = CentOS0:9092,CentOS1:9092,CentOS2:9092
a1.sources.r2.kafka.topics=topic_event

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second

## sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second

## avoid producing many small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## control the output file format (compressed with lzop)
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## wiring
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
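The `rollSize` of 134217728 in the sink settings above is exactly 128 MB, a common HDFS block size; together with `rollCount = 0` (never roll by event count) this keeps the sinks from scattering output across many small files. A quick check of the figure:

```shell
# 128 MB expressed in bytes — matches the rollSize value in the sink config
echo $((128 * 1024 * 1024))   # → 134217728
```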
20. Add the following to /opt/module/flume/conf/flume-env.sh on CentOS0
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
Sync the configuration to the other machines:
[root@CentOS0 conf]$ xsync flume-env.sh