15 | 15 | from mavedb.view_models.contributor import ContributorCreate |
16 | 16 | from mavedb.view_models.doi_identifier import DoiIdentifierCreate |
17 | 17 | from mavedb.view_models.publication_identifier import PublicationIdentifierCreate |
| 18 | +from mavedb.view_models.score_set_dataset_columns import DatasetColumnMetadata |
18 | 19 | from mavedb.view_models.target_gene import TargetGeneCreate |
19 | 20 | from sqlalchemy import null, or_, select |
20 | 21 | from sqlalchemy.exc import MultipleResultsFound, NoResultFound |
@@ -88,56 +89,54 @@ async def enqueue_variant_creation( |
88 | 89 | *, |
89 | 90 | item: ScoreSet, |
90 | 91 | user_data: UserData, |
| 92 | + new_scores_df: Optional[pd.DataFrame] = None, |
| 93 | + new_counts_df: Optional[pd.DataFrame] = None, |
| 94 | + new_score_columns_metadata: Optional[dict[str, DatasetColumnMetadata]] = None, |
| 95 | + new_count_columns_metadata: Optional[dict[str, DatasetColumnMetadata]] = None, |
91 | 96 | worker: ArqRedis, |
92 | | -) -> None: |
| 97 | +) -> str | None: |
93 | 98 | assert item.dataset_columns is not None |
94 | 99 |
|
95 | | - # score_columns_metadata and count_columns_metadata are the only values of dataset_columns that can be set manually. |
96 | | - # The others, scores_columns and count_columns, are calculated based on the uploaded data and should not be changed here. |
97 | | - # if item_update.dataset_columns.get("countColumnsMetadata") is not None: |
98 | | - # item.dataset_columns= {**item.dataset_columns, "count_columns_metadata": item_update.dataset_columns["countColumnsMetadata"]} |
99 | | - # if item_update.dataset_columns.get("scoreColumnsMetadata") is not None: |
100 | | - # item.dataset_columns = {**item.dataset_columns, "score_columns_metadata": item_update.dataset_columns["scoreColumnsMetadata"]} |
101 | | - |
102 | | - score_columns = [ |
103 | | - "hgvs_nt", |
104 | | - "hgvs_splice", |
105 | | - "hgvs_pro", |
106 | | - ] + item.dataset_columns["score_columns"] |
107 | | - count_columns = [ |
108 | | - "hgvs_nt", |
109 | | - "hgvs_splice", |
110 | | - "hgvs_pro", |
111 | | - ] + item.dataset_columns["count_columns"] |
112 | | - |
113 | | - scores_data = pd.DataFrame( |
114 | | - variants_to_csv_rows(item.variants, columns=score_columns, dtype="score_data") |
115 | | - ).replace("NA", pd.NA) |
116 | | - |
117 | | - if item.dataset_columns["count_columns"]: |
118 | | - count_data = pd.DataFrame( |
119 | | - variants_to_csv_rows(item.variants, columns=count_columns, dtype="count_data") |
|     | 100 | +    # build a scores DataFrame from the existing variants on the score set if no new scores dataframe was provided |
| 101 | + existing_scores_df = None |
| 102 | + if new_scores_df is None: |
| 103 | + score_columns = [ |
| 104 | + "hgvs_nt", |
| 105 | + "hgvs_splice", |
| 106 | + "hgvs_pro", |
| 107 | + ] + item.dataset_columns.get("score_columns", []) |
| 108 | + existing_scores_df = pd.DataFrame( |
| 109 | + variants_to_csv_rows(item.variants, columns=score_columns, dtype="score_data") |
120 | 110 | ).replace("NA", pd.NA) |
121 | | - else: |
122 | | - count_data = None |
123 | 111 |
|
124 | | - scores_column_metadata = item.dataset_columns.get("scores_column_metadata") |
125 | | - counts_column_metadata = item.dataset_columns.get("counts_column_metadata") |
|     | 112 | +    # build a counts DataFrame from the existing variants on the score set if no new counts dataframe was provided |
| 113 | + existing_counts_df = None |
| 114 | + if new_counts_df is None and item.dataset_columns.get("count_columns") is not None: |
| 115 | + count_columns = [ |
| 116 | + "hgvs_nt", |
| 117 | + "hgvs_splice", |
| 118 | + "hgvs_pro", |
| 119 | + ] + item.dataset_columns["count_columns"] |
| 120 | + existing_counts_df = pd.DataFrame( |
| 121 | + variants_to_csv_rows(item.variants, columns=count_columns, dtype="count_data") |
| 122 | + ).replace("NA", pd.NA) |
126 | 123 |
|
127 | | - # await the insertion of this job into the worker queue, not the job itself. |
| 124 | + # Await the insertion of this job into the worker queue, not the job itself. |
|     | 125 | +    # Uses the provided scores and counts dataframes and column metadata, falling back to the data already stored on the score set when they are not provided. |
128 | 126 | job = await worker.enqueue_job( |
129 | 127 | "create_variants_for_score_set", |
130 | 128 | correlation_id_for_context(), |
131 | 129 | item.id, |
132 | 130 | user_data.user.id, |
133 | | - scores_data, |
134 | | - count_data, |
135 | | - scores_column_metadata, |
136 | | - counts_column_metadata, |
| 131 | + existing_scores_df if new_scores_df is None else new_scores_df, |
| 132 | + existing_counts_df if new_counts_df is None else new_counts_df, |
| 133 | + item.dataset_columns.get("score_columns_metadata") if new_score_columns_metadata is None else new_score_columns_metadata, |
| 134 | + item.dataset_columns.get("count_columns_metadata") if new_count_columns_metadata is None else new_count_columns_metadata, |
137 | 135 | ) |
138 | 136 | if job is not None: |
139 | | - save_to_logging_context({"worker_job_id": job.job_id}) |
140 | | - logger.info(msg="Enqueud variant creation job.", extra=logging_context()) |
| 137 | + return job.job_id |
| 138 | + else: |
| 139 | + return None |
141 | 140 |
|
142 | 141 | class ScoreSetUpdateResult(TypedDict): |
143 | 142 | item: ScoreSet |
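
With enqueue_variant_creation now returning the worker job id (or None when enqueueing fails), each caller records the id and updates the processing state itself. A minimal sketch of that pattern, mirroring the call sites later in this diff (only the names shown in the diff are real; the surrounding context is illustrative):

    jobId = await enqueue_variant_creation(item=item, user_data=user_data, worker=worker)
    if jobId is None:
        item.processing_state = ProcessingState.failed
        logger.warning(msg="Failed to enqueue variant creation job.", extra=logging_context())
    else:
        save_to_logging_context({"worker_job_id": jobId})
        logger.info(msg="Enqueued variant creation job.", extra=logging_context())
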
@@ -252,6 +251,7 @@ async def score_set_update( |
252 | 251 | item.score_ranges = item_update_dict.get("score_ranges", null()) |
253 | 252 |
|
254 | 253 | if "target_genes" in item_update_dict: |
| 254 | + # stash existing target gene ids to compare after update, to determine if variants need to be re-created |
255 | 255 | assert all(tg.id is not None for tg in item.target_genes) |
256 | 256 | existing_target_ids: list[int] = [tg.id for tg in item.target_genes if tg.id is not None] |
257 | 257 |
|
@@ -371,6 +371,59 @@ async def score_set_update( |
371 | 371 | save_to_logging_context({"updated_resource": item.urn}) |
372 | 372 | return {"item": item, "should_create_variants": should_create_variants} |
373 | 373 |
|
| 374 | +class ParseScoreSetUpdate(TypedDict): |
| 375 | + scores_df: Optional[pd.DataFrame] |
| 376 | + counts_df: Optional[pd.DataFrame] |
| 377 | + score_columns_metadata: Optional[dict[str, DatasetColumnMetadata]] |
| 378 | + count_columns_metadata: Optional[dict[str, DatasetColumnMetadata]] |
| 379 | + |
| 380 | +async def parse_score_set_variants_uploads( |
| 381 | + scores_file: Optional[UploadFile] = File(None), |
| 382 | + counts_file: Optional[UploadFile] = File(None), |
| 383 | + score_columns_metadata_file: Optional[UploadFile] = File(None), |
| 384 | + count_columns_metadata_file: Optional[UploadFile] = File(None), |
| 385 | +) -> ParseScoreSetUpdate: |
| 386 | + if scores_file and scores_file.file: |
| 387 | + try: |
| 388 | + scores_df = csv_data_to_df(scores_file.file) |
| 389 | + # Handle non-utf8 file problem. |
| 390 | + except UnicodeDecodeError as e: |
| 391 | + raise HTTPException(status_code=400, detail=f"Error decoding file: {e}. Ensure the file has correct values.") |
| 392 | + else: |
| 393 | + scores_df = None |
| 394 | + |
| 395 | + if counts_file and counts_file.file: |
| 396 | + try: |
| 397 | + counts_df = csv_data_to_df(counts_file.file) |
| 398 | + # Handle non-utf8 file problem. |
| 399 | + except UnicodeDecodeError as e: |
| 400 | + raise HTTPException(status_code=400, detail=f"Error decoding file: {e}. Ensure the file has correct values.") |
| 401 | + else: |
| 402 | + counts_df = None |
| 403 | + |
| 404 | + if score_columns_metadata_file and score_columns_metadata_file.file: |
| 405 | + try: |
| 406 | + score_columns_metadata = json.load(score_columns_metadata_file.file) |
| 407 | + except json.JSONDecodeError as e: |
| 408 | + raise HTTPException(status_code=400, detail=f"Error decoding scores metadata file: {e}. Ensure the file is valid JSON.") |
| 409 | + else: |
| 410 | + score_columns_metadata = None |
| 411 | + |
| 412 | + if count_columns_metadata_file and count_columns_metadata_file.file: |
| 413 | + try: |
| 414 | + count_columns_metadata = json.load(count_columns_metadata_file.file) |
| 415 | + except json.JSONDecodeError as e: |
| 416 | + raise HTTPException(status_code=400, detail=f"Error decoding counts metadata file: {e}. Ensure the file is valid JSON.") |
| 417 | + else: |
| 418 | + count_columns_metadata = None |
| 419 | + |
| 420 | + return { |
| 421 | + "scores_df": scores_df, |
| 422 | + "counts_df": counts_df, |
| 423 | + "score_columns_metadata": score_columns_metadata, |
| 424 | + "count_columns_metadata": count_columns_metadata, |
| 425 | + } |
| 426 | + |
374 | 427 | async def fetch_score_set_by_urn( |
375 | 428 | db, urn: str, user: Optional[UserData], owner_or_contributor: Optional[UserData], only_published: bool |
376 | 429 | ) -> ScoreSet: |
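
The new parse_score_set_variants_uploads helper centralizes decoding of the four optional upload files and returns a ParseScoreSetUpdate dict whose entries are None for any file that was not sent. A minimal sketch of how a route consumes it together with enqueue_variant_creation, following the call sites further down in this diff:

    parsed = await parse_score_set_variants_uploads(
        scores_file,
        counts_file,
        score_columns_metadata_file,
        count_columns_metadata_file,
    )
    # Each value is None unless the matching file was uploaded, so the enqueue
    # helper falls back to the data already stored on the score set.
    jobId = await enqueue_variant_creation(
        item=item,
        user_data=user_data,
        new_scores_df=parsed["scores_df"],
        new_counts_df=parsed["counts_df"],
        new_score_columns_metadata=parsed["score_columns_metadata"],
        new_count_columns_metadata=parsed["count_columns_metadata"],
        worker=worker,
    )
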
@@ -1261,88 +1314,47 @@ async def upload_score_set_variant_data( |
1261 | 1314 | assert_permission(user_data, item, Action.UPDATE) |
1262 | 1315 | assert_permission(user_data, item, Action.SET_SCORES) |
1263 | 1316 |
|
1264 | | - # get existing column metadata for scores if no new file is provided |
1265 | | - if score_columns_metadata_file and score_columns_metadata_file.file: |
1266 | | - try: |
1267 | | - scores_column_metadata = json.load(score_columns_metadata_file.file) |
1268 | | - except json.JSONDecodeError as e: |
1269 | | - raise HTTPException(status_code=400, detail=f"Error decoding scores metadata file: {e}. Ensure the file is valid JSON.") |
1270 | | - else: |
1271 | | - scores_column_metadata = item.dataset_columns.get("scores_column_metadata") if item.dataset_columns else None |
1272 | | - |
1273 | | - # get existing column metadata for counts if no new file is provided |
1274 | | - if count_columns_metadata_file and count_columns_metadata_file.file: |
1275 | | - try: |
1276 | | - counts_column_metadata = json.load(count_columns_metadata_file.file) |
1277 | | - except json.JSONDecodeError as e: |
1278 | | - raise HTTPException(status_code=400, detail=f"Error decoding counts metadata file: {e}. Ensure the file is valid JSON.") |
1279 | | - else: |
1280 | | - counts_column_metadata = item.dataset_columns.get("counts_column_metadata") if item.dataset_columns else None |
1281 | | - |
| 1317 | + score_set_variants_data = await parse_score_set_variants_uploads( |
| 1318 | + scores_file, |
| 1319 | + counts_file, |
| 1320 | + score_columns_metadata_file, |
| 1321 | + count_columns_metadata_file, |
| 1322 | + ) |
1282 | 1323 |
|
1283 | | - if scores_file and scores_file.file: |
1284 | | - try: |
1285 | | - scores_df = csv_data_to_df(scores_file.file) |
1286 | | - counts_df = None |
1287 | | - if counts_file and counts_file.filename: |
1288 | | - counts_df = csv_data_to_df(counts_file.file) |
1289 | | - # Handle non-utf8 file problem. |
1290 | | - except UnicodeDecodeError as e: |
1291 | | - raise HTTPException(status_code=400, detail=f"Error decoding file: {e}. Ensure the file has correct values.") |
1292 | | - elif item.variants: |
1293 | | - assert item.dataset_columns is not None |
1294 | | - score_columns = [ |
1295 | | - "hgvs_nt", |
1296 | | - "hgvs_splice", |
1297 | | - "hgvs_pro", |
1298 | | - ] + item.dataset_columns["score_columns"] |
1299 | | - count_columns = [ |
1300 | | - "hgvs_nt", |
1301 | | - "hgvs_splice", |
1302 | | - "hgvs_pro", |
1303 | | - ] + item.dataset_columns["count_columns"] |
| 1324 | + for key, val in score_set_variants_data.items(): |
| 1325 | + logger.info(msg=f"{key}: {val}", extra=logging_context()) |
| 1326 | + |
| 1327 | + # Although this is also updated within the variant creation job, update it here |
| 1328 | + # as well so that we can display the proper UI components (queue invocation delay |
| 1329 | + # races the score set GET request). |
| 1330 | + item.processing_state = ProcessingState.processing |
| 1331 | + |
| 1332 | + logger.info(msg="Enqueuing variant creation job.", extra=logging_context()) |
| 1333 | + jobId = await enqueue_variant_creation( |
| 1334 | + item=item, |
| 1335 | + user_data=user_data, |
| 1336 | + new_scores_df=score_set_variants_data["scores_df"], |
| 1337 | + new_counts_df=score_set_variants_data["counts_df"], |
| 1338 | + new_score_columns_metadata=score_set_variants_data["score_columns_metadata"], |
| 1339 | + new_count_columns_metadata=score_set_variants_data["count_columns_metadata"], |
| 1340 | + worker=worker |
| 1341 | + ) |
1304 | 1342 |
|
1305 | | - scores_df = pd.DataFrame( |
1306 | | - variants_to_csv_rows(item.variants, columns=score_columns, dtype="score_data") |
1307 | | - ).replace("NA", pd.NA) |
1308 | 1343 |
|
1309 | | - if item.dataset_columns["count_columns"]: |
1310 | | - counts_df = pd.DataFrame( |
1311 | | - variants_to_csv_rows(item.variants, columns=count_columns, dtype="count_data") |
1312 | | - ).replace("NA", pd.NA) |
1313 | | - else: |
1314 | | - counts_df = None |
| 1344 | + if jobId is None: |
| 1345 | + item.processing_state = ProcessingState.failed |
| 1346 | + logger.warning(msg="Failed to enqueue variant creation job.", extra=logging_context()) |
1315 | 1347 | else: |
1316 | | - scores_df = pd.DataFrame() |
1317 | | - |
1318 | | - if not scores_df.empty: |
1319 | | - # Although this is also updated within the variant creation job, update it here |
1320 | | - # as well so that we can display the proper UI components (queue invocation delay |
1321 | | - # races the score set GET request). |
1322 | | - item.processing_state = ProcessingState.processing |
1323 | | - |
1324 | | - # await the insertion of this job into the worker queue, not the job itself. |
1325 | | - job = await worker.enqueue_job( |
1326 | | - "create_variants_for_score_set", |
1327 | | - correlation_id_for_context(), |
1328 | | - item.id, |
1329 | | - user_data.user.id, |
1330 | | - scores_df, |
1331 | | - counts_df, |
1332 | | - scores_column_metadata, |
1333 | | - counts_column_metadata, |
1334 | | - ) |
1335 | | - if job is not None: |
1336 | | - save_to_logging_context({"worker_job_id": job.job_id}) |
1337 | | - logger.info(msg="Enqueud variant creation job.", extra=logging_context()) |
| 1348 | + save_to_logging_context({"worker_job_id": jobId}) |
| 1349 | + logger.info(msg="Enqueued variant creation job.", extra=logging_context()) |
1338 | 1350 |
|
1339 | 1351 | db.add(item) |
1340 | 1352 | db.commit() |
1341 | 1353 | db.refresh(item) |
| 1354 | + |
1342 | 1355 | enriched_experiment = enrich_experiment_with_num_score_sets(item.experiment, user_data) |
1343 | 1356 | return score_set.ScoreSet.model_validate(item).copy(update={"experiment": enriched_experiment}) |
1344 | 1357 |
|
1345 | | - |
1346 | 1358 | @router.post( |
1347 | 1359 | "/score-sets/{urn}/ranges/data", |
1348 | 1360 | response_model=score_set.ScoreSet, |
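
A practical consequence of this rewrite is that a client may upload only the files that changed; anything omitted is rebuilt from the variants already stored on the score set. A hedged sketch using httpx, assuming the multipart field names match the route parameters above and that the endpoint lives at /score-sets/{urn}/variants/data (the path and base URL are assumptions, not shown in this diff):

    import httpx

    API_BASE = "https://api.mavedb.org/api/v1"  # assumed base URL
    urn = "urn:mavedb:00000001-a-1"              # example score set URN

    # Upload only a new counts file; scores and column metadata fall back to
    # the data already on the score set. Authentication is omitted here.
    with open("counts.csv", "rb") as counts:
        response = httpx.post(
            f"{API_BASE}/score-sets/{urn}/variants/data",
            files={"counts_file": ("counts.csv", counts, "text/csv")},
        )
        response.raise_for_status()
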
@@ -1419,7 +1431,7 @@ async def update_score_set_with_variants( |
1419 | 1431 |
|
1420 | 1432 | itemUpdateResult = await score_set_update(db=db, urn=urn, item_update=item_update, exclude_unset=True, user_data=user_data) |
1421 | 1433 | updatedItem = itemUpdateResult["item"] |
1422 | | - # should_create_variants = itemUpdateResult["should_create_variants"] |
| 1434 | + should_create_variants = itemUpdateResult["should_create_variants"] |
1423 | 1435 |
|
1424 | 1436 | # TODO handle uploaded files |
1425 | 1437 |
|
@@ -1454,8 +1466,15 @@ async def update_score_set( |
1454 | 1466 | # races the score set GET request). |
1455 | 1467 | updatedItem.processing_state = ProcessingState.processing |
1456 | 1468 |
|
1457 | | - await enqueue_variant_creation(item=updatedItem, user_data=user_data, worker=worker) |
| 1469 | + logger.info(msg="Enqueuing variant creation job.", extra=logging_context()) |
| 1470 | + jobId = await enqueue_variant_creation(item=updatedItem, user_data=user_data, worker=worker) |
1458 | 1471 |
|
| 1472 | + if jobId is None: |
| 1473 | + updatedItem.processing_state = ProcessingState.failed |
| 1474 | + logger.warning(msg="Failed to enqueue variant creation job.", extra=logging_context()) |
| 1475 | + else: |
| 1476 | + save_to_logging_context({"worker_job_id": jobId}) |
| 1477 | + logger.info(msg="Enqueued variant creation job.", extra=logging_context()) |
1459 | 1478 | db.add(updatedItem) |
1460 | 1479 | db.commit() |
1461 | 1480 | db.refresh(updatedItem) |
|