1+ # Databricks notebook source
2+ # MAGIC %md
3+ # MAGIC
4+ # MAGIC #### Simulate pVCF
5+ # MAGIC
6+ # MAGIC Uses the 1000 genomes to simulate a project-level VCF at a larger scale and write to delta lake
7+ # MAGIC
8+ # MAGIC For now we manually define functions to handle hardy-weinberg allele frequency and multiallelic variants
9+ # MAGIC
10+ # MAGIC _TODO_ use sim1000G to get realistic families to test offset correction
11+
12+ # COMMAND ----------
13+
14+ # MAGIC %md
15+ # MAGIC ##### import libraries
16+
17+ # COMMAND ----------
18+
19+ import glow
20+ spark = glow .register (spark )
21+ import pyspark .sql .functions as fx
22+ from pyspark .sql .types import *
23+
24+ import random
25+ import string
26+ import pandas as pd
27+ import numpy as np
28+ import os
29+ from pathlib import Path
30+ import itertools
31+ from collections import Counter
32+
33+ # COMMAND ----------
34+
35+ # MAGIC %md
36+ # MAGIC ##### Data Generation Constants
37+
38+ # COMMAND ----------
39+
40+ #genotype matrix
41+ n_samples = 50000
42+ n_variants = 1000
43+
44+ # chromosomes for simulating pvcf
45+ random_seed = 42
46+ random .seed (random_seed )
47+ minor_allele_frequency_cutoff = 0.005
48+ chromosomes = ["21" , "22" ] #glow whole genome regression leave one chromosome out (loco) method requires at least two chromosomes
49+
50+ n_partitions = 5 #good heuristic is 20 variants per partition at 500k samples
51+
52+ # COMMAND ----------
53+
54+ # MAGIC %md
55+ # MAGIC ##### Data Storage Path Constants
56+
57+ # COMMAND ----------
58+
59+ user = dbutils .notebook .entry_point .getDbutils ().notebook ().getContext ().tags ().apply ('user' )
60+ dbfs_home_path_str = "dbfs:/home/{}/" .format (user )
61+ dbfs_fuse_home_path_str = "/dbfs/home/{}/" .format (user )
62+ dbfs_home_path = Path ("dbfs:/home/{}/" .format (user ))
63+ dbfs_fuse_home_path = Path ("/dbfs/home/{}/" .format (user ))
64+
65+ # COMMAND ----------
66+
67+ # MAGIC %md
68+ # MAGIC ##### simulate genotypes helper functions
69+
70+ # COMMAND ----------
71+
72+ def hardy_weinberg_principle (minor_allele_frequency ):
73+ """
74+ given a minor allele frequency for a biallelic variant,
75+ return an array of frequencies for each genotype
76+ """
77+ p = 1 - minor_allele_frequency
78+ q = minor_allele_frequency
79+ aa = p * p
80+ aA = 2 * p * q
81+ AA = q * q
82+ return [aa , aA , AA ]
83+
84+ def get_allele_frequencies (minor_allele_frequency ):
85+ """
86+ given a list of `minor_allele_frequency`
87+ add in the reference allele frequency and return a list of frequencies
88+ """
89+ ref_allele_frequency = 1 - sum (minor_allele_frequency )
90+ allele_frequencies = [ref_allele_frequency ] + minor_allele_frequency
91+ return allele_frequencies
92+
93+ def get_allele_frequency_combos (allele_frequencies ):
94+ """
95+ given a list of allele frequencies,
96+ return all combinations of frequencies
97+ """
98+ allele_frequency_product = list (itertools .product (allele_frequencies , allele_frequencies ))
99+ allele_freq_combos = [i [0 ]* i [1 ] for i in allele_frequency_product ]
100+ return allele_freq_combos
101+
102+ def get_genotype_calls_combinations (allele_frequencies ):
103+ """
104+ given a list of allele frequencies,
105+ return all possible genotype call combinations
106+ for example, if len(allele_frequencies) = 6, one combination may be [0,5]
107+ """
108+ genotypes = [i for i in range (len (allele_frequencies ))]
109+ genotype_combinations = list (itertools .product (genotypes , genotypes ))
110+ genotype_calls = [[i [0 ], i [1 ]] for i in genotype_combinations ]
111+ return genotype_calls
112+
113+ def generate_multiallelic_frequencies (minor_allele_frequency , n_samples ):
114+ """
115+ given a multiallelic variant with a list of `minor_allele_frequency`
116+ return an array of frequencies for each genotype for n_samples
117+ """
118+ allele_frequencies = get_allele_frequencies (minor_allele_frequency )
119+ allele_freq_combos = get_allele_frequency_combos (allele_frequencies )
120+ genotype_calls = get_genotype_calls_combinations (allele_frequencies )
121+ genotype_list = random .choices (genotype_calls , k = n_samples , weights = allele_freq_combos )
122+ return genotype_list
123+
124+ sample_id_list = [str (i ) for i in range (0 , n_samples )]
125+
126+ def simulate_genotypes (minor_allele_frequency , n_samples , sample_list = sample_id_list ):
127+ """
128+ given an array that contains the minor_allele_frequency as the first element,
129+ return a genotypes struct of length=n_samples that conforms to the Glow variant schema,
130+ with genotypes that are in Hardy Weinberg Equilibrium
131+ """
132+ n_samples = int (n_samples )
133+ frequencies = hardy_weinberg_principle (minor_allele_frequency [0 ])
134+ calls = [[0 ,0 ], [0 ,1 ], [1 ,1 ]]
135+ if len (minor_allele_frequency ) > 1 :
136+ genotype_list = generate_multiallelic_frequencies (minor_allele_frequency , n_samples )
137+ else :
138+ genotype_list = random .choices (calls , k = n_samples , weights = frequencies )
139+ new_lst = [list (x ) for x in zip (sample_id_list , genotype_list )]
140+ genotypes = [{"sampleId" :x , "calls" : y } for x , y in new_lst ]
141+ return genotypes
142+
143+ simulate_genotypes_udf = udf (simulate_genotypes , ArrayType (StructType ([
144+ StructField ("sampleId" , StringType (), True ),
145+ StructField ("calls" , ArrayType (IntegerType (), True ))
146+ ])))
147+
148+ # COMMAND ----------
149+
150+ # MAGIC %md
151+ # MAGIC ##### set paths
152+
153+ # COMMAND ----------
154+
155+ vcfs_path = str (dbfs_home_path / "genomics/data/1kg-vcfs-autosomes" )
156+ vcfs_path_local = str (dbfs_fuse_home_path / "genomics/data/1kg-vcfs-autosomes" )
157+
158+ os .environ ['vcfs_path_local' ] = vcfs_path_local
159+ output_vcf_delta = str (dbfs_home_path / f'genomics/data/delta/1kg_variants_pvcf.delta' )
160+ output_simulated_delta = str (dbfs_home_path / f'genomics/data/delta/simulate_{ n_samples } _samples_{ n_variants } _variants_pvcf.delta' )
161+ vcfs_path , vcfs_path_local , output_vcf_delta , output_simulated_delta
162+
163+ # COMMAND ----------
164+
165+ # MAGIC %md
166+ # MAGIC ##### download 1000G data for chrom 21 and 22
167+
168+ # COMMAND ----------
169+
170+ # MAGIC %sh
171+ # MAGIC declare -a chroms=("21" "22")
172+ # MAGIC
173+ # MAGIC for i in "${chroms[@]}"; do wget ftp://hgdownload.cse.ucsc.edu/gbdb/hg19/1000Genomes/phase3/ALL.chr$i.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz; done
174+ # MAGIC
175+ # MAGIC mkdir -p $vcfs_path_local
176+ # MAGIC
177+ # MAGIC cp ALL*.genotypes.vcf.gz $vcfs_path_local
178+
179+ # COMMAND ----------
180+
181+ # MAGIC %md
182+ # MAGIC ##### read 1000 Genomes VCF
183+
184+ # COMMAND ----------
185+
186+ vcf = spark .read .format ("vcf" ).load (vcfs_path ) \
187+ .drop ("genotypes" ) \
188+ .where (fx .col ("INFO_AF" )[0 ] >= minor_allele_frequency_cutoff )
189+ total_variants = vcf .count ()
190+ fraction = n_variants / total_variants
191+
192+ # COMMAND ----------
193+
194+ # MAGIC %md
195+ # MAGIC ##### checkpoint to delta
196+
197+ # COMMAND ----------
198+
199+ vcf .write .mode ("overwrite" ).format ("delta" ).save (output_vcf_delta )
200+
201+ # COMMAND ----------
202+
203+ vcf = spark .read .format ("delta" ).load (output_vcf_delta )
204+
205+ # COMMAND ----------
206+
207+ display (vcf )
208+
209+ # COMMAND ----------
210+
211+ simulated_vcf = vcf .sample (withReplacement = False , fraction = fraction ) \
212+ .repartition (n_partitions ) \
213+ .withColumn ("genotypes" , simulate_genotypes_udf (fx .col ("INFO_AF" ),
214+ fx .lit (n_samples )))
215+
216+ # COMMAND ----------
217+
218+ simulated_vcf .count ()
219+
220+ # COMMAND ----------
221+
222+ display (simulated_vcf .drop ("genotypes" ))
223+
224+ # COMMAND ----------
225+
226+ simulated_vcf .write .mode ("overwrite" ).format ("delta" ).save (output_simulated_delta )
227+
228+ # COMMAND ----------
229+
230+ # MAGIC %md
231+ # MAGIC ##### check output delta table
232+
233+ # COMMAND ----------
234+
235+ delta_vcf = spark .read .format ("delta" ).load (output_simulated_delta ).drop ("genotypes" )
236+
237+ # COMMAND ----------
238+
239+ display (delta_vcf )
240+
241+ # COMMAND ----------
242+
243+ display (delta_vcf .groupBy ("contigName" ).count ())
0 commit comments