|
| 1 | +# SPDX-FileCopyrightText: 2022 Martin Stephens |
| 2 | +# |
| 3 | +# SPDX-License-Identifier: MIT |
| 4 | +""" |
| 5 | +Extract unique responses from a Wireshark JSON export and write a file that includes |
| 6 | +enough information to test the DNS response parser. Also writes a file containing the |
| 7 | +domain names for running tests on microcontrollers. |
| 8 | +
|
| 9 | +The wireshark JSON file should contain only DNS queries (port 53) and include the raw |
| 10 | +response data. |
| 11 | +""" |
| 12 | +import json |
| 13 | +from typing import List |
| 14 | + |
| 15 | +READ_FILE_NAME = "wireshark_dns.json" |
| 16 | +WRITE_FILE_NAME = "dns_parser_test_data" |
| 17 | + |
| 18 | +with open(READ_FILE_NAME, "r") as f: |
| 19 | + dns_records: List[dict] = json.load(f) |
| 20 | +print(f"DNS Records: {len(dns_records)}") |
| 21 | + |
| 22 | +# Filter out the DNS queries. |
| 23 | +responses_only = [] |
| 24 | +for dns_record in dns_records: |
| 25 | + if ( |
| 26 | + dns_record["_source"]["layers"]["dns"]["dns.flags_tree"]["dns.flags.response"] |
| 27 | + == "1" |
| 28 | + ): |
| 29 | + responses_only.append(dns_record) |
| 30 | +print(f"DNS Responses: {len(responses_only)}") |
| 31 | + |
| 32 | +# Filter out the IPv6 responses. |
| 33 | +type_a_responses = [] |
| 34 | +for response in responses_only: |
| 35 | + if "AAAA" not in list(response["_source"]["layers"]["dns"]["Queries"].keys())[0]: |
| 36 | + type_a_responses.append(response) |
| 37 | +print(f"Type A responses: {len(type_a_responses)}") |
| 38 | + |
| 39 | +# Extract unique repsonses. |
| 40 | +unique_urls = set() |
| 41 | +unique_responses = [] |
| 42 | +for response in type_a_responses: |
| 43 | + query_key = list(response["_source"]["layers"]["dns"]["Queries"].keys())[0] |
| 44 | + if ( |
| 45 | + response["_source"]["layers"]["dns"]["Queries"][query_key]["dns.qry.name"] |
| 46 | + not in unique_urls |
| 47 | + ): |
| 48 | + unique_urls.add( |
| 49 | + response["_source"]["layers"]["dns"]["Queries"][query_key]["dns.qry.name"] |
| 50 | + ) |
| 51 | + unique_responses.append(response) |
| 52 | +print(f"Unique responses: {len(unique_responses)}") |
| 53 | + |
| 54 | +# Create a dictionary with the required fields. |
| 55 | +export_responses = [] |
| 56 | +for response in unique_responses: |
| 57 | + query_key = list(response["_source"]["layers"]["dns"]["Queries"].keys())[0] |
| 58 | + export_response = { |
| 59 | + "query_id": response["_source"]["layers"]["dns"]["dns.id"], |
| 60 | + "query_name": response["_source"]["layers"]["dns"]["Queries"][query_key][ |
| 61 | + "dns.qry.name" |
| 62 | + ], |
| 63 | + "query_name_length": response["_source"]["layers"]["dns"]["Queries"][query_key][ |
| 64 | + "dns.qry.name.len" |
| 65 | + ], |
| 66 | + } |
| 67 | + try: |
| 68 | + answer_keys = list(response["_source"]["layers"]["dns"]["Answers"].keys()) |
| 69 | + for answer_key in answer_keys: |
| 70 | + if "type A" in answer_key: |
| 71 | + export_response["answer_IPv4"] = response["_source"]["layers"]["dns"][ |
| 72 | + "Answers" |
| 73 | + ][answer_key]["dns.a"] |
| 74 | + break |
| 75 | + except KeyError: |
| 76 | + export_response["answer_IPv4"] = None |
| 77 | + export_response["udp_packet"] = response["_source"]["layers"]["udp"]["udp.payload"] |
| 78 | + export_responses.append(export_response) |
| 79 | +print(f"Responses to export: {len(export_responses)}") |
| 80 | + |
| 81 | +# Write a JSON file for testing the parser on a computer. |
| 82 | +print("Writing JSON file…") |
| 83 | +with open(f"{WRITE_FILE_NAME}.json", "w") as f: |
| 84 | + json.dump(export_responses, f) |
| 85 | + |
| 86 | +# Write a text file with a domain name on each line for testing on a microcontroller. |
| 87 | +print("Writing text file…") |
| 88 | +with open(f"{WRITE_FILE_NAME}.txt", "w") as f: |
| 89 | + f.writelines([f"{response['query_name']}\n" for response in export_responses]) |
| 90 | + |
| 91 | +print("Done.") |
0 commit comments