22# Module for Malware Analysis
33import io
44import logging
5+ import os
56import re
67from pathlib import Path
78from socket import (
@@ -107,13 +108,28 @@ def gelocation(self):
107108 self .IP2Loc .close ()
108109
109110 def malware_check (self ):
111+ """Check domains against malware database."""
110112 try :
111113 mal_db = self .malwaredomainlist
114+
115+ if not mal_db .exists ():
116+ logger .warning ('Malware domain list not found: %s' , mal_db )
117+ return
118+
119+ if not os .access (mal_db , os .R_OK ):
120+ logger .error ('Insufficient permissions to read: %s' , mal_db )
121+ return
122+
112123 with io .open (mal_db ,
113124 mode = 'r' ,
114125 encoding = 'utf8' ,
115126 errors = 'ignore' ) as flip :
116127 entry_list = flip .readlines ()
128+
129+ if not entry_list :
130+ logger .warning ('Malware domain database is empty' )
131+ return
132+
117133 for entry in entry_list :
118134 enlist = entry .split ('","' )
119135 if len (enlist ) > 5 :
@@ -129,17 +145,38 @@ def malware_check(self):
129145 and (len (dmn_url ) > 1 ))
130146 or details_dict ['ip' ].startswith (domain )):
131147 self .result [domain ] = details_dict
148+ except FileNotFoundError :
149+ logger .error ('Malware domain database file not found: %s' , mal_db )
150+ except PermissionError :
151+ logger .error ('Permission denied accessing malware database: %s' , mal_db )
152+ except IOError :
153+ logger .exception ('I/O error reading malware database' )
132154 except Exception :
133155 logger .exception ('[ERROR] Performing Malware check' )
134156
135157 def maltrail_check (self ):
158+ """Check domains against maltrail database."""
136159 try :
137160 mal_db = self .maltrail
161+
162+ if not mal_db .exists ():
163+ logger .warning ('Maltrail database not found: %s' , mal_db )
164+ return
165+
166+ if not os .access (mal_db , os .R_OK ):
167+ logger .error ('Insufficient permissions to read: %s' , mal_db )
168+ return
169+
138170 with io .open (mal_db ,
139171 mode = 'r' ,
140172 encoding = 'utf8' ,
141173 errors = 'ignore' ) as flip :
142174 entry_list = flip .read ().splitlines ()
175+
176+ if not entry_list :
177+ logger .warning ('Maltrail database is empty' )
178+ return
179+
143180 for domain in self .domainlist :
144181 if domain in entry_list :
145182 self .result [domain ] = {
@@ -148,36 +185,71 @@ def maltrail_check(self):
148185 'desc' : 'Malicious Domain tagged by Maltrail' ,
149186 'bad' : 'yes' ,
150187 }
188+ except FileNotFoundError :
189+ logger .error ('Maltrail database file not found: %s' , mal_db )
190+ except PermissionError :
191+ logger .error ('Permission denied accessing maltrail database: %s' , mal_db )
192+ except IOError :
193+ logger .exception ('I/O error reading maltrail database' )
151194 except Exception :
152195 logger .exception ('[ERROR] Performing Maltrail Check' )
153196
154197 def update (self ):
155- if is_internet_available ():
156- self .update_maltrail_db ()
157- else :
158- logger .warning ('Internet not available. '
159- 'Skipping Malware Database Update.' )
198+ """Update malware databases with connection checks."""
199+ try :
200+ if is_internet_available ():
201+ logger .info ('Checking for malware database updates' )
202+ self .update_maltrail_db ()
203+ else :
204+ logger .warning ('Internet not available. '
205+ 'Skipping Malware Database Update.' )
206+ except Exception :
207+ logger .exception ('Failed to update malware databases' )
160208
161209 def scan (self , checksum , urls ):
210+ """Scan method."""
162211 if not settings_enabled ('DOMAIN_MALWARE_SCAN' ):
163212 logger .info ('Domain Malware check disabled in settings' )
164213 return self .result
214+
165215 msg = 'Performing Malware check on extracted domains'
166216 urls = [u .lower () for u in urls ]
167217 append_scan_status (checksum , msg )
168- self .domainlist = get_domains (urls )
169- if self .domainlist :
218+
219+ try :
220+ self .domainlist = get_domains (urls )
221+ if not self .domainlist :
222+ logger .info ('No domains found to analyze' )
223+ return self .result
224+
225+ logger .info ('Analyzing %d domains' , len (self .domainlist ))
170226 self .update ()
171227 self .malware_check ()
172228 self .maltrail_check ()
173229 self .gelocation ()
230+
231+ # Log analysis results
232+ malicious_count = sum (1 for domain in self .result .values ()
233+ if domain .get ('bad' ) == 'yes' )
234+ logger .info (
235+ 'Malware analysis completed: %d malicious domains found' ,
236+ malicious_count )
237+
238+ except Exception as exp :
239+ logger .exception ('Critical error during domain malware check' )
240+ append_scan_status (checksum , 'Domain malware check failed' , repr (exp ))
241+
174242 return self .result
175243
176244
177245# Helper Functions
178246
179247def verify_domain (checkeddom ):
248+ """Verify domain."""
180249 try :
250+ if not checkeddom :
251+ return False
252+
181253 if (len (checkeddom ) > 2
182254 and '.' in checkeddom
183255 and (checkeddom .endswith ('.' ) is False
@@ -187,6 +259,7 @@ def verify_domain(checkeddom):
187259 return False
188260 except Exception :
189261 logger .exception ('[ERROR] Verifying Domain' )
262+ return False
190263
191264
192265def get_netloc (url ):
@@ -202,24 +275,37 @@ def get_netloc(url):
202275 return domain
203276 except Exception :
204277 logger .exception ('[ERROR] Extracting Domain form URL' )
278+ return None
205279
206280
207281def sanitize_domain (domain ):
208282 """Sanitize domain to be RFC1034 compliant."""
209- domain = domain .split ('_' )[0 ]
210- domain = re .sub (r'[^\w^\.^\-]' , '' , domain )
211- if domain .startswith ('-' ):
212- domain = sanitize_domain (domain [1 :])
213- elif domain .endswith ('-' ):
214- domain = sanitize_domain (domain [:- 1 ])
283+ try :
284+ if not domain :
285+ return domain
286+
287+ domain = domain .split ('_' )[0 ]
288+ domain = re .sub (r'[^\w^\.^\-]' , '' , domain )
289+ if domain .startswith ('-' ):
290+ domain = sanitize_domain (domain [1 :])
291+ elif domain .endswith ('-' ):
292+ domain = sanitize_domain (domain [:- 1 ])
293+ except Exception :
294+ logger .exception ('[ERROR] Sanitizing domain' )
215295 return domain
216296
217297
218298def get_domains (urls ):
219299 """Get Domains."""
300+ domains = set ()
220301 try :
221- domains = set ()
302+ if not urls :
303+ return domains
304+
222305 for url in urls :
306+ if not url :
307+ continue
308+
223309 parse_uri = urlparse (url )
224310 if not parse_uri .scheme :
225311 url = '//' + url
@@ -228,6 +314,6 @@ def get_domains(urls):
228314 '{uri.hostname}' .format (uri = parse_uri ))
229315 if verify_domain (domain ):
230316 domains .add (domain )
231- return domains
232317 except Exception :
233318 logger .exception ('[ERROR] Extracting Domain form URL' )
319+ return domains
0 commit comments