1515from sqlalchemy .dialects .postgresql import JSONB
1616from sqlalchemy .orm import Session
1717
18- from mavedb .lib .exceptions import NonexistentMappingReferenceError , NonexistentMappingResultsError
18+ from mavedb .lib .exceptions import NonexistentMappingReferenceError , NonexistentMappingResultsError , MappingEnqueueError
1919from mavedb .lib .score_sets import (
2020 columns_for_dataset ,
2121 create_variants ,
@@ -199,6 +199,7 @@ async def create_variants_for_score_set(
199199 msg = "Encountered an unhandled exception while creating variants for score set." , extra = logging_context
200200 )
201201
202+ # Don't raise BaseExceptions so we may emit canonical logs (TODO: Perhaps they are so problematic we want to raise them anyway).
202203 return {"success" : False }
203204
204205 else :
@@ -226,6 +227,7 @@ async def map_variants_for_score_set(
226227) -> dict :
227228 async with mapping_in_execution (redis = ctx ["redis" ], job_id = ctx ["job_id" ]):
228229 logging_context = {}
230+ score_set = None
229231 try :
230232 db : Session = ctx ["db" ]
231233 redis : ArqRedis = ctx ["redis" ]
@@ -234,19 +236,6 @@ async def map_variants_for_score_set(
234236 logging_context ["attempt" ] = attempt
235237 logger .info (msg = "Started variant mapping" , extra = logging_context )
236238
237- except Exception as e :
238- # NOTE: can't update mapping state here because setup is necessary to update the db
239- send_slack_message (e )
240- logging_context = {** logging_context , ** format_raised_exception_info_as_dict (e )}
241- logger .error (
242- msg = "Variant mapper encountered an unexpected error during setup. This job will not be retried." ,
243- extra = logging_context ,
244- )
245-
246- return {"success" : False , "retried" : False }
247-
248- score_set = None
249- try :
250239 score_set = db .scalars (select (ScoreSet ).where (ScoreSet .urn == score_set_urn )).one ()
251240 score_set .mapping_state = MappingState .processing
252241 score_set .mapping_errors = null ()
@@ -257,41 +246,26 @@ async def map_variants_for_score_set(
257246 logging_context ["mapping_state" ] = score_set .mapping_state
258247 logger .debug (msg = "Fetched score set metadata for mapping job." , extra = logging_context )
259248
260- except Exception as e :
261- db .rollback ()
262- if score_set :
263- score_set .mapping_state = MappingState .failed
264- score_set .mapping_errors = {"error_message" : "Encountered an internal server error during mapping" }
265- db .add (score_set )
266- db .commit ()
267- send_slack_message (e )
268- logging_context = {** logging_context , ** format_raised_exception_info_as_dict (e )}
269- logger .error (
270- msg = "Variant mapper encountered an unexpected error while fetching score set metadata. This job will not be retried." ,
271- extra = logging_context ,
272- )
273-
274- return {"success" : False , "retried" : False }
275-
276- try :
277249 # Do not block Worker event loop during mapping, see: https://arq-docs.helpmanual.io/#synchronous-jobs.
278250 vrs = vrs_mapper ()
279251 blocking = functools .partial (vrs .map_score_set , score_set_urn )
280252 loop = asyncio .get_running_loop ()
281253
282254 except Exception as e :
283- db .rollback ()
284- score_set .mapping_state = MappingState .failed
285- score_set .mapping_errors = {"error_message" : "Encountered an internal server error during mapping" }
286- db .add (score_set )
287- db .commit ()
288255 send_slack_message (e )
289256 logging_context = {** logging_context , ** format_raised_exception_info_as_dict (e )}
290257 logger .error (
291- msg = "Variant mapper encountered an unexpected error while preparing the mapping event loop . This job will not be retried." ,
258+ msg = "Variant mapper encountered an unexpected error during setup . This job will not be retried." ,
292259 extra = logging_context ,
293260 )
294261
262+ db .rollback ()
263+ if score_set :
264+ score_set .mapping_state = MappingState .failed
265+ score_set .mapping_errors = {"error_message" : "Encountered an internal server error during mapping" }
266+ db .add (score_set )
267+ db .commit ()
268+
295269 return {"success" : False , "retried" : False }
296270
297271 mapping_results = None
@@ -307,9 +281,10 @@ async def map_variants_for_score_set(
307281 }
308282 db .add (score_set )
309283 db .commit ()
284+
310285 send_slack_message (e )
311286 logging_context = {** logging_context , ** format_raised_exception_info_as_dict (e )}
312- logger .warn (
287+ logger .warning (
313288 msg = "Variant mapper encountered an unexpected error while mapping variants. This job will be retried." ,
314289 extra = logging_context ,
315290 )
@@ -500,7 +475,7 @@ async def map_variants_for_score_set(
500475
501476 send_slack_message (e )
502477 logging_context = {** logging_context , ** format_raised_exception_info_as_dict (e )}
503- logger .warn (
478+ logger .warning (
504479 msg = "An unexpected error occurred during variant mapping. This job will be attempted again." ,
505480 extra = logging_context ,
506481 )
@@ -566,6 +541,8 @@ async def variant_mapper_manager(
566541 ctx : dict , correlation_id : str , score_set_urn : str , updater_id : int , attempt : int = 0
567542) -> dict :
568543 logging_context = {}
544+ mapping_job_id = None
545+ mapping_job_status = None
569546 try :
570547 redis : ArqRedis = ctx ["redis" ]
571548 db : Session = ctx ["db" ]
@@ -574,21 +551,12 @@ async def variant_mapper_manager(
574551 logging_context ["attempt" ] = attempt
575552 logger .debug (msg = "Variant mapping manager began execution" , extra = logging_context )
576553
577- except Exception as e :
578- send_slack_message (e )
579- logging_context = {** logging_context , ** format_raised_exception_info_as_dict (e )}
580- logger .error (msg = "Variant mapper manager encountered an unexpected error during setup." , extra = logging_context )
581- return {"success" : False , "enqueued_job" : None }
582-
583- mapping_job_id = None
584- mapping_job_status = None
585- try :
586554 queued_urn = await redis .rpop (MAPPING_QUEUE_NAME ) # type: ignore
587555 queue_length = await redis .llen (MAPPING_QUEUE_NAME ) # type: ignore
588556 logging_context ["variant_mapping_queue_length" ] = queue_length
589557
590558 # Setup the job id cache if it does not already exist.
591- if await redis .exists (MAPPING_CURRENT_ID_NAME ):
559+ if not await redis .exists (MAPPING_CURRENT_ID_NAME ):
592560 await redis .set (MAPPING_CURRENT_ID_NAME , "" )
593561
594562 if not queued_urn :
@@ -599,7 +567,6 @@ async def variant_mapper_manager(
599567 logging_context ["current_mapping_resource" ] = queued_urn
600568 logger .debug (msg = "Found mapping job(s) still in queue." , extra = logging_context )
601569
602- mapping_job_status = None
603570 mapping_job_id = await redis .get (MAPPING_CURRENT_ID_NAME )
604571 if mapping_job_id :
605572 mapping_job_id = mapping_job_id .decode ("utf-8" )
@@ -611,10 +578,7 @@ async def variant_mapper_manager(
611578 except Exception as e :
612579 send_slack_message (e )
613580 logging_context = {** logging_context , ** format_raised_exception_info_as_dict (e )}
614- logger .error (
615- msg = "Variant mapper manager encountered an unexpected error while fetching the executing mapping job." ,
616- extra = logging_context ,
617- )
581+ logger .error (msg = "Variant mapper manager encountered an unexpected error during setup." , extra = logging_context )
618582 return {"success" : False , "enqueued_job" : None }
619583
620584 new_job = None
@@ -662,25 +626,22 @@ async def variant_mapper_manager(
662626 # before the deferred time, these deferred jobs will still run once able.
663627 return {"success" : True , "enqueued_job" : new_job_id }
664628
665- logger .warn (
666- msg = "Unable to queue a new mapping job or defer mapping. This score set will not be mapped." ,
629+ raise MappingEnqueueError ()
630+
631+ except Exception as e :
632+ send_slack_message (e )
633+ logging_context = {** logging_context , ** format_raised_exception_info_as_dict (e )}
634+ logger .error (
635+ msg = "Variant mapper manager encountered an unexpected error while enqueing a mapping job. This job will not be retried." ,
667636 extra = logging_context ,
668637 )
669- # TODO: If we end up here, we were unable to enqueue a new mapping job or a new manager job despite expecting we should have
670- # been able to do so. We should raise some sort of exception.
638+
639+ db . rollback ()
671640 score_set = db .scalars (select (ScoreSet ).where (ScoreSet .urn == score_set_urn )).one_or_none ()
672641 if score_set :
673642 score_set .mapping_state = MappingState .failed
674643 score_set .mapping_errors = "Unable to queue a new mapping job or defer score set mapping."
675644 db .add (score_set )
645+ db .commit ()
676646
677647 return {"success" : False , "enqueued_job" : new_job_id }
678-
679- except Exception as e :
680- send_slack_message (e )
681- logging_context = {** logging_context , ** format_raised_exception_info_as_dict (e )}
682- logger .error (
683- msg = "Variant mapper manager encountered an unexpected error while enqueing a mapping job." ,
684- extra = logging_context ,
685- )
686- return {"success" : False , "enqueued_job" : new_job_id }
0 commit comments