-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvt_hashquery.py
More file actions
130 lines (109 loc) · 5.11 KB
/
vt_hashquery.py
File metadata and controls
130 lines (109 loc) · 5.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# WIP
import os
import re
import time
import requests
import PyPDF2
from datetime import datetime
from tabulate import tabulate
from collections import defaultdict, Counter
# Function to extract text from a PDF file
def extract_text_from_pdf(file_path):
    """Return the concatenated text of every page of the PDF at *file_path*.

    PyPDF2's ``extract_text()`` may return ``None`` for pages with no
    extractable text (e.g. scanned images); guard with ``or ""`` so the
    concatenation never raises TypeError. Pages are joined via a list to
    avoid quadratic string concatenation on large documents.
    """
    pages = []
    with open(file_path, "rb") as file:
        reader = PyPDF2.PdfReader(file)
        for page in reader.pages:
            pages.append(page.extract_text() or "")
    return "".join(pages)
# Function to extract hashes from text using regex and count occurrences
def extract_hashes_with_count(text):
    """Find MD5 (32), SHA-1 (40) and SHA-256 (64) hex digests in *text*.

    Returns a mapping of hash string -> number of occurrences.
    """
    hex_digest = r'\b[A-Fa-f0-9]{32}\b|\b[A-Fa-f0-9]{40}\b|\b[A-Fa-f0-9]{64}\b'
    return Counter(re.findall(hex_digest, text))
# Function to summarize VirusTotal results
def summarize_verdicts(vt_data):
    """Condense a VirusTotal file report into a 'category: count' string.

    Tallies the ``category`` field of each engine entry under
    ``data.attributes.last_analysis_results``; returns "" when the
    expected keys are absent (e.g. error payloads).
    """
    tally = Counter()
    if "data" in vt_data and "attributes" in vt_data["data"]:
        attributes = vt_data["data"]["attributes"]
        if "last_analysis_results" in attributes:
            for engine_result in attributes["last_analysis_results"].values():
                tally[engine_result["category"]] += 1
    return ", ".join(f"{category}: {total}" for category, total in tally.items())
# Function to look up hashes on VirusTotal
def lookup_virustotal(hash_list, api_key):
    """Query the VirusTotal v3 files endpoint for every hash in *hash_list*.

    Throttles to the free-tier rate limit (4 requests/minute) by sleeping
    65 seconds between batches of 4.

    Returns a tuple:
      - dict mapping hash -> concise verdict summary (or "Error: ..." string)
      - list of (hash, raw JSON dict or "Error: ..." string) for verbose output
    """
    results = {}
    verbose_results = []
    url = "https://www.virustotal.com/api/v3/files/"
    headers = {"x-apikey": api_key}  # identical for every request; build once
    for i in range(0, len(hash_list), 4):
        batch = hash_list[i:i + 4]
        for hash_value in batch:
            try:
                # timeout= prevents one hung connection from stalling the
                # entire run indefinitely.
                response = requests.get(url + hash_value, headers=headers,
                                        timeout=30)
                if response.status_code == 200:
                    data = response.json()
                    results[hash_value] = summarize_verdicts(data)
                    verbose_results.append((hash_value, data))
                else:
                    results[hash_value] = f"Error: {response.status_code}"
                    verbose_results.append((hash_value, f"Error: {response.status_code}"))
            except Exception as e:
                # Best-effort per hash: record the failure and keep going so
                # one bad lookup doesn't abort the whole batch.
                results[hash_value] = f"Error: {str(e)}"
                verbose_results.append((hash_value, f"Error: {str(e)}"))
        if i + 4 < len(hash_list):
            time.sleep(65)  # Wait 65 seconds for the next batch (4 req/min limit)
    return results, verbose_results
# Main script
def main():
    """Extract hashes from TXT/PDF files in .INPUT, look them up on
    VirusTotal, and write concise + verbose reports into a timestamped
    subfolder of .OUTPUT.
    """
    input_folder = ".INPUT"
    output_folder = ".OUTPUT"
    api_key = "YOUR_VT_API_KEY_HERE"  # Replace with your VirusTotal API key
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    # Per-file hash tallies: {file_name: {hash: count}}
    all_hashes = defaultdict(lambda: defaultdict(int))
    # Read all TXT and PDF files from the .INPUT folder
    for file_name in sorted(os.listdir(input_folder)):  # sorted for deterministic order
        file_path = os.path.join(input_folder, file_name)
        lower_name = file_name.lower()
        if lower_name.endswith(".txt"):
            # Explicit encoding + errors="replace" so a stray undecodable
            # byte can't abort the whole run.
            with open(file_path, "r", encoding="utf-8", errors="replace") as file:
                text = file.read()
        elif lower_name.endswith(".pdf"):
            text = extract_text_from_pdf(file_path)
        else:
            continue  # unsupported file type; skip
        # Single counting path shared by both formats (previously duplicated).
        for hash_value, count in extract_hashes_with_count(text).items():
            all_hashes[file_name][hash_value] += count
    # Collect all unique hashes so each is looked up exactly once
    unique_hashes = set()
    for file_hashes in all_hashes.values():
        unique_hashes.update(file_hashes.keys())
    # Perform VirusTotal lookups
    vt_results, verbose_results = lookup_virustotal(list(unique_hashes), api_key)
    # Build the concise results table
    table = [["File", "Hash", "Count", "Result"]]
    for file_name in sorted(all_hashes.keys()):  # Sort file names to ensure correct order
        for hash_value, count in all_hashes[file_name].items():
            result = vt_results.get(hash_value, "No result found")
            table.append([file_name, hash_value, count, result])
    # Organize the output files into a new timestamped folder
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    output_subfolder = os.path.join(output_folder, timestamp)
    os.makedirs(output_subfolder, exist_ok=True)
    # Save the concise results to a TXT file (utf-8 so non-ASCII verdict
    # text can't crash the write on locale-restricted platforms)
    concise_file_path = os.path.join(output_subfolder, f"VT_RESULTS_{timestamp}.TXT")
    with open(concise_file_path, "w", encoding="utf-8") as output_file:
        output_file.write(tabulate(table, headers="firstrow"))
    # Save the verbose results to a TXT file
    verbose_file_path = os.path.join(output_subfolder, f"VERBOSE_VT_RESULTS_{timestamp}.TXT")
    with open(verbose_file_path, "w", encoding="utf-8") as verbose_file:
        for hash_value, data in verbose_results:
            verbose_file.write(f"Hash: {hash_value}\n")
            verbose_file.write(f"Result: {data}\n")
            verbose_file.write("=" * 80 + "\n")
    # Display the concise results table
    print(tabulate(table, headers="firstrow"))
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()