logstash收集日志并写入kafka再到es集群
2021/10/7 6:13:18
本文主要是介绍logstash收集日志并写入kafka再到es集群,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
条件:
有kafka环境
图形架构:
环境准备
172.31.2.101 es1 + kibana 172.31.2.102 es2 172.31.2.103 es3 172.31.2.104 logstash1 172.31.2.105 logstash2 172.31.2.41 zookeeper + kafka 172.31.2.42 zookeeper + kafka 172.31.2.43 zookeeper + kafka 172.31.2.107 web1
先启动zookeeper
[root@mq1 ~]# /usr/local/zookeeper/bin/zkServer.sh restart [root@mq2 ~]# /usr/local/zookeeper/bin/zkServer.sh restart [root@mq3 ~]# /usr/local/zookeeper/bin/zkServer.sh restart
启动kafka
[root@mq1 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties [root@mq2 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties [root@mq3 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties
查看端口
[root@mq1 ~]# ss -tanl | grep 9092 LISTEN 0 50 [::ffff:172.31.2.41]:9092 *:*
web服务器改配置写入到kafka
[root@es-web1 ~]# cat /etc/logstash/conf.d/kafka-nginx-es.conf input { file { path => "/var/log/nginx/access.log" start_position => "beginning" stat_interval => 3 type => "nginx-accesslog" codec => "json" } file { path => "/apps/nginx/logs/error.log" start_position => "beginning" stat_interval => 3 type => "nginx-errorlog" } } output { if [type] == "nginx-accesslog" { kafka { bootstrap_servers => "172.31.2.41:9092" topic_id => "long-linux21-accesslog" codec => "json" }} if [type] == "nginx-errorlog" { kafka { bootstrap_servers => "172.31.2.41:9092" topic_id => "long-linux21-errorlog" #codec => "json" }} }
重启
root@long:~# systemctl restart logstash
在logstash服务器配置写入elasticsearch
[root@logstash1 ~]# cat /etc/logstash/conf.d/kafka-to-es.conf input { kafka { bootstrap_servers => "172.31.2.41:9092,172.31.2.42:9092,172.31.2.43:9092" topics => "long-linux21-accesslog" codec => "json" } kafka { bootstrap_servers => "172.31.2.41:9092,172.31.2.42:9092,172.31.2.43:9092" topics => "long-linux21-errorlog" codec => "json" } } output { if [type] == "nginx-accesslog" { elasticsearch { hosts => ["172.31.2.101:9200"] index => "n19-long-kafka-nginx-accesslog-%{+YYYY.MM.dd}" }} if [type] == "nginx-errorlog" { elasticsearch { hosts => ["172.31.2.101:9200"] index => "n17-long-kafka-nginx-errorlog-%{+YYYY.MM.dd}" }} }
测试
[root@logstash1 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka-to-es.conf -t
重启
root@long:~# systemctl restart logstash
kafak工具
写入kibana
略
这篇关于logstash收集日志并写入kafka再到es集群的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-09-10RabbitMQ教程:初学者指南
- 2024-09-10RabbitMQ教程:初学者指南
- 2024-09-01Kafka事务实现原理
- 2024-08-09KubeSphere 部署 Kafka 集群实战指南
- 2024-07-24百行代码实现 Kafka 运行在 S3 之上
- 2024-07-18如何使用观测云监测 AutoMQ 集群状态
- 2024-07-18活动回顾 | AutoMQ 联合 GreptimeDB 共同探讨新能源汽车数据基础设施
- 2024-07-15AutoMQ vs Kafka: 来自小红书的独立深度评测与对比
- 2024-07-15AutoMQ 生态集成 Kafdrop-ui
- 2024-07-15AutoMQ 与蚂蚁数科达成战略合作