Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,12 @@ async def main(msg: func.QueueMessage) -> None:
# requireRaw : bool
def customize_event(line, eventsSchemaMappingDict, requiredFieldsMappingDict, requireRaw):

element = json.loads(line)
try:
element = json.loads(line) # Attempt to parse the line as JSON
except json.JSONDecodeError as e:
# Log the error and skip this line
logging.error(f"JSON decoding error for line: {line}. Error: {str(e)}")
return None, None
if "event_simpleName" in element and element["event_simpleName"] in eventsSchemaMappingDict:
schema = eventsSchemaMappingDict[element["event_simpleName"]]
else:
Expand Down Expand Up @@ -201,13 +206,14 @@ async def process_file_primary_CLv2(bucket, s3_path, client, session, eventsSche
if line:
try:
normalizedEvent, customizedEvent = customize_event(line, eventsSchemaMappingDict, requiredFieldsMappingDict, REQUIRE_RAW)
except ValueError as e:
logging.error('Error while loading json Event at s value {}. Error: {}'.format(line, str(e)))
raise e
if normalizedEvent is None: # Skip this line if it's invalid
continue
except Exception as e:
logging.error(e)
await normalizedSentinelHelperCollection.sendData(normalizedEvent)
if REQUIRE_RAW:
logging.error(f"Error while processing line: {line}. Error: {str(e)}")
continue
if normalizedEvent:
await normalizedSentinelHelperCollection.sendData(normalizedEvent)
if REQUIRE_RAW and customizedEvent:
await customizedSentinelHelperCollection.sendData(customizedEvent)
s = line
if s:
Expand Down
Loading