1010
1111def main (argv ):
1212 """
13-
13+
1414 :param list argv: the list elements should be:
1515 [1]: Parquet file path with raw external_model_ids configuration
1616 [2]: Parquet file path with initial df with model_information data
@@ -33,8 +33,12 @@ def main(argv):
3333 output_path = argv [9 ]
3434
3535 spark = SparkSession .builder .getOrCreate ()
36- raw_external_model_ids_df = spark .read .parquet (raw_external_model_ids_resources_parquet_path )
37- initial_model_information_df = spark .read .parquet (initial_model_information_parquet_path )
36+ raw_external_model_ids_df = spark .read .parquet (
37+ raw_external_model_ids_resources_parquet_path
38+ )
39+ initial_model_information_df = spark .read .parquet (
40+ initial_model_information_parquet_path
41+ )
3842 publication_group_df = spark .read .parquet (publication_group_parquet_path )
3943 accessibility_group_df = spark .read .parquet (accessibility_group_parquet_path )
4044 contact_people_df = spark .read .parquet (contact_people_parquet_path )
@@ -50,21 +54,22 @@ def main(argv):
5054 contact_people_df ,
5155 contact_form_df ,
5256 source_database_df ,
53- license_df )
57+ license_df ,
58+ )
5459
5560 model_df .write .mode ("overwrite" ).parquet (output_path )
5661
5762
5863def transform_model (
59- raw_external_model_ids_df : DataFrame ,
60- initial_model_information_df : DataFrame ,
61- publication_group_df : DataFrame ,
62- accessibility_group_df : DataFrame ,
63- contact_people_df : DataFrame ,
64- contact_form_df : DataFrame ,
65- source_database_df : DataFrame ,
66- license_df : DataFrame ) -> DataFrame :
67-
64+ raw_external_model_ids_df : DataFrame ,
65+ initial_model_information_df : DataFrame ,
66+ publication_group_df : DataFrame ,
67+ accessibility_group_df : DataFrame ,
68+ contact_people_df : DataFrame ,
69+ contact_form_df : DataFrame ,
70+ source_database_df : DataFrame ,
71+ license_df : DataFrame ,
72+ ) -> DataFrame :
6873 model_df = initial_model_information_df
6974 model_df = set_fk_publication_group (model_df , publication_group_df )
7075 model_df = set_fk_accessibility_group (model_df , accessibility_group_df )
@@ -73,51 +78,85 @@ def transform_model(
7378 model_df = set_fk_source_database (model_df , source_database_df )
7479 model_df = set_fk_license (model_df , license_df )
7580 model_df = add_model_links (model_df , raw_external_model_ids_df )
76-
81+
7782 model_df = get_columns_expected_order (model_df )
7883
7984 return model_df
8085
8186
82- def set_fk_publication_group (model_df : DataFrame , publication_group_df : DataFrame ) -> DataFrame :
87+ def set_fk_publication_group (
88+ model_df : DataFrame , publication_group_df : DataFrame
89+ ) -> DataFrame :
8390 model_df = transform_to_fk (
84- model_df , publication_group_df , "publications" , "pubmed_ids" , "id" , "publication_group_id" )
91+ model_df ,
92+ publication_group_df ,
93+ "publications" ,
94+ "pubmed_ids" ,
95+ "id" ,
96+ "publication_group_id" ,
97+ )
8598 return model_df
8699
87100
88- def set_fk_accessibility_group (model_df : DataFrame , accessibility_group_df : DataFrame ) -> DataFrame :
89- model_df = model_df .withColumnRenamed ("europdx_access_modality" , "europdx_access_modalities" )
90- accessibility_group_df = accessibility_group_df .withColumnRenamed ("id" , "accessibility_group_id" )
101+ def set_fk_accessibility_group (
102+ model_df : DataFrame , accessibility_group_df : DataFrame
103+ ) -> DataFrame :
104+ model_df = model_df .withColumnRenamed (
105+ "europdx_access_modality" , "europdx_access_modalities"
106+ )
107+ accessibility_group_df = accessibility_group_df .withColumnRenamed (
108+ "id" , "accessibility_group_id"
109+ )
91110 model_df = model_df .join (
92111 accessibility_group_df ,
93- on = ['accessibility' , 'europdx_access_modalities' ], how = 'left' )
112+ on = ["accessibility" , "europdx_access_modalities" ],
113+ how = "left" ,
114+ )
94115 return model_df
95116
96117
97- def set_fk_contact_people (model_df : DataFrame , contact_people_df : DataFrame ) -> DataFrame :
98- contact_people_df = contact_people_df .select ("id" , "email_list" , "name_list" , Constants .DATA_SOURCE_COLUMN )
118+ def set_fk_contact_people (
119+ model_df : DataFrame , contact_people_df : DataFrame
120+ ) -> DataFrame :
121+ contact_people_df = contact_people_df .select (
122+ "id" , "email_list" , "name_list" , Constants .DATA_SOURCE_COLUMN
123+ )
99124 model_df = model_df .withColumnRenamed ("email" , "email_list" )
100125 model_df = model_df .withColumnRenamed ("name" , "name_list" )
101126 contact_people_df = contact_people_df .withColumnRenamed ("id" , "contact_people_id" )
102127
103- cond = [model_df .name_list .eqNullSafe (contact_people_df .name_list ),
104- model_df .email_list .eqNullSafe (contact_people_df .email_list ),
105- model_df [Constants .DATA_SOURCE_COLUMN ] == contact_people_df [Constants .DATA_SOURCE_COLUMN ]]
128+ cond = [
129+ model_df .name_list .eqNullSafe (contact_people_df .name_list ),
130+ model_df .email_list .eqNullSafe (contact_people_df .email_list ),
131+ model_df [Constants .DATA_SOURCE_COLUMN ]
132+ == contact_people_df [Constants .DATA_SOURCE_COLUMN ],
133+ ]
106134
107- model_df = model_df .join (contact_people_df , cond , how = 'left' )
135+ model_df = model_df .join (contact_people_df , cond , how = "left" )
136+ model_df = model_df .drop (contact_people_df .email_list )
137+ model_df = model_df .drop (contact_people_df .name_list )
108138 model_df = model_df .drop (contact_people_df [Constants .DATA_SOURCE_COLUMN ])
109139 return model_df
110140
111141
112142def set_fk_contact_form (model_df : DataFrame , contact_form_df : DataFrame ) -> DataFrame :
113143 model_df = transform_to_fk (
114- model_df , contact_form_df , "form_url" , "form_url" , "id" , "contact_form_id" )
144+ model_df , contact_form_df , "form_url" , "form_url" , "id" , "contact_form_id"
145+ )
115146 return model_df
116147
117148
118- def set_fk_source_database (model_df : DataFrame , source_database_df : DataFrame ) -> DataFrame :
149+ def set_fk_source_database (
150+ model_df : DataFrame , source_database_df : DataFrame
151+ ) -> DataFrame :
119152 model_df = transform_to_fk (
120- model_df , source_database_df , "database_url" , "database_url" , "id" , "source_database_id" )
153+ model_df ,
154+ source_database_df ,
155+ "database_url" ,
156+ "database_url" ,
157+ "id" ,
158+ "source_database_id" ,
159+ )
121160 return model_df
122161
123162
@@ -126,12 +165,16 @@ def set_fk_license(model_df: DataFrame, license_df: DataFrame) -> DataFrame:
126165 license_df = license_df .withColumnRenamed ("name" , "license_name" )
127166 license_df = license_df .withColumnRenamed ("url" , "license_url" )
128167
129- model_df = model_df .join (license_df , model_df .license == license_df .license_name , how = 'left' )
168+ model_df = model_df .join (
169+ license_df , model_df .license == license_df .license_name , how = "left"
170+ )
130171 return model_df
131172
132173
133174def get_provider_type_from_sharing (raw_sharing_df : DataFrame ) -> DataFrame :
134- provider_type_df = raw_sharing_df .select (format_name_column ("provider_type" ).alias ("name" ))
175+ provider_type_df = raw_sharing_df .select (
176+ format_name_column ("provider_type" ).alias ("name" )
177+ )
135178 provider_type_df = provider_type_df .select ("name" ).where ("name is not null" )
136179 provider_type_df = provider_type_df .drop_duplicates ()
137180 return provider_type_df
@@ -179,8 +222,9 @@ def get_columns_expected_order(model_df: DataFrame) -> DataFrame:
179222 "drug_concentration" ,
180223 "other_model_links" ,
181224 "date_submitted" ,
182- "model_availability"
183- )
225+ "model_availability" ,
226+ "email_list" ,
227+ )
184228
185229
186230if __name__ == "__main__" :
0 commit comments