 from mavedb.view_models.contributor import ContributorCreate
 from mavedb.view_models.doi_identifier import DoiIdentifierCreate
 from mavedb.view_models.publication_identifier import PublicationIdentifierCreate
+from mavedb.view_models.score_set_dataset_columns import DatasetColumnMetadata
 from mavedb.view_models.target_gene import TargetGeneCreate
 from sqlalchemy import null, or_, select
 from sqlalchemy.exc import MultipleResultsFound, NoResultFound
@@ -88,17 +89,14 @@ async def enqueue_variant_creation(
     *,
     item: ScoreSet,
     user_data: UserData,
+    new_scores_df: Optional[pd.DataFrame] = None,
+    new_counts_df: Optional[pd.DataFrame] = None,
+    new_score_columns_metadata: Optional[dict[str, DatasetColumnMetadata]] = None,
+    new_count_columns_metadata: Optional[dict[str, DatasetColumnMetadata]] = None,
     worker: ArqRedis,
-) -> None:
+) -> str | None:
     assert item.dataset_columns is not None

-    # score_columns_metadata and count_columns_metadata are the only values of dataset_columns that can be set manually.
-    # The others, scores_columns and count_columns, are calculated based on the uploaded data and should not be changed here.
-    # if item_update.dataset_columns.get("countColumnsMetadata") is not None:
-    #     item.dataset_columns = {**item.dataset_columns, "count_columns_metadata": item_update.dataset_columns["countColumnsMetadata"]}
-    # if item_update.dataset_columns.get("scoreColumnsMetadata") is not None:
-    #     item.dataset_columns = {**item.dataset_columns, "score_columns_metadata": item_update.dataset_columns["scoreColumnsMetadata"]}
-
     score_columns = [
         "hgvs_nt",
         "hgvs_splice",
@@ -110,34 +108,37 @@ async def enqueue_variant_creation(
110108 "hgvs_pro" ,
111109 ] + item .dataset_columns ["count_columns" ]
112110
113- scores_data = pd .DataFrame (
114- variants_to_csv_rows (item .variants , columns = score_columns , dtype = "score_data" )
115- ).replace ("NA" , pd .NA )
116-
117- if item .dataset_columns ["count_columns" ]:
118- count_data = pd .DataFrame (
119- variants_to_csv_rows (item .variants , columns = count_columns , dtype = "count_data" )
111+ # create CSV from existing variants on the score set if no new dataframe provided
112+ existing_scores_df = None
113+ if new_scores_df is None :
114+ existing_scores_df = pd .DataFrame (
115+ variants_to_csv_rows (item .variants , columns = score_columns , dtype = "score_data" )
120116 ).replace ("NA" , pd .NA )
121- else :
122- count_data = None
123117
124- scores_column_metadata = item .dataset_columns .get ("scores_column_metadata" )
125- counts_column_metadata = item .dataset_columns .get ("counts_column_metadata" )
118+ # create CSV from existing variants on the score set if no new dataframe provided
119+ existing_counts_df = None
120+ if new_counts_df is None :
121+ if item .dataset_columns .get ("count_columns" ):
122+ existing_counts_df = pd .DataFrame (
123+ variants_to_csv_rows (item .variants , columns = count_columns , dtype = "count_data" )
124+ ).replace ("NA" , pd .NA )
126125
127- # await the insertion of this job into the worker queue, not the job itself.
126+ # Await the insertion of this job into the worker queue, not the job itself.
127+ # Uses provided score and counts dataframes and metadata files, or falls back to existing data on the score set if not provided.
128128 job = await worker .enqueue_job (
129129 "create_variants_for_score_set" ,
130130 correlation_id_for_context (),
131131 item .id ,
132132 user_data .user .id ,
133- scores_data ,
134- count_data ,
135- scores_column_metadata ,
136- counts_column_metadata ,
133+ existing_scores_df if new_scores_df is None else new_scores_df ,
134+ existing_counts_df if new_counts_df is None else new_counts_df ,
135+ item . dataset_columns . get ( "score_columns_metadata" ) if new_score_columns_metadata is None else new_score_columns_metadata ,
136+ item . dataset_columns . get ( "count_columns_metadata" ) if new_count_columns_metadata is None else new_count_columns_metadata ,
137137 )
138138 if job is not None :
139- save_to_logging_context ({"worker_job_id" : job .job_id })
140- logger .info (msg = "Enqueud variant creation job." , extra = logging_context ())
139+ return job .job_id
140+ else :
141+ return None
141142
142143class ScoreSetUpdateResult (TypedDict ):
143144 item : ScoreSet
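Editor's note on the new `str | None` return: arq's `ArqRedis.enqueue_job` is typed `Optional[Job]` and returns `None` when it refuses a job, most commonly a reused `_job_id` whose job is still queued or running. The `if job is not None` branch above is therefore what decides between returning a job id and returning `None`. A minimal sketch of that arq behavior, assuming a reachable local Redis; the job id below is hypothetical:

```python
# Sketch, not project code: demonstrate arq's Optional[Job] return.
import asyncio

from arq import create_pool
from arq.connections import RedisSettings

async def main() -> None:
    redis = await create_pool(RedisSettings())  # assumes Redis on localhost
    first = await redis.enqueue_job("create_variants_for_score_set", _job_id="demo-job")
    duplicate = await redis.enqueue_job("create_variants_for_score_set", _job_id="demo-job")
    assert first is not None and first.job_id == "demo-job"
    assert duplicate is None  # refused: a job with this id is still pending

asyncio.run(main())
```

Since the call site above passes no `_job_id`, arq generates a fresh id per call, so the `None` branch is largely defensive.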
@@ -252,6 +253,7 @@ async def score_set_update(
     item.score_ranges = item_update_dict.get("score_ranges", null())

     if "target_genes" in item_update_dict:
+        # stash existing target gene ids to compare after update, to determine if variants need to be re-created
         assert all(tg.id is not None for tg in item.target_genes)
         existing_target_ids: list[int] = [tg.id for tg in item.target_genes if tg.id is not None]

@@ -371,6 +373,59 @@ async def score_set_update(
     save_to_logging_context({"updated_resource": item.urn})
     return {"item": item, "should_create_variants": should_create_variants}

+class ParseScoreSetUpdate(TypedDict):
+    scores_df: Optional[pd.DataFrame]
+    counts_df: Optional[pd.DataFrame]
+    score_columns_metadata: Optional[dict[str, DatasetColumnMetadata]]
+    count_columns_metadata: Optional[dict[str, DatasetColumnMetadata]]
+
+async def parse_score_set_variants_uploads(
+    scores_file: Optional[UploadFile] = File(None),
+    counts_file: Optional[UploadFile] = File(None),
+    score_columns_metadata_file: Optional[UploadFile] = File(None),
+    count_columns_metadata_file: Optional[UploadFile] = File(None),
+) -> ParseScoreSetUpdate:
+    if scores_file and scores_file.file:
+        try:
+            scores_df = csv_data_to_df(scores_file.file)
+        # Handle non-utf8 file problem.
+        except UnicodeDecodeError as e:
+            raise HTTPException(status_code=400, detail=f"Error decoding file: {e}. Ensure the file has correct values.")
+    else:
+        scores_df = None
+
+    if counts_file and counts_file.file:
+        try:
+            counts_df = csv_data_to_df(counts_file.file)
+        # Handle non-utf8 file problem.
+        except UnicodeDecodeError as e:
+            raise HTTPException(status_code=400, detail=f"Error decoding file: {e}. Ensure the file has correct values.")
+    else:
+        counts_df = None
+
+    if score_columns_metadata_file and score_columns_metadata_file.file:
+        try:
+            score_columns_metadata = json.load(score_columns_metadata_file.file)
+        except json.JSONDecodeError as e:
+            raise HTTPException(status_code=400, detail=f"Error decoding scores metadata file: {e}. Ensure the file is valid JSON.")
+    else:
+        score_columns_metadata = None
+
+    if count_columns_metadata_file and count_columns_metadata_file.file:
+        try:
+            count_columns_metadata = json.load(count_columns_metadata_file.file)
+        except json.JSONDecodeError as e:
+            raise HTTPException(status_code=400, detail=f"Error decoding counts metadata file: {e}. Ensure the file is valid JSON.")
+    else:
+        count_columns_metadata = None
+
+    return {
+        "scores_df": scores_df,
+        "counts_df": counts_df,
+        "score_columns_metadata": score_columns_metadata,
+        "count_columns_metadata": count_columns_metadata,
+    }
+
 async def fetch_score_set_by_urn(
     db, urn: str, user: Optional[UserData], owner_or_contributor: Optional[UserData], only_published: bool
 ) -> ScoreSet:
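Because `parse_score_set_variants_uploads` declares its parameters with `File(None)`, it works both awaited directly (as the upload route below does) and as a FastAPI dependency, letting the framework bind the four optional multipart fields. A sketch of the dependency style, assuming this module's existing `router`; the route path and handler are illustrative, not part of this change:

```python
from fastapi import Depends

@router.post("/score-sets/{urn}/variants/preview")  # hypothetical route
async def preview_variant_uploads(
    urn: str,
    parsed: ParseScoreSetUpdate = Depends(parse_score_set_variants_uploads),
) -> dict[str, bool]:
    # Report which of the four optional uploads were actually provided;
    # None means "fall back to data already stored on the score set".
    return {key: value is not None for key, value in parsed.items()}
```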
@@ -1261,88 +1316,47 @@ async def upload_score_set_variant_data(
     assert_permission(user_data, item, Action.UPDATE)
     assert_permission(user_data, item, Action.SET_SCORES)

-    # get existing column metadata for scores if no new file is provided
-    if score_columns_metadata_file and score_columns_metadata_file.file:
-        try:
-            scores_column_metadata = json.load(score_columns_metadata_file.file)
-        except json.JSONDecodeError as e:
-            raise HTTPException(status_code=400, detail=f"Error decoding scores metadata file: {e}. Ensure the file is valid JSON.")
-    else:
-        scores_column_metadata = item.dataset_columns.get("scores_column_metadata") if item.dataset_columns else None
+    score_set_variants_data = await parse_score_set_variants_uploads(
+        scores_file,
+        counts_file,
+        score_columns_metadata_file,
+        count_columns_metadata_file,
+    )

-    # get existing column metadata for counts if no new file is provided
-    if count_columns_metadata_file and count_columns_metadata_file.file:
-        try:
-            counts_column_metadata = json.load(count_columns_metadata_file.file)
-        except json.JSONDecodeError as e:
-            raise HTTPException(status_code=400, detail=f"Error decoding counts metadata file: {e}. Ensure the file is valid JSON.")
-    else:
-        counts_column_metadata = item.dataset_columns.get("counts_column_metadata") if item.dataset_columns else None
+    for key, val in score_set_variants_data.items():
+        logger.info(msg=f"{key}: {val}", extra=logging_context())
+
+    # Although this is also updated within the variant creation job, update it here
+    # as well so that we can display the proper UI components (queue invocation delay
+    # races the score set GET request).
+    item.processing_state = ProcessingState.processing
+
+    logger.info(msg="Enqueuing variant creation job.", extra=logging_context())
+    jobId = await enqueue_variant_creation(
+        item=item,
+        user_data=user_data,
+        new_scores_df=score_set_variants_data["scores_df"],
+        new_counts_df=score_set_variants_data["counts_df"],
+        new_score_columns_metadata=score_set_variants_data["score_columns_metadata"],
+        new_count_columns_metadata=score_set_variants_data["count_columns_metadata"],
+        worker=worker
+    )


-    if scores_file and scores_file.file:
-        try:
-            scores_df = csv_data_to_df(scores_file.file)
-            counts_df = None
-            if counts_file and counts_file.filename:
-                counts_df = csv_data_to_df(counts_file.file)
-        # Handle non-utf8 file problem.
-        except UnicodeDecodeError as e:
-            raise HTTPException(status_code=400, detail=f"Error decoding file: {e}. Ensure the file has correct values.")
-    elif item.variants:
-        assert item.dataset_columns is not None
-        score_columns = [
-            "hgvs_nt",
-            "hgvs_splice",
-            "hgvs_pro",
-        ] + item.dataset_columns["score_columns"]
-        count_columns = [
-            "hgvs_nt",
-            "hgvs_splice",
-            "hgvs_pro",
-        ] + item.dataset_columns["count_columns"]
-
-        scores_df = pd.DataFrame(
-            variants_to_csv_rows(item.variants, columns=score_columns, dtype="score_data")
-        ).replace("NA", pd.NA)
-
-        if item.dataset_columns["count_columns"]:
-            counts_df = pd.DataFrame(
-                variants_to_csv_rows(item.variants, columns=count_columns, dtype="count_data")
-            ).replace("NA", pd.NA)
-        else:
-            counts_df = None
+    if jobId is None:
+        item.processing_state = ProcessingState.failed
+        logger.warning(msg="Failed to enqueue variant creation job.", extra=logging_context())
     else:
-        scores_df = pd.DataFrame()
-
-    if not scores_df.empty:
-        # Although this is also updated within the variant creation job, update it here
-        # as well so that we can display the proper UI components (queue invocation delay
-        # races the score set GET request).
-        item.processing_state = ProcessingState.processing
-
-        # await the insertion of this job into the worker queue, not the job itself.
-        job = await worker.enqueue_job(
-            "create_variants_for_score_set",
-            correlation_id_for_context(),
-            item.id,
-            user_data.user.id,
-            scores_df,
-            counts_df,
-            scores_column_metadata,
-            counts_column_metadata,
-        )
-        if job is not None:
-            save_to_logging_context({"worker_job_id": job.job_id})
-            logger.info(msg="Enqueud variant creation job.", extra=logging_context())
+        save_to_logging_context({"worker_job_id": jobId})
+        logger.info(msg="Enqueued variant creation job.", extra=logging_context())

     db.add(item)
     db.commit()
     db.refresh(item)
+
     enriched_experiment = enrich_experiment_with_num_score_sets(item.experiment, user_data)
     return score_set.ScoreSet.model_validate(item).copy(update={"experiment": enriched_experiment})

-
 @router.post(
     "/score-sets/{urn}/ranges/data",
     response_model=score_set.ScoreSet,
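For reviewers, a client-side sketch of exercising the reworked upload handler. Only the multipart field names come from the handler's parameters; the URL prefix and auth header are assumptions:

```python
import httpx

def upload_variant_data(base_url: str, urn: str, api_key: str) -> dict:
    # Field names mirror the handler parameters: scores_file, counts_file,
    # plus the two optional *_columns_metadata_file JSON uploads.
    with open("scores.csv", "rb") as scores, open("counts.csv", "rb") as counts:
        response = httpx.post(
            f"{base_url}/api/v1/score-sets/{urn}/variants/data",  # assumed path
            files={
                "scores_file": ("scores.csv", scores, "text/csv"),
                "counts_file": ("counts.csv", counts, "text/csv"),
            },
            headers={"X-API-Key": api_key},  # assumed auth scheme
        )
    response.raise_for_status()
    return response.json()  # processing_state should now read "processing"
```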
@@ -1419,7 +1433,7 @@ async def update_score_set_with_variants(

     itemUpdateResult = await score_set_update(db=db, urn=urn, item_update=item_update, exclude_unset=True, user_data=user_data)
     updatedItem = itemUpdateResult["item"]
-    # should_create_variants = itemUpdateResult["should_create_variants"]
+    should_create_variants = itemUpdateResult["should_create_variants"]

     # TODO handle uploaded files

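One possible shape for resolving the `# TODO handle uploaded files` above by reusing the helpers introduced in this commit; a sketch only, assuming this route receives the same optional upload parameters as the upload endpoint:

```python
async def handle_uploaded_files_todo(  # hypothetical helper, not in this diff
    updatedItem, user_data, worker, should_create_variants,
    scores_file=None, counts_file=None,
    score_columns_metadata_file=None, count_columns_metadata_file=None,
):
    # Parse any uploads, then enqueue with them; enqueue_variant_creation
    # falls back to the stored variants when nothing new was provided.
    parsed = await parse_score_set_variants_uploads(
        scores_file, counts_file, score_columns_metadata_file, count_columns_metadata_file
    )
    if should_create_variants or parsed["scores_df"] is not None:
        return await enqueue_variant_creation(
            item=updatedItem,
            user_data=user_data,
            new_scores_df=parsed["scores_df"],
            new_counts_df=parsed["counts_df"],
            new_score_columns_metadata=parsed["score_columns_metadata"],
            new_count_columns_metadata=parsed["count_columns_metadata"],
            worker=worker,
        )
    return None
```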
@@ -1454,8 +1468,15 @@ async def update_score_set(
     # races the score set GET request).
     updatedItem.processing_state = ProcessingState.processing

-    await enqueue_variant_creation(item=updatedItem, user_data=user_data, worker=worker)
+    logger.info(msg="Enqueuing variant creation job.", extra=logging_context())
+    jobId = await enqueue_variant_creation(item=updatedItem, user_data=user_data, worker=worker)

+    if jobId is None:
+        updatedItem.processing_state = ProcessingState.failed
+        logger.warning(msg="Failed to enqueue variant creation job.", extra=logging_context())
+    else:
+        save_to_logging_context({"worker_job_id": jobId})
+        logger.info(msg="Enqueued variant creation job.", extra=logging_context())
     db.add(updatedItem)
     db.commit()
     db.refresh(updatedItem)
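Finally, a self-contained sketch of the failure path both routes now share: when enqueueing yields no job id, the score set is flipped to `failed` rather than left stuck in `processing`. The worker mock and the `SimpleNamespace` stand-in for a score set are assumptions for illustration:

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock

async def main() -> None:
    worker = AsyncMock()
    worker.enqueue_job.return_value = None  # model a refused enqueue

    item = SimpleNamespace(processing_state="processing")
    job = await worker.enqueue_job("create_variants_for_score_set")
    job_id = job.job_id if job is not None else None

    if job_id is None:
        item.processing_state = "failed"  # mirrors the routes' rollback

    assert item.processing_state == "failed"

asyncio.run(main())
```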