Skip to content

Commit 770b87a

Browse files
committed
add example ukb RAP convert notebook
1 parent df082c7 commit 770b87a

File tree

1 file changed

+235
-0
lines changed

1 file changed

+235
-0
lines changed
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "markdown",
5+
"id": "c6606cb2",
6+
"metadata": {},
7+
"source": [
8+
"# General example for converting UK Biobank data into the research environemnt (RAP) to DELPHI format\n"
9+
]
10+
},
11+
{
12+
"cell_type": "markdown",
13+
"id": "ba4bfe9e",
14+
"metadata": {},
15+
"source": [
16+
"Notes:\n",
17+
" - This is setup to run as a notebook in a spark server job\n",
18+
" - it needs access to a dataset record (which you may need to explicitly specify for your project)\n",
19+
" - and also a cohort which is here refered to as \"full_cohort\" this may differ from your project\n",
20+
" - the token labels to ukb field ids should be modified to suit your needs\n",
21+
" - your should change the token ids to be integer and create a token_id to field_id (or disease icd10 name)\n",
22+
" - this output a file with all individuals included - you will want to split this into \"train.bin\" and \"val.bin\" "
23+
]
24+
},
25+
{
26+
"cell_type": "code",
27+
"execution_count": null,
28+
"id": "3c85eb2e",
29+
"metadata": {},
30+
"outputs": [],
31+
"source": [
32+
"import dxdata\n",
33+
"import pandas as pd\n",
34+
"import numpy as np\n",
35+
"from tqdm import tqdm\n",
36+
"from pyspark.sql.functions import lit, udf, col\n",
37+
"from pyspark.sql.types import DoubleType\n",
38+
"from functools import reduce\n",
39+
"import os\n",
40+
"\n",
41+
"def get_first_occ_fields(main_entity):\n",
42+
" fo_fields = []\n",
43+
" for field in main_entity.fields:\n",
44+
" parts = field.name.split(\"_\")\n",
45+
" if len(str(parts[0])) > 3:\n",
46+
" field_num = int(parts[0][1:])\n",
47+
" if (field_num >= 130000 and field_num <= 132604):\n",
48+
" if field.title.startswith(\"Date\"):\n",
49+
" fo_fields.append(field)\n",
50+
" return fo_fields\n",
51+
"\n",
52+
"def compute_age_from_eid_and_event(eid, event_date):\n",
53+
" dob = dob_lookup.get(eid)\n",
54+
" if dob is None or event_date is None:\n",
55+
" return None\n",
56+
" try:\n",
57+
" return (pd.to_datetime(event_date) - dob).days / 365.25\n",
58+
" except Exception:\n",
59+
" return None\n",
60+
"\n",
61+
"# Initialize dxdata engine\n",
62+
"engine = dxdata.connect(dialect=\"hive+pyspark\")\n",
63+
"\n",
64+
"project = os.getenv('DX_PROJECT_CONTEXT_ID')\n",
65+
"record = os.popen(\"dx find data --type Dataset --delimiter ',' | awk -F ',' '{print $5}'\").read().rstrip()\n",
66+
"# find what is presumed to be the relevant dataset record\n",
67+
"record = record.split('\\n')[0]\n",
68+
"\n",
69+
"DATASET_ID = project + \":\" + record\n",
70+
"dataset = dxdata.load_dataset(id=DATASET_ID)\n",
71+
"\n",
72+
"# we retrieve the priamry entity from the dataset\n",
73+
"main_entity = dataset.primary_entity\n",
74+
"\n",
75+
"# use cohort - change to whichever name:path you have for this object\n",
76+
"cohort = dxdata.load_cohort(folder=\"/\", name=\"full_cohort\")\n",
77+
"cohort_eids_df = engine.execute(cohort.sql)"
78+
]
79+
},
80+
{
81+
"cell_type": "code",
82+
"execution_count": null,
83+
"id": "361f018d-fe1c-40ec-b2f4-d43f7c9887ca",
84+
"metadata": {
85+
"tags": []
86+
},
87+
"outputs": [],
88+
"source": [
89+
"# hard coded sex dob and basic demographic data\n",
90+
"eid_f = main_entity.find_field(name=\"eid\")\n",
91+
"sex_f = main_entity.find_field(title=\"Sex\")\n",
92+
"year_f = main_entity.find_field(title=\"Year of birth\")\n",
93+
"month_f = main_entity.find_field(title=\"Month of birth\")\n",
94+
"death_f = dataset['death'].find_field(title=\"Date of death\")\n",
95+
"assessment_f = main_entity[\"p53_i0\"]\n",
96+
"bmi_f = main_entity[\"p21001_i0\"]\n",
97+
"smoking_f = main_entity[\"p1239_i0\"]\n",
98+
"alcohol_f = main_entity[\"p1558_i0\"]\n"
99+
]
100+
},
101+
{
102+
"cell_type": "code",
103+
"execution_count": 5,
104+
"id": "004328b8-abfe-4e63-b704-87efbe7138cd",
105+
"metadata": {
106+
"tags": []
107+
},
108+
"outputs": [],
109+
"source": [
110+
"# collect the cancer code enteries\n",
111+
"cancer_codes = {}\n",
112+
"cancer_codes['type'] = []\n",
113+
"cancer_codes['date'] = []\n",
114+
"for i in range(22):\n",
115+
" cancer_codes['type'].append(main_entity.find_field(name=\"p40006_i\" + str(i)))\n",
116+
" cancer_codes['date'].append(main_entity.find_field(name=\"p40005_i\" + str(i)))\n"
117+
]
118+
},
119+
{
120+
"cell_type": "code",
121+
"execution_count": null,
122+
"id": "0381027b-de51-494e-aa12-d3a0c126d35f",
123+
"metadata": {
124+
"tags": []
125+
},
126+
"outputs": [],
127+
"source": [
128+
"# Deal with first occrances and demographic data - this takes a little while\n",
129+
"fo_fields = get_first_occ_fields(main_entity)\n",
130+
"fields_to_get = [eid_f, sex_f, year_f, month_f, assessment_f, bmi_f, smoking_f, alcohol_f] + cancer_codes['type'] + cancer_codes['date'] + fo_fields + [death_f]\n",
131+
"\n",
132+
"df = main_entity.retrieve_fields(fields=fields_to_get, filter_sql=cohort.sql, engine=engine)\n",
133+
"df1 = df.select(\"eid\", \"p31\",\"p34\",\"p52\",\"p53_i0\",\"p21001_i0\",\"p1239_i0\",\"p1558_i0\").toPandas()\n",
134+
"\n",
135+
"dobf1 = df1[['p34', 'p52']]\n",
136+
"dobf1.columns = [\"YEAR\", \"MONTH\"]\n",
137+
"df1['dob'] = pd.to_datetime(dobf1.assign(DAY=1))\n",
138+
"df1['bmi_status'] = np.where(df1['p21001_i0']>28,5,np.where(df1['p21001_i0']>22,4,3))\n",
139+
"df1['smoking_status'] = np.where(df1['p1239_i0']==1,8,np.where(df1['p1239_i0']==2,7,6))\n",
140+
"df1['alcohol_status'] = np.where(df1['p1558_i0']==1,11,np.where(df1['p1558_i0'] < 4,10,9))"
141+
]
142+
},
143+
{
144+
"cell_type": "code",
145+
"execution_count": null,
146+
"id": "343290db-0f6c-4717-bc25-b821005c0c31",
147+
"metadata": {},
148+
"outputs": [],
149+
"source": [
150+
"# Prepare a pandas dictionary for fast eid to dob lookup\n",
151+
"dob_lookup = df1.set_index('eid')['dob'].to_dict()\n",
152+
"age_event_udf = udf(compute_age_from_eid_and_event, DoubleType())"
153+
]
154+
},
155+
{
156+
"cell_type": "code",
157+
"execution_count": null,
158+
"id": "d0a7b99c-7dd9-43c8-9766-b10256282117",
159+
"metadata": {},
160+
"outputs": [],
161+
"source": [
162+
"# Remove all NULL enteries for each ICD10 code seperately and combine into an overall spark table \n",
163+
"\n",
164+
"# deal with the tokens and dates \n",
165+
"d_all = df.select(\"eid\", cancer_codes['date'][0].name, cancer_codes['type'][0].name).where(df[cancer_codes['date'][0].name].isNotNull())\n",
166+
"d_all = d_all.withColumnRenamed(cancer_codes['date'][i].name, \"date\")\n",
167+
"d_all = d_all.withColumnRenamed(cancer_codes['type'][i].name, \"token\")\n",
168+
"d_all = d_all.withColumn(\"age\", age_event_udf(col(\"eid\"), col(\"date\")))\n",
169+
" \n",
170+
"for i in tqdm(1, len(cancer_codes['date'])):\n",
171+
" cf1 = df.select(\"eid\", cancer_codes['date'][i].name, cancer_codes['type'][i].name).where(df[cancer_codes['date'][i].name].isNotNull())\n",
172+
" cf1 = cf1.withColumnRenamed(cancer_codes['date'][i].name, \"date\")\n",
173+
" cf1 = cf1.withColumnRenamed(cancer_codes['type'][i].name, \"token\")\n",
174+
" cf1 = cf1.withColumn(\"age\", age_event_udf(col(\"eid\"), col(\"date\")))\n",
175+
" d_all = d_all.union(cf1)\n"
176+
]
177+
},
178+
{
179+
"cell_type": "code",
180+
"execution_count": null,
181+
"id": "edeb7116-d937-4e8a-90dd-fff159e64a07",
182+
"metadata": {},
183+
"outputs": [],
184+
"source": [
185+
"# Deal with all first occurances - this takes a long time\n",
186+
"for i in tqdm(range(0,len(fo_fields))):\n",
187+
" f = fo_fields[i]\n",
188+
" d = df.select(['eid', f.name]).where(df[f.name].isNotNull())\n",
189+
" d1 = d.withColumn(\"token\", lit(f.name))\n",
190+
" d1 = d1.withColumnRenamed(f.name, \"date\")\n",
191+
" d1 = d1.withColumn(\"age\", age_event_udf(col(\"eid\"), col(\"date\")))\n",
192+
" d_all = d_all.union(d1)\n"
193+
]
194+
},
195+
{
196+
"cell_type": "code",
197+
"execution_count": null,
198+
"id": "0770c44a-7585-42b3-8f48-cadba006c54f",
199+
"metadata": {},
200+
"outputs": [],
201+
"source": [
202+
"# Format, sort and write out to a file\n",
203+
"df_all = d_all.select(\"eid\", \"age\", \"token\").toPandas()\n",
204+
"df_all['age'] = df_all['age'] * 365.25\n",
205+
"data = np.array(df_all).squeeze()\n",
206+
"data[:,0] = data[:,0].astype(np.uint32)\n",
207+
"data[:,1] = data[:,1].astype(np.uint32)\n",
208+
"data = data[np.lexsort((data[:,1], data[:,0]))]\n",
209+
"\n",
210+
"data.tofile('all_records.bin')\n"
211+
]
212+
}
213+
],
214+
"metadata": {
215+
"kernelspec": {
216+
"display_name": "Python 3 (ipykernel)",
217+
"language": "python",
218+
"name": "python3"
219+
},
220+
"language_info": {
221+
"codemirror_mode": {
222+
"name": "ipython",
223+
"version": 3
224+
},
225+
"file_extension": ".py",
226+
"mimetype": "text/x-python",
227+
"name": "python",
228+
"nbconvert_exporter": "python",
229+
"pygments_lexer": "ipython3",
230+
"version": "3.12.9"
231+
}
232+
},
233+
"nbformat": 4,
234+
"nbformat_minor": 5
235+
}

0 commit comments

Comments
 (0)