|
| 1 | +import socket |
| 2 | + |
| 3 | +def build_dns_query(domain): |
| 4 | + """Constructs a raw DNS query for the given domain.""" |
| 5 | + # Generate a unique Transaction ID (16-bit) |
| 6 | + transaction_id = b"\xaa\xbb" |
| 7 | + |
| 8 | + # Flags: Standard Query (0x0100) |
| 9 | + flags = b"\x01\x00" |
| 10 | + |
| 11 | + # Questions: 1 |
| 12 | + qdcount = b"\x00\x01" |
| 13 | + |
| 14 | + # Answer RRs, Authority RRs, Additional RRs: 0 |
| 15 | + ancount = b"\x00\x00" |
| 16 | + nscount = b"\x00\x00" |
| 17 | + arcount = b"\x00\x00" |
| 18 | + |
| 19 | + # Header section |
| 20 | + header = transaction_id + flags + qdcount + ancount + nscount + arcount |
| 21 | + |
| 22 | + # Encode domain name in DNS format (e.g., "example.com" → "\x07example\x03com\x00") |
| 23 | + qname = b"".join(len(part).to_bytes(1, "big") + part.encode() for part in domain.split(".")) + b"\x00" |
| 24 | + |
| 25 | + # QTYPE: A (Host Address) (0x0001), QCLASS: IN (Internet) (0x0001) |
| 26 | + qtype = b"\x00\x01" |
| 27 | + qclass = b"\x00\x01" |
| 28 | + |
| 29 | + # Final query |
| 30 | + return header + qname + qtype + qclass |
| 31 | + |
| 32 | +def parse_dns_response(response): |
| 33 | + """Parses a raw DNS response and extracts the IP addresses.""" |
| 34 | + header = response[:12] # First 12 bytes are the DNS header |
| 35 | + qname_end = response[12:].index(b"\x00") + 13 # Locate end of QNAME |
| 36 | + answer_section = response[qname_end + 4:] # Skip QTYPE and QCLASS |
| 37 | + |
| 38 | + ip_addresses = [] |
| 39 | + while answer_section: |
| 40 | + if len(answer_section) < 16: |
| 41 | + break |
| 42 | + |
| 43 | + # Extract the response fields |
| 44 | + ip_part = answer_section[-4:] # Last 4 bytes should be the IPv4 address |
| 45 | + ip_address = ".".join(str(b) for b in ip_part) |
| 46 | + ip_addresses.append(ip_address) |
| 47 | + |
| 48 | + answer_section = answer_section[:-16] # Move to the next answer (if any) |
| 49 | + |
| 50 | + return ip_addresses if ip_addresses else "No IP found" |
| 51 | + |
| 52 | +def dns_query_udp(domain, dns_server="8.8.8.8", port=53): |
| 53 | + """Sends a raw DNS query over UDP and parses the response.""" |
| 54 | + query = build_dns_query(domain) |
| 55 | + |
| 56 | + # Create a UDP socket |
| 57 | + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| 58 | + sock.settimeout(2) # Set a timeout |
| 59 | + |
| 60 | + try: |
| 61 | + # Send query to the DNS server |
| 62 | + sock.sendto(query, (dns_server, port)) |
| 63 | + |
| 64 | + # Receive response |
| 65 | + response, _ = sock.recvfrom(512) # DNS responses are usually < 512 bytes |
| 66 | + except socket.timeout: |
| 67 | + return "Timeout: No response from server" |
| 68 | + finally: |
| 69 | + sock.close() |
| 70 | + |
| 71 | + return parse_dns_response(response) |
| 72 | + |
| 73 | +# Example usage |
| 74 | +print(dns_query_udp("example.com")) |
0 commit comments