11import os
22import re
3- from typing import Any , Tuple , Union , Dict , List
3+ from sqlalchemy .orm .attributes import flag_modified
4+ from typing import Any , Tuple , Dict , List
45
56import pytz
67import json
1617 embedding ,
1718 labeling_task ,
1819 labeling_task_label ,
20+ record ,
1921 record_label_association ,
2022 general ,
2123 project ,
3537from submodules .model .models import (
3638 InformationSource ,
3739 InformationSourceStatisticsExclusion ,
38- RecordLabelAssociationToken ,
3940 RecordLabelAssociation ,
4041 InformationSourcePayload ,
4142 User ,
4243)
43- from submodules .model .business_objects import record
4444from controller .auth .manager import get_user_by_info
4545from util import daemon , doc_ock , notification
4646from submodules .s3 import controller as s3
@@ -231,6 +231,20 @@ def execution_pipeline(
231231 )
232232 has_error = update_records (payload_item , project_id )
233233 if has_error :
234+ payload_item = information_source .get_payload (project_id , payload_id )
235+ tmp_log_store = payload_item .logs
236+ berlin_now = datetime .now (__tz )
237+ tmp_log_store .append (
238+ " " .join (
239+ [
240+ berlin_now .strftime ("%Y-%m-%dT%H:%M:%S" ),
241+ "If existing, results of previous run are kept." ,
242+ ]
243+ )
244+ )
245+ payload_item .logs = tmp_log_store
246+ flag_modified (payload_item , "logs" )
247+ general .commit ()
234248 raise ValueError (
235249 "update_records resulted in errors -- see log for details"
236250 )
@@ -247,9 +261,10 @@ def execution_pipeline(
247261 project_id ,
248262 f"payload_finished:{ information_source_item .id } :{ payload .id } " ,
249263 )
250- except :
264+ except Exception as e :
251265 general .rollback ()
252- print (traceback .format_exc ())
266+ if not type (e ) == ValueError :
267+ print (traceback .format_exc ())
253268 payload_item .state = enums .PayloadState .FAILED .value
254269 general .commit ()
255270 create_notification (
@@ -383,15 +398,32 @@ def update_records(
383398) -> bool :
384399 org_id = organization .get_id_by_project_id (project_id )
385400 tmp_log_store = information_source_payload .logs
401+ try :
402+ output_data = json .loads (
403+ s3 .get_object (
404+ org_id , str (project_id ) + "/" + str (information_source_payload .id )
405+ )
406+ )
407+ except Exception :
408+ berlin_now = datetime .now (__tz )
409+ tmp_log_store .append (
410+ " " .join (
411+ [
412+ berlin_now .strftime ("%Y-%m-%dT%H:%M:%S" ),
413+ "Code execution exited with errors. Please check the logs." ,
414+ ]
415+ )
416+ )
417+ information_source_payload .logs = tmp_log_store
418+ flag_modified (information_source_payload , "logs" )
419+ general .commit ()
420+ return True
421+
386422 berlin_now = datetime .now (__tz )
387423 tmp_log_store .append (
388424 berlin_now .strftime ("%Y-%m-%dT%H:%M:%S" ) + " Writing results to the database."
389425 )
390- output_data = json .loads (
391- s3 .get_object (
392- org_id , str (project_id ) + "/" + str (information_source_payload .id )
393- )
394- )
426+
395427 information_source : InformationSource = (
396428 information_source_payload .informationSource # backref resolves in camelCase
397429 )
@@ -412,10 +444,17 @@ def update_records(
412444 output_data ,
413445 )
414446 berlin_now = datetime .now (__tz )
415- tmp_log_store .append (
416- berlin_now .strftime ("%Y-%m-%dT%H:%M:%S" ) + " Finished writing."
417- )
447+ if has_errors :
448+ tmp_log_store .append (
449+ berlin_now .strftime ("%Y-%m-%dT%H:%M:%S" )
450+ + " Writing to the database failed."
451+ )
452+ else :
453+ tmp_log_store .append (
454+ berlin_now .strftime ("%Y-%m-%dT%H:%M:%S" ) + " Finished writing."
455+ )
418456 information_source_payload .logs = tmp_log_store
457+ flag_modified (information_source_payload , "logs" )
419458 general .commit ()
420459 return has_errors
421460
0 commit comments