scapy抓包使用
2021/11/19 6:39:54
本文主要是介绍scapy抓包使用,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
# coding=utf-8 import json import time import os import dpkt import socket import datetime import uuid import traceback from dpkt.ethernet import Ethernet from scapy.layers.l2 import Ether from scapy.sendrecv import sniff from scapy.utils import wrpcap from BigData.data_common.utils.file_util import FileUtil def get_local_ip(): hostname = socket.gethostname() # 获取本机内网ip local_ips = socket.gethostbyname_ex(hostname)[-1] return local_ips def body_transfer(body): str_body = body.decode() body_ls = str_body.split("&") d = {} for item in body_ls: key_, value_ = item.split("=") d[key_.strip()] = value_.strip() def analysis_pcap(timestamp, buf): data = {} if isinstance(buf, dpkt.ip.IP): eth = buf else: eth = dpkt.ethernet.Ethernet(buf) # print(eth.data.__dict__) # print("ip layer:"+eth.data.__class__.__name__) #以太包的数据既是网络层包 # print("tcp layer:"+eth.data.data.__class__.__name__) #网络层包的数据既是传输层包 # print("http layer:" + eth.data.data.data.__class__.__name__) #传输层包的数据既是应用层包 # print('Timestamp: ',str(datetime.datetime.utcfromtimestamp(timestamp))) #打印出包的抓取时间 if isinstance(eth.data, dpkt.ip.IP) or isinstance(eth.data, dpkt.ip6.IP6): # # print('%d Non IP Packet type not supported %s' % (int(timestamp), eth.data.__class__.__name__)) # print('ip.data type is {}'.format(eth.data.__class__.__name__)) # print(repr(eth.data)) # return data ip = eth.data if isinstance(eth.data, dpkt.ip.IP): src_ip = socket.inet_ntoa(ip.src) dst_ip = socket.inet_ntoa(ip.dst) # do_not_fragment =bool(ip.off & dpkt.ip.IP_DF) # more_fragments =bool(ip.off & dpkt.ip.IP_MF) # fragment_offset = ip.off & dpkt.ip.IP_OFFMASK # key = 'IPV4' if isinstance(eth.data, dpkt.ip.IP) else 'IPV6' data.update({ 'time': timestamp, 'IPV4': {'src': src_ip, 'dst': dst_ip} }) else: src_ip = ip.src dst_ip = ip.dst data.update({ 'time': timestamp, 'IPV6': {'src': src_ip, 'dst': dst_ip} }) print('ip.data type is {}'.format(ip.data.__class__.__name__)) if isinstance(ip.data, dpkt.tcp.TCP): layer = ip.data data.update(analysis_tcp(layer)) elif isinstance(ip.data, dpkt.udp.UDP): layer = ip.data data.update(analysis_udp(layer)) elif isinstance(ip.data, dpkt.icmp.ICMP) or isinstance(ip.data, dpkt.icmp6.ICMP6): layer = ip.data data.update(analysis_icmp(layer)) else: print('analysis_pcap ip.data {}'.format(repr(ip.data))) data = {'time': timestamp, eth.data.__class__.__name__: {}} else: print('analysis_pcap eth.data {}'.format(repr(eth.data))) data = {'time': timestamp, eth.data.__class__.__name__: eth.data.__dict__} return data def analysis_udp(udp, key="UDP"): try: data_dict = {} try: data_str = udp.data.decode('utf-8') if data_str.startswith('M-SEARCH'): data_list = data_str.strip().split('\n')[1:] for item in data_list: k, v = item.split(':')[0], ':'.join(item.split(':')[1:]) data_dict[k.strip()] = v.strip() else: data_dict = data_str except: pass return { key: { 'sport': udp.sport, 'dport': udp.dport, 'ulen': udp.ulen, 'sum': udp.sum, 'data': data_dict } } except: pass print('analysis_udp udp {}'.format(repr(udp))) return {} def analysis_tcp(tcp, key='TCP'): data = { key: { 'dport': tcp.dport, 'sport': tcp.sport, 'ack': tcp.ack, 'seq': tcp.seq } } try: request = dpkt.http.Request(tcp.data) data['HTTP'] = { 'type': 'request', 'uri': request.uri, 'Method': request.method.upper(), 'Headers': dict(request.headers), 'Body': body_transfer(request.body), 'Data': body_transfer(request.data) } except: pass try: response = dpkt.http.Response(tcp) data['HTTP'] = { 'type': 'response', 'Headers': dict(response.headers), 'Body': body_transfer(response.body), 'Data': body_transfer(response.data) } except: pass try: data_dict = {} data_str = tcp.data.decode('utf-8') if data_str.startswith('M-SEARCH'): data_list = data_str.strip().split('\n')[1:] for item in data_list: k, v = item.split(':')[0], ':'.join(item.split(':')[1:]) data_dict[k.strip()] = v.strip() else: data_dict = data_str if key in data and data_dict: data[key]['Data'] = data_dict except: pass if data: return data print('analysis_tcp tcp {}'.format(repr(tcp))) return {} def analysis_icmp(icmp, key='ICMP'): try: if isinstance(icmp, dpkt.icmp.ICMP): return { 'ICMP': { 'type': icmp.type, 'sum': icmp.sum, 'Data': analysis_pcap(int(time.time()), icmp.data.data) } } else: data_str = '' try: data_str = icmp.data.decode('utf-8') except: pass return { 'ICMP6': { 'type': icmp.type, 'sum': icmp.sum, 'Data': data_str } } except: pass return {} def analysis_pcap2(timestamp, buf): e = Ether(buf) e.show() def get_dpkt(): # 这里是针对单网卡的机子, 多网卡的可以在参数中指定网卡, 例:iface=Qualcomm QCA9377 802.11ac Wireless Adapter dpkt_ = sniff(count = 10) _uuid = uuid.uuid1() filename = f"{_uuid}.pcap" wrpcap(filename, dpkt_) return filename def main(): while True: filename = get_dpkt() with open(filename, "rb") as f: pcap = dpkt.pcap.Reader(f) local_ips = get_local_ip() for timestamp, buf in pcap: res = analysis_pcap(timestamp, buf) # analysis_pcap2(timestamp, buf) print(res) # FileUtil.write_lines('test.txt', json.dumps(res)) os.remove(filename) # FileUtil.flush() if __name__ =='__main__': main()
这篇关于scapy抓包使用的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23增量更新怎么做?-icode9专业技术文章分享
- 2024-11-23压缩包加密方案有哪些?-icode9专业技术文章分享
- 2024-11-23用shell怎么写一个开机时自动同步远程仓库的代码?-icode9专业技术文章分享
- 2024-11-23webman可以同步自己的仓库吗?-icode9专业技术文章分享
- 2024-11-23在 Webman 中怎么判断是否有某命令进程正在运行?-icode9专业技术文章分享
- 2024-11-23如何重置new Swiper?-icode9专业技术文章分享
- 2024-11-23oss直传有什么好处?-icode9专业技术文章分享
- 2024-11-23如何将oss直传封装成一个组件在其他页面调用时都可以使用?-icode9专业技术文章分享
- 2024-11-23怎么使用laravel 11在代码里获取路由列表?-icode9专业技术文章分享
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享