|
2 | 2 | This module is used to send or broadcast UDP packets with spoofed IP addresses. |
3 | 3 | """ |
4 | 4 |
|
5 | | -import struct |
6 | | -import random |
7 | 5 | import socket |
8 | 6 | from typing import List, Optional, Tuple |
9 | 7 |
|
10 | 8 | from scapy.all import send as scapy_send |
11 | | -from scapy.all import IP, UDP, Raw, RandShort # type: ignore |
| 9 | +from scapy.all import IP, UDP, Raw, RandShort # pylint: disable=no-name-in-module # type: ignore |
12 | 10 |
|
13 | 11 | try: |
14 | 12 | from Jiyu_udp_attack.ip_analyze import ip_analyze |
15 | 13 | except ImportError: |
16 | 14 | from ip_analyze import ip_analyze |
17 | 15 |
|
18 | 16 |
|
19 | | -def calculate_checksum(data: bytes) -> int: |
20 | | - """ |
21 | | - Calculates the checksum for the given data. |
22 | | -
|
23 | | - Args: |
24 | | - data (bytes): The data for which to calculate the checksum. |
25 | | -
|
26 | | - Returns: |
27 | | - int: The calculated checksum. |
28 | | - """ |
29 | | - data = data + b"\x00" * (len(data) % 2) # Ensure even length |
30 | | - |
31 | | - total = 0 |
32 | | - for word in struct.unpack("!" + "H" * (len(data) // 2), data): |
33 | | - total += word |
34 | | - if total > 0xFFFF: |
35 | | - total = (total & 0xFFFF) + (total >> 16) |
36 | | - |
37 | | - return ~total & 0xFFFF |
38 | | - |
39 | | - |
40 | | -def create_raw_udp_packet( |
41 | | - src_ip: str, |
42 | | - src_port: Optional[int], |
43 | | - dst_ip: str, |
44 | | - dst_port: int, |
45 | | - payload: bytes, |
46 | | - *, |
47 | | - ip_id: Optional[int] = None, |
48 | | -) -> bytes: |
49 | | - """ |
50 | | - Creates a raw UDP packet with a spoofed source IP address. |
51 | | -
|
52 | | - Args: |
53 | | - src_ip (str): The source IP address to spoof. |
54 | | - dst_ip (str): The destination IP address. |
55 | | - dst_port (int): The destination port number. |
56 | | - payload (bytes): The data payload to include in the packet. |
57 | | -
|
58 | | - Returns: |
59 | | - bytes: The constructed raw UDP packet. |
60 | | - """ |
61 | | - # 1. Set IP header parameters |
62 | | - ip_ver = 4 |
63 | | - ip_ihl = 5 # 5 * 4 = 20 bytes header |
64 | | - ip_tos = 0 |
65 | | - ip_total_len = 20 + 8 + len(payload) # IP header + UDP header + data |
66 | | - ip_id = random.randint(0, 65535) if ip_id is None else ip_id |
67 | | - ip_frag_off = 0 |
68 | | - ip_ttl = 64 |
69 | | - ip_proto = socket.IPPROTO_UDP |
70 | | - ip_check = 0 # Initial value is 0, will be calculated later |
71 | | - |
72 | | - # 2. Build IP header (initial checksum is 0) |
73 | | - ip_header = struct.pack( |
74 | | - "!BBHHHBBH4s4s", |
75 | | - (ip_ver << 4) | ip_ihl, |
76 | | - ip_tos, |
77 | | - ip_total_len, |
78 | | - ip_id, |
79 | | - ip_frag_off, |
80 | | - ip_ttl, |
81 | | - ip_proto, |
82 | | - ip_check, |
83 | | - socket.inet_aton(src_ip), |
84 | | - socket.inet_aton(dst_ip), |
85 | | - ) |
86 | | - |
87 | | - # 3. Calculate IP header checksum |
88 | | - ip_check = calculate_checksum(ip_header) |
89 | | - |
90 | | - # 4. Rebuild IP header with correct checksum |
91 | | - ip_header = struct.pack( |
92 | | - "!BBHHHBBH4s4s", |
93 | | - (ip_ver << 4) | ip_ihl, |
94 | | - ip_tos, |
95 | | - ip_total_len, |
96 | | - ip_id, |
97 | | - ip_frag_off, |
98 | | - ip_ttl, |
99 | | - ip_proto, |
100 | | - ip_check, |
101 | | - socket.inet_aton(src_ip), |
102 | | - socket.inet_aton(dst_ip), |
103 | | - ) |
104 | | - |
105 | | - # 5. Build UDP header (initial checksum is 0) |
106 | | - if src_port is None: |
107 | | - src_port = random.randint(1024, 65535) # Random source port |
108 | | - udp_length = 8 + len(payload) |
109 | | - udp_header = struct.pack("!HHHH", src_port, dst_port, udp_length, 0) # Initial checksum is 0 |
110 | | - |
111 | | - # 6. Create pseudo header for UDP checksum calculation |
112 | | - pseudo_header = struct.pack("!4s4sBBH", socket.inet_aton(src_ip), socket.inet_aton(dst_ip), 0, ip_proto, udp_length) |
113 | | - |
114 | | - # 7. Calculate UDP checksum (including pseudo header) |
115 | | - udp_check = calculate_checksum(pseudo_header + udp_header + payload) |
116 | | - |
117 | | - # 8. Rebuild UDP header with correct checksum |
118 | | - udp_header = struct.pack("!HHHH", src_port, dst_port, udp_length, udp_check) |
119 | | - |
120 | | - # 9. Combine complete packet |
121 | | - return ip_header + udp_header + payload |
122 | | - |
123 | | - |
124 | 17 | def send_packet( |
125 | 18 | src_ip: Optional[str], |
126 | 19 | src_port: Optional[int], |
|
0 commit comments