@@ -239,7 +239,7 @@ def create_dataset(self, dataset_file: DatasetFile):
239239 session .add (latest_dataset )
240240 session .add (new_dataset )
241241
242- db .refresh_materialized_view (t_feedsearch .name ) # todo: ????????
242+ db .refresh_materialized_view (session , t_feedsearch .name )
243243 session .commit ()
244244 logging .info (f"[{ self .feed_stable_id } ] Dataset created successfully." )
245245 except Exception as e :
@@ -310,49 +310,52 @@ def process_dataset(cloud_event: CloudEvent):
310310 stable_id = "UNKNOWN"
311311 execution_id = "UNKNOWN"
312312 bucket_name = os .getenv ("DATASETS_BUCKET_NANE" )
313- start_db_session (os .getenv ("FEEDS_DATABASE_URL" ))
314- maximum_executions = os .getenv ("MAXIMUM_EXECUTIONS" , 1 )
315- public_hosted_datasets_url = os .getenv ("PUBLIC_HOSTED_DATASETS_URL" )
316- trace_service = None
317- dataset_file : DatasetFile = None
318- error_message = None
313+ db = Database (database_url = os .getenv ("FEEDS_DATABASE_URL" ))
319314 try :
320- # Extract data from message
321- data = base64 .b64decode (cloud_event .data ["message" ]["data" ]).decode ()
322- json_payload = json .loads (data )
323- logging .info (
324- f"[{ json_payload ['feed_stable_id' ]} ] JSON Payload: { json .dumps (json_payload )} "
325- )
326- stable_id = json_payload ["feed_stable_id" ]
327- execution_id = json_payload ["execution_id" ]
328- trace_service = DatasetTraceService ()
329-
330- trace = trace_service .get_by_execution_and_stable_ids (execution_id , stable_id )
331- logging .info (f"[{ stable_id } ] Dataset trace: { trace } " )
332- executions = len (trace ) if trace else 0
333- logging .info (
334- f"[{ stable_id } ] Dataset executed times={ executions } /{ maximum_executions } "
335- f"in execution=[{ execution_id } ] "
336- )
315+ with db .start_db_session ():
316+ maximum_executions = os .getenv ("MAXIMUM_EXECUTIONS" , 1 )
317+ public_hosted_datasets_url = os .getenv ("PUBLIC_HOSTED_DATASETS_URL" )
318+ trace_service = None
319+ dataset_file : DatasetFile = None
320+ error_message = None
321+ # Extract data from message
322+ data = base64 .b64decode (cloud_event .data ["message" ]["data" ]).decode ()
323+ json_payload = json .loads (data )
324+ logging .info (
325+ f"[{ json_payload ['feed_stable_id' ]} ] JSON Payload: { json .dumps (json_payload )} "
326+ )
327+ stable_id = json_payload ["feed_stable_id" ]
328+ execution_id = json_payload ["execution_id" ]
329+ trace_service = DatasetTraceService ()
337330
338- if executions > 0 :
339- if executions >= maximum_executions :
340- error_message = f"[{ stable_id } ] Function already executed maximum times in execution: [{ execution_id } ]"
341- logging .error (error_message )
342- return error_message
343-
344- processor = DatasetProcessor (
345- json_payload ["producer_url" ],
346- json_payload ["feed_id" ],
347- stable_id ,
348- execution_id ,
349- json_payload ["dataset_hash" ],
350- bucket_name ,
351- int (json_payload ["authentication_type" ]),
352- json_payload ["api_key_parameter_name" ],
353- public_hosted_datasets_url ,
354- )
355- dataset_file = processor .process ()
331+ trace = trace_service .get_by_execution_and_stable_ids (
332+ execution_id , stable_id
333+ )
334+ logging .info (f"[{ stable_id } ] Dataset trace: { trace } " )
335+ executions = len (trace ) if trace else 0
336+ logging .info (
337+ f"[{ stable_id } ] Dataset executed times={ executions } /{ maximum_executions } "
338+ f"in execution=[{ execution_id } ] "
339+ )
340+
341+ if executions > 0 :
342+ if executions >= maximum_executions :
343+ error_message = f"[{ stable_id } ] Function already executed maximum times in execution: [{ execution_id } ]"
344+ logging .error (error_message )
345+ return error_message
346+
347+ processor = DatasetProcessor (
348+ json_payload ["producer_url" ],
349+ json_payload ["feed_id" ],
350+ stable_id ,
351+ execution_id ,
352+ json_payload ["dataset_hash" ],
353+ bucket_name ,
354+ int (json_payload ["authentication_type" ]),
355+ json_payload ["api_key_parameter_name" ],
356+ public_hosted_datasets_url ,
357+ )
358+ dataset_file = processor .process ()
356359 except Exception as e :
357360 logging .error (e )
358361 error_message = f"[{ stable_id } ] Error execution: [{ execution_id } ] error: [{ e } ]"
0 commit comments