44import logging
55from operator import attrgetter
66import re
7- from typing import Any , BinaryIO , Iterable , Optional , TYPE_CHECKING , Sequence , Literal
7+ from typing import Any , BinaryIO , Iterable , List , Optional , TYPE_CHECKING , Sequence , Literal
88
99from mavedb .models .mapped_variant import MappedVariant
1010import numpy as np
@@ -501,12 +501,13 @@ def find_publish_or_private_superseded_score_set_tail(
501501def get_score_set_variants_as_csv (
502502 db : Session ,
503503 score_set : ScoreSet ,
504- data_type : Literal ["scores" , "counts" ],
504+ namespaces : List [Literal ["scores" , "counts" ]],
505+ namespaced : Optional [bool ] = None ,
505506 start : Optional [int ] = None ,
506507 limit : Optional [int ] = None ,
507508 drop_na_columns : Optional [bool ] = None ,
508- include_custom_columns : bool = True ,
509- include_post_mapped_hgvs : bool = False ,
509+ include_custom_columns : Optional [ bool ] = True ,
510+ include_post_mapped_hgvs : Optional [ bool ] = False ,
510511) -> str :
511512 """
512513 Get the variant data from a score set as a CSV string.
@@ -517,8 +518,10 @@ def get_score_set_variants_as_csv(
517518 The database session to use.
518519 score_set : ScoreSet
519520 The score set to get the variants from.
520- data_type : {'scores', 'counts'}
521- The type of data to get. Either 'scores' or 'counts'.
521+ namespaces : List[Literal["scores", "counts"]]
522+ The namespaces of data to include. Currently only 'scores' and 'counts' are supported; ClinVar and gnomAD are planned.
523+ namespaced: Optional[bool] = None
524+ Whether to namespace the columns or not.
522525 start : int, optional
523526 The index to start from. If None, starts from the beginning.
524527 limit : int, optional
@@ -537,20 +540,26 @@ def get_score_set_variants_as_csv(
537540 The CSV string containing the variant data.
538541 """
539542 assert type (score_set .dataset_columns ) is dict
540- custom_columns_set = "score_columns" if data_type == "scores" else "count_columns"
541- type_column = "score_data" if data_type == "scores" else "count_data"
542-
543- columns = [ "accession" , "hgvs_nt" , "hgvs_splice" , "hgvs_pro" ]
543+ namespaced_score_set_columns : dict [ str , list [ str ]] = {
544+ "core" : [ "accession" , "hgvs_nt" , "hgvs_splice" , "hgvs_pro" ],
545+ "mavedb" : [],
546+ }
544547 if include_post_mapped_hgvs :
545- columns .append ("post_mapped_hgvs_g" )
546- columns .append ("post_mapped_hgvs_p" )
547-
548+ namespaced_score_set_columns ["mavedb" ].append ("post_mapped_hgvs_g" )
549+ namespaced_score_set_columns ["mavedb" ].append ("post_mapped_hgvs_p" )
550+ for namespace in namespaces :
551+ namespaced_score_set_columns [namespace ] = []
548552 if include_custom_columns :
549- custom_columns = [str (x ) for x in list (score_set .dataset_columns .get (custom_columns_set , []))]
550- columns += custom_columns
551- elif data_type == "scores" :
552- columns .append (REQUIRED_SCORE_COLUMN )
553-
553+ if "scores" in namespaced_score_set_columns :
554+ namespaced_score_set_columns ["scores" ] = [
555+ col for col in [str (x ) for x in list (score_set .dataset_columns .get ("score_columns" , []))]
556+ ]
557+ if "counts" in namespaced_score_set_columns :
558+ namespaced_score_set_columns ["counts" ] = [
559+ col for col in [str (x ) for x in list (score_set .dataset_columns .get ("count_columns" , []))]
560+ ]
561+ elif "scores" in namespaced_score_set_columns :
562+ namespaced_score_set_columns ["scores" ].append (REQUIRED_SCORE_COLUMN )
554563 variants : Sequence [Variant ] = []
555564 mappings : Optional [list [Optional [MappedVariant ]]] = None
556565
@@ -587,13 +596,22 @@ def get_score_set_variants_as_csv(
587596 if limit :
588597 variants_query = variants_query .limit (limit )
589598 variants = db .scalars (variants_query ).all ()
599+ rows_data = variants_to_csv_rows (variants , columns = namespaced_score_set_columns , namespaced = namespaced , mappings = mappings ) # type: ignore
600+ rows_columns = [
601+ (
602+ f"{ namespace } .{ col } "
603+ if (namespaced and namespace not in ["core" , "mavedb" ])
604+ else (f"mavedb.{ col } " if namespaced and namespace == "mavedb" else col )
605+ )
606+ for namespace , cols in namespaced_score_set_columns .items ()
607+ for col in cols
608+ ]
590609
591- rows_data = variants_to_csv_rows (variants , columns = columns , dtype = type_column , mappings = mappings ) # type: ignore
592610 if drop_na_columns :
593- rows_data , columns = drop_na_columns_from_csv_file_rows (rows_data , columns )
611+ rows_data , rows_columns = drop_na_columns_from_csv_file_rows (rows_data , rows_columns )
594612
595613 stream = io .StringIO ()
596- writer = csv .DictWriter (stream , fieldnames = columns , quoting = csv .QUOTE_MINIMAL )
614+ writer = csv .DictWriter (stream , fieldnames = rows_columns , quoting = csv .QUOTE_MINIMAL )
597615 writer .writeheader ()
598616 writer .writerows (rows_data )
599617 return stream .getvalue ()
@@ -631,9 +649,9 @@ def is_null(value):
631649
632650def variant_to_csv_row (
633651 variant : Variant ,
634- columns : list [str ],
635- dtype : str ,
652+ columns : dict [str , list [str ]],
636653 mapping : Optional [MappedVariant ] = None ,
654+ namespaced : Optional [bool ] = None ,
637655 na_rep = "NA" ,
638656) -> dict [str , Any ]:
639657 """
@@ -645,17 +663,18 @@ def variant_to_csv_row(
645663 List of variants.
646664 columns : dict[str, list[str]]
647665 Columns to serialize, keyed by namespace.
648- dtype : str, {'scores', 'counts'}
649- The type of data requested. Either the 'score_data' or 'count_data' .
666+ namespaced: Optional[bool] = None
667+ Whether to namespace the columns or not.
650668 na_rep : str
651669 String to represent null values.
652670
653671 Returns
654672 -------
655673 dict[str, Any]
656674 """
657- row = {}
658- for column_key in columns :
675+ row : dict [str , Any ] = {}
676+ # Handle each column key explicitly as part of its namespace.
677+ for column_key in columns .get ("core" , []):
659678 if column_key == "hgvs_nt" :
660679 value = str (variant .hgvs_nt )
661680 elif column_key == "hgvs_pro" :
@@ -664,7 +683,13 @@ def variant_to_csv_row(
664683 value = str (variant .hgvs_splice )
665684 elif column_key == "accession" :
666685 value = str (variant .urn )
667- elif column_key == "post_mapped_hgvs_g" :
686+ if is_null (value ):
687+ value = na_rep
688+
689+ # export columns in the `core` namespace without a namespace
690+ row [column_key ] = value
691+ for column_key in columns .get ("mavedb" , []):
692+ if column_key == "post_mapped_hgvs_g" :
668693 hgvs_str = get_hgvs_from_post_mapped (mapping .post_mapped ) if mapping and mapping .post_mapped else None
669694 if hgvs_str is not None and is_hgvs_g (hgvs_str ):
670695 value = hgvs_str
@@ -676,21 +701,28 @@ def variant_to_csv_row(
676701 value = hgvs_str
677702 else :
678703 value = ""
679- else :
680- parent = variant .data .get (dtype ) if variant .data else None
681- value = str (parent .get (column_key )) if parent else na_rep
682704 if is_null (value ):
683705 value = na_rep
684- row [column_key ] = value
685-
706+ key = f"mavedb.{ column_key } " if namespaced else column_key
707+ row [key ] = value
708+ for column_key in columns .get ("scores" , []):
709+ parent = variant .data .get ("score_data" ) if variant .data else None
710+ value = str (parent .get (column_key )) if parent else na_rep
711+ key = f"scores.{ column_key } " if namespaced else column_key
712+ row [key ] = value
713+ for column_key in columns .get ("counts" , []):
714+ parent = variant .data .get ("count_data" ) if variant .data else None
715+ value = str (parent .get (column_key )) if parent else na_rep
716+ key = f"counts.{ column_key } " if namespaced else column_key
717+ row [key ] = value
686718 return row
687719
688720
689721def variants_to_csv_rows (
690722 variants : Sequence [Variant ],
691- columns : list [str ],
692- dtype : str ,
723+ columns : dict [str , list [str ]],
693724 mappings : Optional [Sequence [Optional [MappedVariant ]]] = None ,
725+ namespaced : Optional [bool ] = None ,
694726 na_rep = "NA" ,
695727) -> Iterable [dict [str , Any ]]:
696728 """
@@ -702,8 +734,8 @@ def variants_to_csv_rows(
702734 List of variants.
702734 columns : dict[str, list[str]]
703735 Columns to serialize, keyed by namespace.
705- dtype : str, {'scores', 'counts'}
706- The type of data requested. Either the 'score_data' or 'count_data' .
737+ namespaced: Optional[bool] = None
738+ Whether to namespace the columns or not.
707739 na_rep : str
708740 String to represent null values.
709741
@@ -713,10 +745,10 @@ def variants_to_csv_rows(
713745 """
714746 if mappings is not None :
715747 return map (
716- lambda pair : variant_to_csv_row (pair [0 ], columns , dtype , mapping = pair [1 ], na_rep = na_rep ),
748+ lambda pair : variant_to_csv_row (pair [0 ], columns , mapping = pair [1 ], namespaced = namespaced , na_rep = na_rep ),
717749 zip (variants , mappings ),
718750 )
719- return map (lambda v : variant_to_csv_row (v , columns , dtype , na_rep = na_rep ), variants )
751+ return map (lambda v : variant_to_csv_row (v , columns , namespaced = namespaced , na_rep = na_rep ), variants )
720752
721753
722754def find_meta_analyses_for_score_sets (db : Session , urns : list [str ]) -> list [ScoreSet ]:
0 commit comments