Skip to content

Commit 478671d

Browse files
committed
Add queued mapping state
1 parent e718cb4 commit 478671d

File tree

3 files changed

+13
-2
lines changed

3 files changed

+13
-2
lines changed

alembic/versions/d7e6f8c3b9dc_scoreset_mapping_columns.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def upgrade():
3030
"complete",
3131
"pending_variant_processing",
3232
"not_attempted",
33+
"queued",
3334
name="mappingstate",
3435
native_enum=False,
3536
create_constraint=True,

src/mavedb/models/enums/mapping_state.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ class MappingState(enum.Enum):
88
complete = "complete"
99
pending_variant_processing = "pending_variant_processing"
1010
not_attempted = "not_attempted"
11+
queued = "queued"

src/mavedb/worker/jobs.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ async def create_variants_for_score_set(
212212

213213
await redis.lpush(MAPPING_QUEUE_NAME, score_set_urn) # type: ignore
214214
await redis.enqueue_job("variant_mapper_manager", correlation_id, score_set_urn, updater_id)
215+
score_set.mapping_state = MappingState.queued
215216
finally:
216217
db.add(score_set)
217218
db.commit()
@@ -275,7 +276,6 @@ async def map_variants_for_score_set(
275276

276277
except Exception as e:
277278
db.rollback()
278-
score_set.mapping_state = MappingState.failed
279279
score_set.mapping_errors = {
280280
"error_message": f"Encountered an internal server error during mapping. Mapping will be automatically retried up to 5 times for this score set (attempt {attempt}/5)."
281281
}
@@ -305,6 +305,10 @@ async def map_variants_for_score_set(
305305
logging_context["backoff_job_id"] = new_job_id
306306

307307
except Exception as backoff_e:
308+
score_set.mapping_state = MappingState.failed
309+
score_set.mapping_errors = {"error_message": "Encountered an internal server error during mapping"}
310+
db.add(score_set)
311+
db.commit()
308312
send_slack_message(backoff_e)
309313
logging_context = {**logging_context, **format_raised_exception_info_as_dict(backoff_e)}
310314
logger.critical(
@@ -313,6 +317,9 @@ async def map_variants_for_score_set(
313317
)
314318
else:
315319
if new_job_id and not max_retries_exceeded:
320+
score_set.mapping_state = MappingState.queued
321+
db.add(score_set)
322+
db.commit()
316323
logger.info(
317324
msg="After encountering an error while mapping variants, another mapping job was queued.",
318325
extra=logging_context,
@@ -463,7 +470,6 @@ async def map_variants_for_score_set(
463470

464471
except Exception as e:
465472
db.rollback()
466-
score_set.mapping_state = MappingState.failed
467473
score_set.mapping_errors = {
468474
"error_message": f"Encountered an unexpected error while parsing mapped variants. Mapping will be automatically retried up to 5 times for this score set (attempt {attempt}/5)."
469475
}
@@ -505,6 +511,9 @@ async def map_variants_for_score_set(
505511
)
506512
else:
507513
if new_job_id and not max_retries_exceeded:
514+
score_set.mapping_state = MappingState.queued
515+
db.add(score_set)
516+
db.commit()
508517
logger.info(
509518
msg="After encountering an error while parsing mapped variants, another mapping job was queued.",
510519
extra=logging_context,

0 commit comments

Comments
 (0)