Kafka in Practice (ELK + Kafka)
2022/4/18 23:43:50
This article walks through a hands-on Kafka deployment (ELK + Kafka). It should be a useful reference for anyone solving similar problems; interested programmers, follow along and learn together!
Preparation
System | IP | Applications | myid | broker.id |
---|---|---|---|---|
CentOS 7 | 192.168.100.20 | Elasticsearch + Kibana + Kafka + ZooKeeper | 1 | 1 |
CentOS 7 | 192.168.100.30 | Elasticsearch + Kafka + ZooKeeper + Filebeat + Logstash | 2 | 2 |
CentOS 7 | 192.168.100.40 | Elasticsearch + Kafka + ZooKeeper + Filebeat | 3 | 3 |
First set up Elasticsearch and Kibana by following: ELK 日志分析系统 - 花花de代码生活 - 博客园 (cnblogs.com). You only need to complete the steps up through Kibana.
I. ZooKeeper
1. Download

```bash
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz --no-check-certificate
```
2. Install and configure ZooKeeper
```bash
[root@elk-1 ~]# tar -zxf apache-zookeeper-3.5.9-bin.tar.gz -C /usr/local/
[root@elk-1 ~]# mv /usr/local/apache-zookeeper-3.5.9-bin /usr/local/zookeeper
[root@elk-1 ~]# cd /usr/local/zookeeper
[root@elk-1 zookeeper]# mkdir {data,logs}
[root@elk-1 zookeeper]# echo 1 > data/myid
[root@elk-1 zookeeper]# cp conf/zoo_sample.cfg conf/zoo.cfg

# Config file
[root@elk-1 zookeeper]# vim conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000                           # heartbeat interval between ZK servers (2 seconds)
# The number of ticks that the initial
# synchronization phase can take
initLimit=10                            # leader/follower initial connection time limit
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5                             # leader/follower sync time limit
# the directory where the snapshot is stored.
# do not use /tmp for storage
dataDir=/usr/local/zookeeper/data       # where ZooKeeper keeps its data (must contain myid)
dataLogDir=/usr/local/zookeeper/logs    # where ZooKeeper keeps its transaction logs
# the port at which the clients will connect
clientPort=2181                         # port clients use to connect to the ZooKeeper server
admin.serverPort=8888                   # the admin server occupies port 8080 by default
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=60
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1
server.1=elk-1:2888:3888
server.2=elk-2:2888:3888
server.3=elk-3:2888:3888

# Add environment variables
echo 'export ZOOKEEPER_HOME=/usr/local/zookeeper' >> /etc/profile
echo 'export PATH=$ZOOKEEPER_HOME/bin:$PATH' >> /etc/profile
# Reload the variables
source /etc/profile
# Verify the variable was set
echo $ZOOKEEPER_HOME

# Start ZooKeeper
[root@elk-1 zookeeper]# bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED    # started successfully
```
If ZooKeeper will not start and the config file looks correct, check whether data/myid is wrong. The zoo.cfg on elk-2 and elk-3 needs no changes.
```bash
[root@elk-1 ~]# scp /usr/local/zookeeper/conf/zoo.cfg elk-2:/usr/local/zookeeper/conf/zoo.cfg
[root@elk-1 ~]# scp /usr/local/zookeeper/conf/zoo.cfg elk-3:/usr/local/zookeeper/conf/zoo.cfg

# elk-2:
[root@elk-2 zookeeper]# mkdir {data,logs}
[root@elk-2 zookeeper]# echo 2 > data/myid

# elk-3:
[root@elk-3 zookeeper]# mkdir {data,logs}
[root@elk-3 zookeeper]# echo 3 > data/myid
```
3. Test
```bash
# Check node status
[root@elk-1 zookeeper]# /usr/local/zookeeper/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

[root@elk-1 zookeeper]# bin/zkCli.sh -server elk-2:2181
[zk: elk-2:2181(CONNECTED) 0]
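```

As an extra sanity check (a minimal sketch, assuming all three nodes are already running), you can create a znode on one node and read it back from another; if the value round-trips, the ensemble is replicating. The znode name /ken-test is just an example.

```bash
# On elk-1: create a throwaway znode
[root@elk-1 zookeeper]# bin/zkCli.sh -server elk-1:2181 create /ken-test "hello"

# On elk-3: read it back; seeing "hello" confirms replication across the ensemble
[root@elk-3 zookeeper]# /usr/local/zookeeper/bin/zkCli.sh -server elk-3:2181 get /ken-test

# Clean up
[root@elk-1 zookeeper]# bin/zkCli.sh -server elk-1:2181 delete /ken-test
```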
II. Kafka
1. Download Kafka

```bash
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz --no-check-certificate
```
2. Install and configure
```bash
[root@elk-1 ~]# tar -zxf kafka_2.12-2.8.1.tgz -C /usr/local/
[root@elk-1 ~]# mv /usr/local/kafka_2.12-2.8.1 /usr/local/kafka
[root@elk-1 ~]# cd /usr/local/kafka/
[root@elk-1 kafka]# mkdir logs
[root@elk-1 kafka]# cp config/server.properties{,.bak}

# Config file
[root@elk-1 kafka]# vim config/server.properties
# broker id; must match this host's ZooKeeper myid
broker.id=1
# listen address and port
listeners=PLAINTEXT://192.168.100.20:9092
# threads handling network requests; normally no need to change
num.network.threads=3
# threads handling disk I/O
num.io.threads=8
# socket send buffer
socket.send.buffer.bytes=102400
# socket receive buffer
socket.receive.buffer.bytes=102400
# maximum size of a request
socket.request.max.bytes=104857600
# directory where Kafka stores its log segments (message data)
log.dirs=/opt/data/kafka/logs
# default number of partitions per topic
num.partitions=3
# threads per data dir used to recover log segments at startup
num.recovery.threads.per.data.dir=1
# replication factor of the offsets topic
offsets.topic.replication.factor=1
# replication factor of the transaction state log
transaction.state.log.replication.factor=1
# minimum ISR for the transaction state log
transaction.state.log.min.isr=1
# maximum time to retain data
log.retention.hours=168
# a topic's partitions are stored as segment files; this caps each segment's size
# (can be overridden per topic at creation time)
log.segment.bytes=1073741824
# how often segment sizes are checked against the retention policy
log.retention.check.interval.ms=300000
# ZooKeeper connection string, used to maintain cluster state
zookeeper.connect=192.168.100.20:2181,192.168.100.30:2181,192.168.100.40:2181
# ZooKeeper connection timeout
zookeeper.connection.timeout.ms=18000
# delay before the first consumer rebalance; normally no need to change
group.initial.rebalance.delay.ms=0

# Add environment variables
echo 'export KAFKA_HOME=/usr/local/kafka' >> /etc/profile
echo 'export PATH=$KAFKA_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
echo $KAFKA_HOME

# Start Kafka -- either of these two ways works
[root@elk-1 kafka]# bin/kafka-server-start.sh -daemon config/server.properties
[root@elk-1 kafka]# nohup bin/kafka-server-start.sh config/server.properties > logs/kafka.log 2>&1 &
[1] 14653

# Check whether Kafka started successfully
[root@elk-1 kafka]# jps
14481 QuorumPeerMain
13538 Elasticsearch
15061 Jps
13239 -- process information unavailable
14653 Kafka
```
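If you would rather manage the broker with systemd than with -daemon or nohup, a minimal unit file sketch looks like the following. This is a convenience assumed here, not something the Kafka tarball ships; the unit name kafka.service and the paths simply match the install above, and ZooKeeper must already be running when the unit starts.

```bash
# Hypothetical systemd unit; paths match the install above
cat > /etc/systemd/system/kafka.service <<'EOF'
[Unit]
Description=Apache Kafka broker
# start after the network is up (ZooKeeper must also be running)
After=network.target

[Service]
Type=simple
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable kafka
systemctl start kafka
```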
On elk-2 and elk-3, the main change is the Kafka config file:
```bash
[root@elk-1 ~]# scp /usr/local/kafka/config/server.properties elk-2:/usr/local/kafka/config/server.properties
[root@elk-1 ~]# scp /usr/local/kafka/config/server.properties elk-3:/usr/local/kafka/config/server.properties

# On the second node:
broker.id=2
listeners=PLAINTEXT://192.168.100.30:9092

# On the third node:
broker.id=3
listeners=PLAINTEXT://192.168.100.40:9092
```
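Once all three brokers are up, a quick way to confirm they all registered with ZooKeeper is to list the broker ids under Kafka's standard /brokers/ids path:

```bash
# Should print [1, 2, 3] once every broker has registered
[root@elk-1 ~]# /usr/local/zookeeper/bin/zkCli.sh -server elk-1:2181 ls /brokers/ids
```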
3. Test Kafka
```bash
[root@elk-1 zookeeper]# yum install -y lsof
[root@elk-1 kafka]# lsof -i:2181
COMMAND   PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    13527 root   48u  IPv6  50879      0t0  TCP *:eforward (LISTEN)
java    13527 root   64u  IPv6  53903      0t0  TCP elk-1:eforward->elk-1:36448 (ESTABLISHED)
java    13527 root   66u  IPv6  54636      0t0  TCP elk-1:eforward->elk-3:43152 (ESTABLISHED)
java    13601 root  122u  IPv6  54546      0t0  TCP elk-1:36448->elk-1:eforward (ESTABLISHED)

# Create a topic named test-ken
[root@elk-1 kafka]# bin/kafka-topics.sh --create --bootstrap-server elk-1:9092 --replication-factor 3 --partitions 2 --topic test-ken

# Check on another node that the topic was created
[root@elk-3 kafka]# bin/kafka-topics.sh --list --bootstrap-server 192.168.100.30:9092
test-ken

# Show the topic's details
[root@elk-1 kafka]# bin/kafka-topics.sh --describe --bootstrap-server elk-1:9092 --topic test-ken
Topic: test-ken TopicId: n_OMjCKNQ7SsabGsfen_cA PartitionCount: 2 ReplicationFactor: 3 Configs: segment.bytes=1073741824
        Topic: test-ken Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
        Topic: test-ken Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1

# Produce messages on elk-1; the consumer on elk-2 receives them
[root@elk-1 kafka]# bin/kafka-console-producer.sh --broker-list elk-1:9092 --topic test-ken
>test
>ceshicgo
>

# elk-2 receives them directly
[root@elk-2 kafka]# bin/kafka-console-consumer.sh --bootstrap-server elk-2:9092 --topic test-ken
test
ceshicgo

# -------- Extra tips --------
# Delete a topic:
[root@elk-1 kafka]# bin/kafka-topics.sh --delete --bootstrap-server elk-1:9092 --topic test-ken
# Verify it was deleted:
[root@elk-1 kafka]# bin/kafka-topics.sh --list --bootstrap-server elk-2:9092
```
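Kafka also ships kafka-consumer-groups.sh for inspecting consumer groups, which is handy later for checking whether Logstash is keeping up with the topics. A quick sketch (the group name "logstash" matches the group_id used in the Logstash configs further down):

```bash
# List every consumer group known to the cluster
[root@elk-1 kafka]# bin/kafka-consumer-groups.sh --bootstrap-server elk-1:9092 --list

# Show per-partition offsets and lag for one group
[root@elk-1 kafka]# bin/kafka-consumer-groups.sh --bootstrap-server elk-1:9092 --describe --group logstash
```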
III. Filebeat
1. Download

```bash
[root@elk-2 ~]# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.0.0-x86_64.rpm
```
2. Install and configure
(1) elk-1
```bash
[root@elk-1 ~]# rpm -ivh filebeat-6.0.0-x86_64.rpm
[root@elk-1 ~]# vim /etc/filebeat/filebeat.yml
filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /var/log/es_access.log    # change this to whatever log file you want to collect

output.kafka:
  enabled: true
  hosts: ["elk-1:9092","elk-2:9092","elk-3:9092"]
  topic: es_access              # the Kafka topic the messages are published to
  keep_alive: 10s
```
Note: the default output.elasticsearch section in filebeat.yml must be commented out (Filebeat allows only one output), and the output.kafka block can simply be appended at the end of the file.
```bash
[root@elk-1 ~]# systemctl start filebeat
```
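Filebeat can validate its own configuration at any time; a quick check (depending on the Filebeat version, `filebeat test output` may also be able to probe the configured output):

```bash
# Validate /etc/filebeat/filebeat.yml (the RPM's default config path)
[root@elk-1 ~]# filebeat test config
Config OK
```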
(2) elk-2
Same as the previous step, except for the log file being collected and the topic name.
```bash
[root@elk-2 ~]# rpm -ivh filebeat-6.0.0-x86_64.rpm

# Edit the config file:
[root@elk-2 ~]# vim /etc/filebeat/filebeat.yml
filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /var/log/vmware-network.log    # change this to whatever log file you want to collect

output.kafka:
  enabled: true
  hosts: ["elk-1:9092","elk-2:9092","elk-3:9092"]
  topic: vmware-network              # the Kafka topic the messages are published to
  keep_alive: 10s

[root@elk-2 ~]# systemctl start filebeat
```
(3) elk-3
```bash
[root@elk-3 ~]# rpm -ivh filebeat-6.0.0-x86_64.rpm

# Edit the config file:
[root@elk-3 ~]# vim /etc/filebeat/filebeat.yml
filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /var/log/access.log    # change this to whatever log file you want to collect

output.kafka:
  enabled: true
  hosts: ["elk-1:9092","elk-2:9092","elk-3:9092"]
  topic: access              # the Kafka topic the messages are published to
  keep_alive: 10s

[root@elk-3 ~]# systemctl start filebeat
```
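To confirm each Filebeat is actually publishing into Kafka before wiring up Logstash, append a test line to a watched file and consume it straight from the topic (a spot check; use whichever topic you configured above). Each event arrives as a JSON document built by Filebeat:

```bash
# Append a test line to the watched file on elk-1 ...
[root@elk-1 ~]# echo "filebeat-test" >> /var/log/es_access.log

# ... then read it back from the es_access topic
[root@elk-1 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server elk-1:9092 --topic es_access --from-beginning
```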
IV. Logstash
1. Download

```bash
[root@elk-2 ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.0.0.rpm
```
2. Install and deploy
```bash
[root@elk-2 ~]# rpm -ivh logstash-6.0.0.rpm
[root@elk-2 ~]# vi /etc/logstash/logstash.yml
http.host: "192.168.100.30"

# Configure Logstash to collect the es_access log:
[root@elk-2 ~]# cat /etc/logstash/conf.d/es_access.conf
input {
    kafka {
        bootstrap_servers => "192.168.100.20:9092,192.168.100.30:9092,192.168.100.40:9092"
        group_id => "logstash"
        auto_offset_reset => "earliest"
        decorate_events => true
        topics => ["es_access"]
        type => "messages"
    }
}
output {
    if [type] == "messages" {
        elasticsearch {
            hosts => ["192.168.100.20:9200","192.168.100.30:9200","192.168.100.40:9200"]
            index => "es_access-%{+YYYY-MM-dd}"
        }
    }
}

# Configure collection of the VMware log:
[root@elk-2 ~]# cat /etc/logstash/conf.d/vmware.conf
input {
    kafka {
        bootstrap_servers => "192.168.100.20:9092,192.168.100.30:9092,192.168.100.40:9092"
        group_id => "logstash"
        auto_offset_reset => "earliest"
        decorate_events => true
        topics => ["vmware-network"]    # must match the topic set in Filebeat on elk-2
        type => "messages"
    }
}
output {
    if [type] == "messages" {
        elasticsearch {
            hosts => ["192.168.100.20:9200","192.168.100.30:9200","192.168.100.40:9200"]
            index => "vmware-%{+YYYY-MM-dd}"
        }
    }
}

# Configure collection of the nginx log:
[root@elk-2 ~]# cat /etc/logstash/conf.d/nginx.conf
input {
    kafka {
        bootstrap_servers => "192.168.100.20:9092,192.168.100.30:9092,192.168.100.40:9092"
        group_id => "logstash"
        auto_offset_reset => "earliest"
        decorate_events => true
        topics => ["access"]            # must match the topic set in Filebeat on elk-3
        type => "messages"
    }
}
output {
    if [type] == "messages" {
        elasticsearch {
            hosts => ["192.168.100.20:9200","192.168.100.30:9200","192.168.100.40:9200"]
            index => "nginx-%{+YYYY-MM-dd}"
        }
    }
}

[root@elk-2 ~]# chmod 755 /var/log/messages
[root@elk-2 ~]# chown -R logstash /var/lib/logstash/
[root@elk-2 ~]# ln -s /usr/share/logstash/bin/logstash /usr/bin

# Test the config -- if it returns OK, the config can be used
[root@elk-2 ~]# logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/es_access.conf --config.test_and_exit
```
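After the syntax check passes, start Logstash as a service and watch its log to confirm the pipelines start and the Kafka consumers join the "logstash" group (standard paths for the RPM install):

```bash
[root@elk-2 ~]# systemctl start logstash
# Watch the pipelines come up and the Kafka consumers join the group
[root@elk-2 ~]# tail -f /var/log/logstash/logstash-plain.log
```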
3. Verify
Step 1 (nginx and vmware show no index yet because those log files do not exist):
```bash
# Check whether the log index was created
[root@elk-1 ~]# curl '192.168.100.20:9200/_cat/indices?v'
health status index                uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   es_access-2022-04-18 PfGB-dZUTWC8goIxhHgUlA   5   1         11            0       36kb           36kb
yellow open   .kibana              lp_TV8zKTWay24UMkEPtqQ   1   1          2            0      6.8kb          6.8kb
```
Step 2: create an index pattern for the new index in Kibana. (screenshot)
Step 3:
```bash
[root@elk-1 ~]# echo 111 > /var/log/es_access.log
[root@elk-1 ~]# cat /var/log/es_access.log
111
```
Step 4: the new entry appears in Kibana's Discover view. (screenshot)
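To verify end to end without Kibana, you can also query Elasticsearch directly for the test line (a spot check against the index created above; Filebeat puts the raw line into the message field):

```bash
# Search the es_access-* indices for the "111" test message
[root@elk-1 ~]# curl '192.168.100.20:9200/es_access-*/_search?q=message:111&pretty'
```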
That concludes this article on Kafka in practice (ELK + Kafka). We hope the articles we recommend are helpful to you, and thank you for supporting 为之网!