faust从kafka消费nginx日志流分析告警
2022/1/24 7:06:16
本文主要是介绍faust从kafka消费nginx日志流分析告警,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
faust从kafka消费nginx日志流分析告警
nginx节点日志通过syslog发送至syslog server,syslog server进行格式处理后作为生产者,把日志流send至kafka 对应的topic上。
基于faust框架编写数据流消费程序,从kafka指定的topic上消费数据流,通过stream.filter+lambda表达式,指定错误界别的数据流进行分析,使用域名和ip为key进行计数,当错误超过阈值时发送告警通知相关人员。
import faust import redis from feishuRobot import feishuRobot from datetime import timedelta from log import logger from config.config import ReadConfig try: obj = ReadConfig() conf = obj.get_config() logger.info("load config file successful") except Exception as r: logger.error('Fail to load config file: {reason}', reason=r) try: pool = redis.ConnectionPool(host=conf['redis']['address'], port=conf['redis']['port'], decode_responses=True, password=conf['redis']['password'] ) obj_r = redis.Redis(connection_pool=pool) except Exception as r: logger.error("Fail to connection redis poll: {reason}", reason=r) app = faust.App( 'error_log_alarm', store='rocksdb://', broker= conf['kafka']['access_broker'], stream_wait_empty=False, broker_max_poll_records=conf['kafka']['max_poll'], topic_partitions=1, #vaule_type=json, #value_serializer='raw', ) class Transfer(faust.Record): from_host_ip: str level: str #message: str reason: str logtime: str #def master_processor(key, events): #timestamp = key[1][0] #for event in events: record_error = app.Table( 'record_error', default=int, #on_window_close=master_processor, #).tumbling(timedelta(minutes=1), expires=timedelta(minutes=1)).relative_to_stream() ).tumbling(conf['faust']['window_size'], expires=conf['faust']['expires'], key_index=True).relative_to_stream() error_topic = app.topic('sec-waf-error-log', value_type=Transfer) @app.agent(error_topic) async def greet(stream): ''' #async for value in stream.filter(lambda x: x.status == '200' ).group_by(Transfer.from_host_ip): async for value in stream: master_to_total[value.from_host_ip] += 1 ''' #async for value in stream.group_by(Transfer.from_host_ip): #upstream timed out (110: Connection timed out) while reading response header from upstream #httpApi_action(): httpApi_action[push_count_dict] error: failed to commit the pipelined (push_count_dict) requests: timeout, context: ngx.timer #upstream prematurely closed connection while reading response header from upstream # connect() failed (111: Connection refused) while connecting to upstream #access forbidden by rule, #recv() failed (104: Connection reset by peer) while proxying upgraded connection try: feishu = feishuRobot() except Exception as r: logger.error("Fail to init feishuRobot object: {reason}", reason=r) async for value in stream.filter(lambda x: x.level == "error"): #print("attack: ", value) record_error['{value.from_host_ip}'] += 1 #域名_ip统计计数 v = record_error['{value.from_host_ip}'] if v.now() >= 10: msg = "" err_key = "" if "recv() failed (110: Connection timed out) while reading response header from upstream" in value.reason: err_key = "error_" + value.from_host_ip + "_Connection_timed_out" msg += "级别: 中\r\n" elif "recv() failed (104: Connection reset by peer) while reading response header from upstream" in value.reason: err_key = "error_" + value.from_host_ip + "_Connection_reset_peer" msg += "级别: 中\r\n" elif "upstream prematurely closed connection while reading response header from upstream" in value.reason: err_key = "error_" + value.from_host_ip + "_prematurely_closed_connection" msg += "级别: 中\r\n" elif "access forbidden by rule" in value.reason: err_key = "error_" + value.from_host_ip + "_access_forbidden_rule" msg += "级别: 低\r\n" elif "connect() failed (111: Connection refused) while connecting to upstream" in value.reason: errr_key = "error_" + value.from_host_ip + "_Connection_refused" msg += "级别: 中\r\n" elif "client intended to send too large body" in value.reason: err_key = "error_" + value.from_host_ip + "_send_too_large_body" msg += "级别: 低\r\n" elif "failed to commit the pipelined (push_count_dict)" in value.reason: err_key = "error_" + value.from_host_ip + "_commit_the_pipelined" msg += "级别: 低\r\n" elif "could not build optimal server_names_hash" in value.reason: #warning err_key = "warn_" + value.from_host_ip + "_optimal_server_names_hash" msg += "级别: 低\r\n" elif "no live upstreams while connecting to upstream" in value.reason: err_key = "error_" + value.from_host_ip + "_no_live_upstreams" msg += "级别: 高\r\n" elif "SSL_do_handshake() failed" in value.reason: err_key = "error_" + value.from_host_ip + "_SSL_do_handshake" msg += "级别: 高\r\n" if obj_r.get(err_key) is not None: #如果存在更新计数统计 obj_r.set(err_key, str(v.now())) else: obj_r.set(err_key, str(v.now())) obj_r.expire(err_key, conf['redis']['record_expire']) msg += "waf节点: " + value.from_host_ip + "\r\n" msg += "错误信息" + value.reason + "\r\n" msg += "时间: " + value.logtime + "\r\n" msg += "错误日志频率: 2分钟" + str(v.now()) + "次\r\n" feishu.send_card_text(msg) if __name__ == '__main__': app.main()
这篇关于faust从kafka消费nginx日志流分析告警的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-10-29Nginx发布学习:从入门到实践的简单教程
- 2024-10-28Nginx发布:新手入门教程
- 2024-10-21nginx 怎么设置文件上传最大20M限制-icode9专业技术文章分享
- 2024-10-17关闭 nginx的命令是什么?-icode9专业技术文章分享
- 2024-09-17Nginx实用篇:实现负载均衡、限流与动静分离
- 2024-08-21宝塔nginx新增8022端口方法步骤-icode9专业技术文章分享
- 2024-08-21nginx配置,让ws升级为wss访问的方法步骤-icode9专业技术文章分享
- 2024-08-15nginx ws代理配置方法步骤-icode9专业技术文章分享
- 2024-08-14nginx 让访问带有/relid的地址返回404 ,例子 /relid-x-0.36-y-131.html-icode9专业技术文章分享
- 2024-08-14nginx 判断地址有/statics/的路径,指向到/home/html/statics/目录-icode9专业技术文章分享