-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy path: generate_variant_report.py
More file actions
241 lines (211 loc) · 8.96 KB
/
generate_variant_report.py
File metadata and controls
241 lines (211 loc) · 8.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# 1st argument is the path to the snpedia SNP csv file
# 2nd argument is the path to the snpedia genotypes csv file
# 3rd argument is the path to the user's genotype file generated by 23andme
import pandas as pd
import re
import base64
import sys
# ignore settingwithcopy warning
import warnings
warnings.filterwarnings("ignore", category=pd.errors.DtypeWarning)
warnings.filterwarnings("ignore", category=pd.errors.SettingWithCopyWarning)
# Resolve the input paths from the command line:
#   1 argument : <23andme file>, with the SNPedia csvs at their default paths
#   3 arguments: <snp df> <geno df> <23andme file>
if len(sys.argv) == 2:
    user_path = sys.argv[1]
    snp_path = 'data/snp_df.csv'
    geno_path = 'data/geno_df.csv'
elif len(sys.argv) == 4:
    user_path = sys.argv[3]
    snp_path = sys.argv[1]
    geno_path = sys.argv[2]
else:
    # usage errors go to stderr with a non-zero exit status so callers/scripts
    # can detect the failure (bare exit() is the site-module helper and is not
    # guaranteed to exist in all interpreters)
    print(f"Usage: python {sys.argv[0]} <snp df> <geno df> <23andme file>", file=sys.stderr)
    sys.exit(1)
# takes the html text of a SNPedia page and return the rsid, orientation,
# gene, and description of the SNP using regex parsing
def process_SNP_page(text):
    """Parse one SNPedia SNP page (wiki text, '$' marks newlines).

    Returns a tuple ``(rsid, orientation, gene, description)``; each element
    is ``""`` when the corresponding field is absent.  When the page has no
    ``StabilizedOrientation`` field the gene and description are not parsed
    and come back empty.
    """
    match = re.search(r'rsid=([^\$]+)', text)
    rsid = "rs" + match.group(1) if match else ""
    match = re.search(r'StabilizedOrientation=([^\$]+)', text)
    if not match:
        # without a stabilized orientation the page can't be matched against
        # the user's genotype later, so skip the remaining fields
        return rsid, "", "", ""
    orientation = match.group(1)
    match = re.search(r'Gene=([^\$]+)', text)
    gene = match.group(1) if match else ""
    # strip {{...}} wiki templates, collapse runs of '$' newline markers,
    # then trim a leading/trailing '$'
    description = re.sub(r'\{\{[^\}]+\}\}', '', text)
    description = re.sub(r'\$+', '$', description)  # newlines are represented as $ for now
    description = re.sub(r'^\$', '', description)   # remove $ from the beginning of the string
    description = re.sub(r'\$$', '', description)   # remove $ from the end of the string
    return rsid, orientation, gene, description
# takes a csv file of SNPedia SNP pages and returns a dataframe of the
# rsid, orientation, description, and gene of each SNP
def process_SNP_dataset(filename):
    """Read a csv of SNPedia SNP pages and parse every page.

    The csv must have a ``text`` column holding the raw page text; the
    result is a DataFrame with columns rsid, orientation, gene, description.
    """
    pages = pd.read_csv(filename, sep=',', quotechar="\"")
    # one (rsid, orientation, gene, description) tuple per page
    parsed = [process_SNP_page(row.text) for row in pages.itertuples()]
    column_names = ("rsid", "orientation", "gene", "description")
    return pd.DataFrame({
        name: [fields[idx] for fields in parsed]
        for idx, name in enumerate(column_names)
    })
# extracts a given field from a SNPedia page, using regex parsing
def extractField(name, text):
    """Return the value of a ``name=value`` field in *text* (fields end at '$').

    Returns ``""`` when the field is absent.  The full value is returned even
    when it itself contains '=' characters (the old ``split('=')[1]`` dropped
    everything after a second '=').
    """
    match = re.search(r'{}=([^\$]+)'.format(name), text)
    return match.group(1) if match else ""
# extracts information from a string formatted SNPedia page
def process_geno_page(text):
    """Parse one SNPedia genotype page into its component fields.

    Returns ``(genotype, magnitude, rsid, repute, summary, description)``:
    the genotype is the two alleles sorted and joined (a leading '=' is
    dropped) and the description is the page text with wiki templates
    removed and '$' newline markers normalised.
    """
    fields = {key: extractField(key, text)
              for key in ("allele1", "allele2", "magnitude",
                          "rsid", "repute", "summary")}
    # strip {{...}} templates and normalise the '$' newline markers
    description = re.sub(r'\{\{[^\}]+\}\}', '', text)
    description = re.sub(r'\$+', '$', description)
    description = re.sub(r'^\$', '', description)
    description = re.sub(r'\$$', '', description)
    genotype = "".join(sorted([fields["allele1"], fields["allele2"]]))
    if genotype[:1] == '=':
        genotype = genotype[1:]
    return (genotype, fields["magnitude"], fields["rsid"],
            fields["repute"], fields["summary"], description)
# processes a csv file containing three rows of SNPedia genotype pages (one
# for each genotype) and returns a dataframe of the rsid, genotype, magnitude,
# repute, summary, and description of each genotype
def process_geno_dataset(filename):
    """Read a csv whose gt1/gt2/gt3 columns hold the three SNPedia genotype
    pages of each SNP, one SNP per row.

    Returns a DataFrame with an ``rsid`` column plus ``genoN``/``magN``/
    ``reputeN``/``summaryN``/``descriptionN`` columns for N in 1..3.  The
    rsid is taken from the first of the three pages that declares one
    (it stays the bare prefix "rs" when none of them does).
    """
    all_genos = pd.read_csv(filename, sep=',', quotechar="\"")
    rsids = []
    genos = [[], [], []]
    mags = [[], [], []]
    reputes = [[], [], []]
    summaries = [[], [], []]
    descriptions = [[], [], []]
    # create a dataframe with all the information from the SNPedia genotype pages
    for row in all_genos.itertuples():
        rsid = "rs"
        # the three genotype pages of one SNP are processed identically
        for slot, page in enumerate((row.gt1, row.gt2, row.gt3)):
            geno, mag, page_rsid, repute, summary, description = process_geno_page(page)
            genos[slot].append(geno)
            mags[slot].append(mag)
            reputes[slot].append(repute)
            summaries[slot].append(summary)
            descriptions[slot].append(description)
            # keep the first non-empty rsid any of the pages reports
            if rsid == "rs":
                rsid = "rs" + page_rsid
        rsids.append(rsid)
    return pd.DataFrame({'rsid': rsids,
        'geno1': genos[0], 'mag1': mags[0], 'repute1': reputes[0], 'summary1': summaries[0], 'description1': descriptions[0],
        'geno2': genos[1], 'mag2': mags[1], 'repute2': reputes[1], 'summary2': summaries[1], 'description2': descriptions[1],
        'geno3': genos[2], 'mag3': mags[2], 'repute3': reputes[2], 'summary3': summaries[2], 'description3': descriptions[2]})
# process the two files from SNPedia to get dataframes
snp_df = process_SNP_dataset(snp_path)
geno_df = process_geno_dataset(geno_path)
# read the user's genome file from 23andMe (tab separated; expected columns
# include rsid, chromosome, position, genotype -- TODO confirm against a real
# 23andMe export)
genome = pd.read_csv(user_path, sep='\t')
# accumulators for the per-variant report rows
rsids = []
genos = []
mags = []
reputes = []
summaries = []
descriptions = []
geno_descriptions = []
chrs = []
pos = []
genes = []
# filter genome to only include SNPs that are in SNPedia
genome = genome[genome['rsid'].isin(snp_df['rsid'].values)]
# complement table used to flip minus-strand genotypes onto the plus strand
_COMPLEMENT = {'A': 'T', 'T': 'A', 'G': 'C', 'C': 'G'}
# iterate through all the SNPs from SNPedia
for i, row in enumerate(geno_df.itertuples()):
    # print progress
    if i % 1000 == 0:
        print(f"Processed {100*i/len(geno_df):.2f}% of data...")
    if row.rsid in genome['rsid'].values:  # if the SNP is genotyped by 23andme
        user_snp_data = genome[genome['rsid'] == row.rsid].iloc[0]
        snp_page_info = snp_df[snp_df['rsid'] == row.rsid].iloc[0]
        user_geno = user_snp_data.genotype
        # if the orientation is on the minus strand, the SNPs are reversed, so
        # we need to reverse-complement the genotype
        if snp_page_info['orientation'] == 'minus':
            user_geno = re.sub(r'[ATGC]',
                               lambda x: _COMPLEMENT[x.group()], user_geno)[::-1]
        # go through the three genotypes from SNPedia in order and append the
        # information of the first one matching the user's genotype
        for geno, mag, repute, summary, geno_desc in (
                (row.geno1, row.mag1, row.repute1, row.summary1, row.description1),
                (row.geno2, row.mag2, row.repute2, row.summary2, row.description2),
                (row.geno3, row.mag3, row.repute3, row.summary3, row.description3)):
            if user_geno == geno:
                mags.append(mag)
                reputes.append(repute)
                summaries.append(summary)
                geno_descriptions.append(geno_desc)
                break
        # if the user's genotype was found, we also add information about the
        # SNP (not specific to the user's genotype) to our dataset
        if user_geno in (row.geno1, row.geno2, row.geno3):
            rsids.append(row.rsid)
            genos.append(user_geno)
            descriptions.append(snp_page_info['description'])
            chrs.append(user_snp_data.chromosome)
            pos.append(user_snp_data.position)
            genes.append(snp_page_info.gene)
# turn the lists into a dataframe
df = pd.DataFrame({'rsid': rsids, 'geno': genos, 'mag': mags, 'repute': reputes, 'summary': summaries,
                   'description': descriptions, 'geno_description': geno_descriptions, 'chr': chrs,
                   'pos': pos, 'gene': genes})
# write it to a csv for the user (written before the cleanup below, so the csv
# keeps the raw empty-string magnitudes/reputes, as before)
df.to_csv('snpedia_data.csv', index=False)
# a little data cleanup: empty magnitude means 0, empty repute means neutral.
# use .loc so the assignment is guaranteed to hit the frame in place (the old
# chained df['mag'][mask] = ... pattern is unreliable and is what the
# SettingWithCopyWarning suppressed above was hiding)
df.loc[df['mag'] == "", 'mag'] = "0"
df.loc[df['repute'] == "", 'repute'] = "neutral"
df['mag'] = df['mag'].astype(float)
# pre-sorting by magnitude means the report doesn't have to sort it
df = df.sort_values(by=['mag'], ascending=False)
# read the template
with open("variant_template.html", "r") as f:
    template = f.read()
# write the final output to the html file as base-64 encoded strings
with open('variant_report.html', 'w') as f:
    # embed the data as base64-encoded JSON so the report is one static file
    data_json = df[df['mag'] > 0].to_json(orient='records')
    data_b64 = base64.b64encode(data_json.encode('utf-8')).decode('utf-8')
    script_text = f"var data = JSON.parse(atob(\"{data_b64}\"));"
    zero_json = df[df['mag'] == 0].to_json(orient='records')
    zero_b64 = base64.b64encode(zero_json.encode('utf-8')).decode('utf-8')
    script_text += f"var zero_mag = JSON.parse(atob(\"{zero_b64}\"));"
    f.write(template.format(script_text=script_text))
print("Done!")