drop_files_array = []
failed_files_array = []

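# Create an asynchronous SQS client using the configured AWS region and credentials.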
def _create_sqs_client():
    sqs_session = get_session()
    return sqs_session.create_client(
        'sqs',
        region_name=AWS_REGION_NAME,
        aws_access_key_id=AWS_KEY,
        aws_secret_access_key=AWS_SECRET
    )

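# Create an asynchronous S3 client; BotoCoreConfig enables standard-mode retries with up to 10 attempts.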
def _create_s3_client():
    s3_session = get_session()
    boto_config = BotoCoreConfig(region_name=AWS_REGION_NAME, retries={'max_attempts': 10, 'mode': 'standard'})
    return s3_session.create_client(
        's3',
        region_name=AWS_REGION_NAME,
        aws_access_key_id=AWS_KEY,
        aws_secret_access_key=AWS_SECRET,
        config=boto_config
    )

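# Parse one raw event line into JSON and split its keys into required and custom fields;
# returns None for lines that are not valid JSON so callers can skip them.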
def customize_event(line):
    try:
        element = json.loads(line)  # Attempt to parse the line as JSON
    except json.JSONDecodeError as e:
        # Log the error and skip this line
        logging.error(f"JSON decoding error for line: {line}. Error: {str(e)}")
        return None  # Return None so that this line will be ignored during further processing
    required_fileds = [
        "timestamp", "aip", "aid", "EventType", "LogonType", "HostProcessType", "UserPrincipal", "DomainName",
        "RemoteAddressIP", "ConnectionDirection", "TargetFileName", "LocalAddressIP4", "IsOnRemovableDisk",
        "UserPrincipal", "UserIsAdmin", "LogonTime", "LogonDomain", "RemoteAccount", "UserId", "Prevalence",
        "CurrentProcess", "ConnectionDirection", "event_simpleName", "TargetProcessId", "ProcessStartTime",
        "UserName", "DeviceProductId", "TargetSHA256HashData", "SHA256HashData", "MD5HashData", "TargetDirectoryName",
        "TargetFileName", "FirewallRule", "TaskName", "TaskExecCommand", "TargetAddress", "TargetProcessId",
        "SourceFileName", "RegObjectName", "RegValueName", "ServiceObjectName", "RegistryPath", "RawProcessId",
        "event_platform", "CommandLine", "ParentProcessId", "ParentCommandLine", "ParentBaseFileName",
        "GrandParentBaseFileName", "RemotePort", "VolumeDeviceType", "VolumeName", "ClientComputerName", "ProductId", "ComputerName"
    ]
    required_fields_data = {}
    custom_fields_data = {}
    for key, value in element.items():
@@ -90,6 +98,7 @@ def customize_event(line):
    event["custom_fields_message"] = custom_fields_data_text
    return event

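# Group the file descriptors by source bucket so each bucket's files can be processed together.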
def sort_files_by_bucket(array_obj):
    array_obj = sorted(array_obj, key=itemgetter('bucket'))
    sorted_array = []
@@ -100,6 +109,7 @@ def sort_files_by_bucket(array_obj):
        sorted_array.append({'bucket': key, 'files': temp_array})
    return sorted_array

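# Timer-triggered entry point: receive SQS notifications and process the S3 files they reference.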
async def main(mytimer: func.TimerRequest):
    global drop_files_array, failed_files_array
    drop_files_array.clear()
@@ -131,7 +141,8 @@ async def main(mytimer: func.TimerRequest):
                if 'Messages' in response:
                    for msg in response['Messages']:
                        body_obj = json.loads(msg["Body"])
                        logging.info("Got message with MessageId {}. Start processing {} files from Bucket: {}. Path prefix: {}. Timestamp: {}.".format(
                            msg["MessageId"], body_obj["fileCount"], body_obj["bucket"], body_obj["pathPrefix"], body_obj["timestamp"]))
                        await download_message_files(body_obj, session, retrycount=0)
                        logging.info("Finished processing {} files from MessageId {}. Bucket: {}. Path prefix: {}".format(body_obj["fileCount"], msg["MessageId"], body_obj["bucket"], body_obj["pathPrefix"]))
                        try:
@@ -151,19 +162,20 @@ async def main(mytimer: func.TimerRequest):
    if len(failed_files_array) > 0:
        logging.info("list of files that were not processed after defined no. of retries: {}".format(failed_files_array))

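# Download a single S3 object, convert its lines to events, and send them to Azure Sentinel.
# On failure the file is added to drop_files_array on the first attempt, or to failed_files_array
# once the configured number of retries is exhausted.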
async def process_file(bucket, s3_path, client, semaphore, session, retrycount):
    async with semaphore:
        total_events = 0
        logging.info("Start processing file {}".format(s3_path))
        sentinel = AzureSentinelConnectorAsync(
            session,
            LOG_ANALYTICS_URI,
            WORKSPACE_ID,
            SHARED_KEY,
            LOG_TYPE,
            queue_size=MAX_BUCKET_SIZE
        )
        try:
            response = await client.get_object(Bucket=bucket, Key=s3_path)
            s = ''
@@ -173,36 +185,34 @@ async def process_file(bucket, s3_path, client, semaphore, session, retrycount):
                for n, line in enumerate(lines):
                    if n < len(lines) - 1:
                        if line:
                            event = customize_event(line)
                            if event is None:  # Skip malformed lines
                                continue
                            await sentinel.send(event)
                s = line
            if s:
                event = customize_event(line)
                if event is None:  # Skip malformed lines
                    return
                await sentinel.send(event)
            await sentinel.flush()
            total_events += sentinel.successfull_sent_events_number
            logging.info("Finish processing file {}. Sent events: {}".format(
                s3_path, sentinel.successfull_sent_events_number))
        except Exception as e:
            if (retrycount <= 0):
                logging.warn("Processing file {} was failed. Error: {}".format(s3_path, e))
                drop_files_array.append({'bucket': bucket, 'path': s3_path})
            else:
                logging.warn("Processing file {} was failed after defined no. of retries. Error: {}".format(s3_path, e))
                failed_files_array.append({'bucket': bucket, 'path': s3_path})

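# Process all files referenced by one SQS message concurrently, bounded by MAX_CONCURRENT_PROCESSING_FILES.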
async def download_message_files(msg, session, retrycount):
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSING_FILES)
    async with _create_s3_client() as client:
        cors = []
        for s3_file in msg['files']:
            cors.append(process_file(
                msg['bucket'], s3_file['path'], client, semaphore, session, retrycount))
        await asyncio.gather(*cors)