55# MAGIC
66# MAGIC By running glow transform functions `split_multiallelics`, `mean_substitute`, and `genotype_states`
77# MAGIC
8+ # MAGIC Important: please checkpoint to parquet/delta after each step in this process
9+ # MAGIC
810# MAGIC Then filter,
911# MAGIC
1012# MAGIC 1. monomorphic variants using `array_distinct`
1315
1416# COMMAND ----------
1517
16- # MAGIC %md
17- # MAGIC ##### adjust spark confs
18- # MAGIC
19- # MAGIC see [split-multiallelics](https://glow.readthedocs.io/en/latest/etl/variant-splitter.html#split-multiallelics) docs
18+ # MAGIC %md ##### setup constants
2019
2120# COMMAND ----------
2221
23- spark . conf . set ( "spark.sql.codegen.wholeStage" , False )
22+ # MAGIC %run ../0_setup_constants_glow
2423
2524# COMMAND ----------
2625
27- # MAGIC %md ##### setup constants
26+ # MAGIC %run ../2_setup_metadata
2827
2928# COMMAND ----------
3029
31- # MAGIC %run ../0_setup_constants_glow
30+ # MAGIC %md
31+ # MAGIC ##### adjust spark confs
32+ # MAGIC
33+ # MAGIC see [split-multiallelics](https://glow.readthedocs.io/en/latest/etl/variant-splitter.html#split-multiallelics) docs
3234
3335# COMMAND ----------
3436
35- # MAGIC %run ../2_setup_metadata
37+ spark .conf .set ("spark.sql.codegen.wholeStage" , False )
38+
39+ # COMMAND ----------
40+
41+ # MAGIC %md
42+ # MAGIC ##### Define QC steps
3643
3744# COMMAND ----------
3845
3946method = 'quality_control'
40- step1 = 'glow_qc_transformers'
41- step2 = 'call_summary_stats'
42- step3 = 'variant_filter'
43- step4 = 'sample_filter'
47+ step1 = 'split_multiallelics'
48+ step2 = 'left_normalize_indels'
49+ step3 = 'mean_substitute'
50+ step4 = 'call_summary_stats'
51+ step5 = 'variant_filter'
4452library = 'glow'
4553datetime = datetime .now (pytz .timezone ('US/Pacific' ))
4654
@@ -87,19 +95,85 @@ def calculate_pval_bonferroni_cutoff(df, cutoff=0.05):
8795# COMMAND ----------
8896
8997# MAGIC %md
90- # MAGIC ##### prepare simulated delta table for GWAS using glow transformers
98+ # MAGIC ##### split simulated data
99+ # MAGIC
100+ # MAGIC 1. biallelic SNPs
101+ # MAGIC 2. multiallelic variants
102+ # MAGIC 3. indels
91103
92104# COMMAND ----------
93105
94- start_time_step1 = time .time ()
95106delta_vcf = spark .read .format ("delta" ).load (output_delta )
96- delta_gwas_vcf = (glow .transform ('split_multiallelics' , delta_vcf ). \
97- withColumn ('values' , glow .mean_substitute (glow .genotype_states ('genotypes' ))). \
107+
108+ # COMMAND ----------
109+
110+ display (delta_vcf .drop ("genotypes" ))
111+
112+ # COMMAND ----------
113+
114+ # MAGIC %md
115+ # MAGIC ##### split multiallelics
116+ # MAGIC
117+ # MAGIC write out biallelics and split multiallelics
118+
119+ # COMMAND ----------
120+
121+ start_time = time .time ()
122+ multiallelic_df = delta_vcf .where (fx .size (fx .col ("alternateAlleles" )) > 1 )
123+ multiallelic_df = glow .transform ('split_multiallelics' , multiallelic_df )
124+ bialleleic_df = delta_vcf .where (fx .size (fx .col ("alternateAlleles" )) == 1 )
125+
126+ multiallelic_df .write .mode ("overwrite" ).format ("delta" ).save (output_delta_split_multiallelics )
127+ bialleleic_df .write .mode ("append" ).format ("delta" ).save (output_delta_split_multiallelics )
128+
129+ end_time = time .time ()
130+ log_metadata (datetime , n_samples , n_variants , 0 , 0 , 'etl' , step1 , library , spark_version , node_type_id , n_workers , start_time , end_time , run_metadata_delta_path )
131+
132+ # COMMAND ----------
133+
134+ # MAGIC %md
135+ # MAGIC ##### extract indels and left-normalize
136+
137+ # COMMAND ----------
138+
139+ start_time = time .time ()
140+ split_multiallelic_df = spark .read .format ("delta" ).load (output_delta_split_multiallelics )
141+ indels_df = split_multiallelic_df .where ((fx .length ("referenceAllele" ) > 1 ) | (fx .length (fx .col ("alternateAlleles" )[0 ]) > 1 ))
142+ snps_df = split_multiallelic_df .where ((fx .length ("referenceAllele" ) == 1 ) & (fx .length (fx .col ("alternateAlleles" )[0 ]) == 1 ))
143+
144+ normalized_variants_df = glow .transform (
145+ "normalize_variants" ,
146+ indels_df ,
147+ reference_genome_path = reference_genome_path
148+ )
149+
150+ num_variants_changed = normalized_variants_df .where (fx .col ("normalizationStatus.changed" ) == True ).count ()
151+
152+ print ("number of variants left normalized = " + str (num_variants_changed ))
153+
154+ snps_df .write .mode ("overwrite" ).format ("delta" ).save (output_delta_split_multiallelics_normalize )
155+ normalized_variants_df .drop ("normalizationStatus" ). \
156+ write .mode ("append" ).format ("delta" ). \
157+ save (output_delta_split_multiallelics_normalize )
158+
159+ end_time = time .time ()
160+ log_metadata (datetime , n_samples , n_variants , 0 , 0 , 'etl' , step2 , library , spark_version , node_type_id , n_workers , start_time , end_time , run_metadata_delta_path )
161+
162+ # COMMAND ----------
163+
164+ # MAGIC %md
165+ # MAGIC ##### prepare simulated delta table for GWAS using glow transformers
166+
167+ # COMMAND ----------
168+
169+ start_time = time .time ()
170+ delta_vcf = spark .read .format ("delta" ).load (output_delta_split_multiallelics_normalize )
171+ delta_gwas_vcf = (delta_vcf .withColumn ('values' , glow .mean_substitute (glow .genotype_states ('genotypes' ))). \
98172 filter (fx .size (fx .array_distinct ('values' )) > 1 )
99173 )
100174delta_gwas_vcf .write .mode ("overwrite" ).format ("delta" ).save (output_delta_glow_qc_transformers )
101- end_time_step1 = time .time ()
102- log_metadata (datetime , n_samples , n_variants , 0 , 0 , 'etl' , step1 , library , spark_version , node_type_id , n_workers , start_time_step1 , end_time_step1 , run_metadata_delta_path )
175+ end_time = time .time ()
176+ log_metadata (datetime , n_samples , n_variants , 0 , 0 , 'etl' , step3 , library , spark_version , node_type_id , n_workers , start_time , end_time , run_metadata_delta_path )
103177
104178# COMMAND ----------
105179
@@ -108,7 +182,7 @@ def calculate_pval_bonferroni_cutoff(df, cutoff=0.05):
108182
109183# COMMAND ----------
110184
111- start_time_step2 = time .time ()
185+ start_time = time .time ()
112186delta_gwas_vcf = spark .read .format ("delta" ).load (output_delta_glow_qc_transformers )
113187
114188summary_stats_df = delta_gwas_vcf .select (
@@ -119,8 +193,8 @@ def calculate_pval_bonferroni_cutoff(df, cutoff=0.05):
119193 withColumn ("log10pValueHwe" , fx .when (fx .col ("pValueHwe" ) == 0 , 26 ).otherwise (- fx .log10 (fx .col ("pValueHwe" ))))
120194summary_stats_df .drop ("genotypes" ).write .mode ("overwrite" ).format ("delta" ).save (output_delta_glow_qc_variants )
121195
122- end_time_step2 = time .time ()
123- log_metadata (datetime , n_samples , n_variants , 0 , 0 , method , step2 , library , spark_version , node_type_id , n_workers , start_time_step2 , end_time_step2 , run_metadata_delta_path )
196+ end_time = time .time ()
197+ log_metadata (datetime , n_samples , n_variants , 0 , 0 , method , step4 , library , spark_version , node_type_id , n_workers , start_time , end_time , run_metadata_delta_path )
124198
125199# COMMAND ----------
126200
@@ -145,7 +219,7 @@ def calculate_pval_bonferroni_cutoff(df, cutoff=0.05):
145219
146220# COMMAND ----------
147221
148- start_time_step3 = time .time ()
222+ start_time = time .time ()
149223variant_filter_df = spark .read .format ("delta" ).load (output_delta_glow_qc_variants )
150224
151225variant_filter_df = summary_stats_df .where ((fx .col ("alleleFrequencies" ).getItem (0 ) >= allele_freq_cutoff ) &
@@ -155,8 +229,8 @@ def calculate_pval_bonferroni_cutoff(df, cutoff=0.05):
155229
156230variant_filter_df .write .option ("overwriteSchema" , "true" ).mode ("overwrite" ).format ("delta" ).save (output_delta_transformed )
157231
158- end_time_step3 = time .time ()
159- log_metadata (datetime , n_samples , n_variants , 0 , 0 , method , step3 , library , spark_version , node_type_id , n_workers , start_time_step3 , end_time_step3 , run_metadata_delta_path )
232+ end_time = time .time ()
233+ log_metadata (datetime , n_samples , n_variants , 0 , 0 , method , step5 , library , spark_version , node_type_id , n_workers , start_time , end_time , run_metadata_delta_path )
160234
161235# COMMAND ----------
162236
0 commit comments