最近半年个人工作,生活变动比较大,所以不太活跃,目前正在调整中~
为什么实现简易版用户态TCP:
为了给我的资产监控添加用户态tcp扫描功能,加快扫描速度,多快好省
实现构造畸报文的方式绕过网络设备,满足一些奇怪的需求。tcp属于内核态,不会提供让我们胡作非为的功能。
本篇文章主要分为如下几个部分:
tcp/ip数据包的构建
实现tcp的基础以及http传输的原理
简单介绍种绕过姿势
当然,目前工具暂时不开源,因为还没有完善,待后面完善后再开源。目的是可以无损代替python中的tcp模块
既然我们决定实现用户态的tcp,那么我们需要构造tcp/ip的数据包。关于如何使用libpcap发包,请参考上一篇文章。
学过计算机网络的同学都知道,发送一段网络报文,首先是以太网首部,随后紧跟ip报文,再是tcp或者udp等运输层数据报文。最终才是数据,如图
所以我们需要根据协议,从以太网帧开始构建数据报文。在这里需要使用python提供的struct模块,将python的数据类型转换为bytes数组。因为以太网帧并不需要校验和,所以构造相对简单。
在这里我们并不需要考虑VLAN(虚拟局域网),因为在我们的运行环境中,交换机都配置为Access模式,很少有配置为Trunk或者Hybrid模式。当然,如果有其他特殊需求,例如跨VLAN等,可以考虑在构造以太网帧中添加vlan。
注意,以太网帧并不提供校验等功能。如果发包频率过快,会导致上层设备丢弃报文。在二十年前,icmp发送源抑制报文,但是现在该报文已被废除。所以masscan的发包速率不可过快。
既然我们决定从数据链路层构建报文,我们也需要处理arp请求。我们在接收到arp请求后,假如请求的是我们自己的协议地址,那么我们需要构建arp相应。如果我们的用户态tcp程序的ip地址与系统配置的ip地址相同,那么可以忽略arp请求响应。
@classmethod def unpack(cls, px): point = 0 # 硬件地址类型,网络层协议类型,硬件地址长度,网络层协议地址长度 # 所以我们目前不支持ipv6 hadware_type, protocol_type, hardware_addr_len, protocol_addr_len = struct.unpack("!HHBB", px[point:point + 6]) point += 6 # ipv4 的arp请求 if protocol_type == Ether_Protocol.IPV4: oper, = struct.unpack("!H", px[point:point + 2]) point += 2 sender_mac_addr, = struct.unpack(f"!{hardware_addr_len}s", px[point:point + hardware_addr_len]) point += hardware_addr_len sender_proto_addr, = struct.unpack(f"!{protocol_addr_len}s", px[point:point + protocol_addr_len]) point += protocol_addr_len target_mac_addr, = struct.unpack(f"!{hardware_addr_len}s", px[point:point + hardware_addr_len]) point += hardware_addr_len target_proto_addr, = struct.unpack(f"!{protocol_addr_len}s", px[point:point + protocol_addr_len]) point += protocol_addr_len
这时候有的同学会问,那岂不是我们只要接受到特定网卡mac地址的请求,我们也可以胡乱回应。理论上来讲是这样,但是要具体分析物理层。如果物理层是WLAN的话,AP是不会给你的网卡发送不属于你mac地址的数据报文。所以mac地址尽量不要乱改。
ip数据包的格式如下
""" 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |Version| IHL |Type of Service| Total Length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Identification |Flags| Fragment Offset | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Time to Live | Protocol | Header Checksum | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Source Address | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Destination Address | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Options | Padding | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ IP报文格式 1. 4位IP-version 4位IP头长度 8位服务类型 16位报文总长度 2. 16位标识符 3位标记位 13位片偏移 暂时不关注此行 3. 8位TTL 8位协议 16位头部校验和 4. 32位源IP地址 5. 32位目的IP地址 """
在网络中,ip,tcp,udp的校验和计算公式都一致,代码如下。
def checksum(self, raw_packet): chksum = 0 if raw_packet%2: # 说明长度是奇数,需要在末尾padding一个byte的0 raw_packet += b'\x00' for i in range(0, len(raw_tcp), 2): chksum += int.from_bytes(raw_packet[i:i + 2], "big", signed=False) chksum = (chksum >> 16) + (chksum & 0xffff) chksum = chksum + (chksum >> 16) return ~chksum & 0xffff
最终代码如下
def pack(self): chksum = 0 raw = struct.pack("!BBHHH", self.version << 4 | self.ipv4_header, 0xc0, self.tol, self.identification, self.flag << 13 | self.offset) raw += struct.pack("!BBHII", self.ttl, self.protocol, chksum, ip2int(self.src_ip), ip2int(self.dst_ip)) chksum = self.checksum(raw) raw = struct.pack("!BBHHH", self.version << 4 | self.ipv4_header, 0xc0, self.tol, self.identification, self.flag << 13 | self.offset) raw += struct.pack("!BBHII", self.ttl, self.protocol, chksum, ip2int(self.src_ip), ip2int(self.dst_ip)) return raw
包结构如下
最终代码如下
def pack(self): """ 打包tcp :return: """ chksum = 0 raw_tcp = struct.pack('>HHLLBBHHH', self.src_port, self.dst_port, self.seq_num, self.ack_num, self.data_offset, self.flag, self.win_size, chksum, self.urg_pointer) raw_tcp += self.data chksum = self.chksum(raw_tcp) return struct.pack('>HHLLBBHHH', self.src_port, self.dst_port, self.seq_num, self.ack_num, self.data_offset, self.flag, self.win_size, chksum, self.urg_pointer) + self.data
当然,如果想更详细地了解tcp的状态机,请参考Embedded Xinu操作系统的源码,该源码简单易懂,链接如下
https://github.com/xinu-os/xinu/blob/28a035ae86ba2cd38b7c07f4d35fe8115ad3078d/device/tcp/tcpRecv.c
在这里主要介绍一下seq与ack以及几种标志位。
在建立好tcp连接后,我们就可以发送数据了。这时候标志位需要设置为ACK。seq序列号为上一次发送数据包的seq + 上次发送数据的长度。如下代码
tcp = TcpPkt(self.port_me, self.dst_port, self.seq_num, self.ack_num, TcpFlag.ACK) tcp.data = data self.eth_pkt.set_transport(tcp) rawsock_send_ipv4(pcap, self.eth_pkt.pack()) self.seq_num += len(data)
对于http这种协议,首先发送http请求头,在请求头中注明请求体的长度,也就是content-length。发送完http请求头后,在最后一条tcp报文中需要设置tcp ACK和PSH。PSH标志位告诉上层应用可以接受消息了。
当然对于http chunk这种编码另说。这时候上层应用再根据content-length标注的长度继续接收报文。
在接收到tcp的报文,需要回复ACK,当然这个ACK报文可以不需要携带数据。并且seq也不需要+1。ack的长度为接收到报文的seq与接收报文的数据长度。
elif tcp_session.state == State.ESTABLISHED: if recv_tcp.transport.data: tcp_session.ack_num = recv_tcp.transport.seq_num + len(recv_tcp.transport.data) tcp_session.data += recv_tcp.transport.data tcp = TcpPkt(tcp_session.port_me, tcp_session.dst_port, tcp_session.seq_num, tcp_session.ack_num, TcpFlag.ACK) tcp_session.eth_pkt.set_transport(tcp) rawsock_send_ipv4(pcap, tcp_session.eth_pkt.pack()) if recv_tcp.transport.flag & TcpFlag.PSH: tcp_session.push = True
一般情况下,一条http请求或者http响应,都在一个包中。在上一节我们可知,每个包最大可以1420个字节。这足够容纳很多内容了。
这也就是为什么很多安全设备不愿重组包的原因
操作系统默认会将一次请求塞进一个tcp保重,这样安全设备只检查每一个包即可完成拦截任务。这样既节省了资源,又完成任务。这也就是http chunk可以绕过WAF的原因。
在高速报文的请求中,防火墙很难追踪每一条tcp会话,硬件不允许。
那么我们在发tcp包的时候,只需要控制每个包发送的长度,分多次发,最后一个数据包发送PSH&ACK即可。最终实现截图
这个时候我们再加入乱序发包的功能,延迟发包的功能,就可以更方便地绕过安全设备。安全设备即使重组tcp回话,假如每个包都延迟到达,这个延迟时间刚好处于安全设备重组TCP会话的等待延迟与系统重组的延迟时间之间,就可以达到绕过安全设备的目的。
这时我们已经达到发包实现分块传输,但是怎么让对方设备的回包也实现分块传输呢。这时候我们需要借助tcp的window滑动窗口机制。
TCP使用“窗口”,意味着发送方发送一个或更多数据包,接收方就会响应一个或所有数据包。当接收方开始一个TCP连接时,自身会打开一个接收缓存区作为临时存储,之后再交给程序处理。
当接收方发送一个ACK响应(即对收到数据的响应)时,接收方会告诉发送者下一次我能接收多少数据,我们管这个叫窗口大小(window size)**。**一般这个窗口大小就是接收方缓冲区的大小。
我们只需要将tcp的window设置的足够小,就可以实现对端设备响应的分块,如图
image.png
同样,我们可以启动延迟确认数据等构造畸形请求的方式以干扰安全设备重组tcp会话的功能。
QSNM是否进行流重组,以条件编译确定__QNSM_STREAM_REASSEMBLE,默认配置中是不进行TCP流重组的
同一个流的TCP都会进行流重组,上下行都在一个缓存队列中,最大支持8个报文,且不考虑重叠部分
重组方法基于 hashmap + 双向链表
TCP流缓存删除方式:1. 老化 2. 无需进一步解析 3. 命中规则
具体参考
https://zhuanlan.zhihu.com/p/393121010
当然绕过姿势还很多,只要我们实现了自己的用户态TCP,就可以胡作非为~