Skip to content

Commit 9a14795

Browse files
authored
474 update scaffold to consume new tabular data using the new ingestion model (#480)
* Pipeline updated to consume performance data using the new ingestion model and necessary changes applied to pipeline modules * Referenced comparators by IRIs in signal detectors * Comparators are passed to signal detectors instead of merging them with performance data; tests updated. * API and batch CLI are updated to consume new MPOG json inputs that match the new ingestion model. Script added that converts old MPOG-formatted json inputs into new json inputs * Signal detectors updated to work with periodical data. meas_period is added as an environment variable with default 1 for the length of periods in months.
1 parent 47fa7fd commit 9a14795

34 files changed

+2014
-456
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,11 @@ Local file path or URL (see .env.remote for github URL formats). All are require
168168

169169
- default: None
170170

171+
#### meas_period: Defines the length of periods in months for the input data
172+
173+
- default: 1
174+
- note: for example, for data that is collected quarterly this needs to be set to 3
175+
171176
### Scoring
172177

173178
These control the elements of the scoring algorithm.

bulk-up/README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,4 +209,7 @@ INPUT=~/dev/inputs_2024-05-20/history.csv python src/bulk_up/stats.py
209209
210210
```
211211
cat ~/dev/inputs_2024-05-20/history.csv | python src/bulk_up/stats.py
212-
```
212+
```
213+
214+
### json_to_new_json.py
215+
Script that rewrites old-format MPOG JSON inputs into the new format defined by the new ingestion model.
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
import json
2+
import os
3+
import re
4+
import uuid
5+
from pathlib import Path
6+
7+
import pandas as pd
8+
9+
# Path to the directory containing input files
10+
os.environ.pop("INPUT_DIR", None)
11+
INPUT_DIR = os.environ.setdefault(
12+
"INPUT_DIR",
13+
"/home/faridsei/dev/code/scaffold/bulk-up/random_performance_data/data_with_h",
14+
)
15+
16+
output_dir = Path("random_performance_data/data_with_h_new_format")
17+
output_dir.mkdir(exist_ok=True)
18+
19+
20+
def extract_number(filename):
21+
# Extract numeric part from filename
22+
match = re.search(r"_(\d+)", filename)
23+
if match:
24+
return int(match.group(1))
25+
else:
26+
return float("inf") # Return infinity if no numeric part found
27+
28+
29+
def main():
30+
performance_rows = []
31+
preferences_rows = []
32+
history_rows = []
33+
columns = None
34+
input_files = sorted(
35+
[f for f in os.listdir(INPUT_DIR) if f.endswith(".json")], key=extract_number
36+
)
37+
df_providers = pd.DataFrame(
38+
columns=["Provider_Number", "Institution", "Professional_Role"]
39+
)
40+
41+
for filename in input_files:
42+
with open(os.path.join(INPUT_DIR, filename), "r") as file:
43+
data = json.load(file)
44+
performance_data = data["Performance_data"]
45+
if columns is None:
46+
columns = performance_data[0]
47+
for row in performance_data[1:]:
48+
performance_rows.append(row)
49+
50+
preferences_rows.append([performance_data[1][0], data["Preferences"]])
51+
52+
history_data = data["History"]
53+
54+
history_rows.extend(
55+
[
56+
[performance_data[1][0], key, value]
57+
for key, value in history_data.items()
58+
]
59+
)
60+
if data["institution_id"]:
61+
df_providers.loc[len(df_providers)] = [
62+
performance_data[1][0],
63+
data["institution_id"],
64+
"resident",
65+
]
66+
67+
performance_data_df = pd.DataFrame(performance_rows, columns=columns)
68+
performance_data_df["identifier"] = [
69+
str(uuid.uuid4()) for _ in range(len(performance_data_df))
70+
]
71+
72+
performance_data_df.rename(
73+
columns={
74+
"staff_number": "subject",
75+
"month": "period.start",
76+
"denominator": "measureScore.denominator",
77+
},
78+
inplace=True,
79+
)
80+
81+
performance_data_df["measureScore.rate"] = (
82+
performance_data_df["passed_count"]
83+
/ performance_data_df["measureScore.denominator"]
84+
)
85+
86+
performance_data_df["period.end"] = performance_data_df["period.start"]
87+
88+
performance_data_df["period.end"] = pd.to_datetime(
89+
performance_data_df["period.start"]
90+
)
91+
performance_data_df["period.end"] = performance_data_df[
92+
"period.end"
93+
] + pd.offsets.MonthEnd(0)
94+
performance_data_df["period.end"] = performance_data_df["period.end"].dt.strftime(
95+
"%Y-%m-%d"
96+
)
97+
performance_data_df["measureScore.range"] = None
98+
99+
performance_data_df = performance_data_df.merge(
100+
df_providers[["Provider_Number", "Institution", "Professional_Role"]],
101+
left_on="subject",
102+
right_on="Provider_Number",
103+
how="left",
104+
)
105+
comparator_df = performance_data_df[
106+
[
107+
"measure",
108+
"period.start",
109+
"period.end",
110+
"peer_average_comparator",
111+
"peer_75th_percentile_benchmark",
112+
"peer_90th_percentile_benchmark",
113+
"MPOG_goal",
114+
"Institution",
115+
"Professional_Role",
116+
]
117+
]
118+
119+
subject_data_df = performance_data_df[
120+
["subject", "Institution", "Professional_Role"]
121+
].drop_duplicates()
122+
subject_data_df["type"] = "Practitioner"
123+
124+
subject_data_df.rename(
125+
columns={
126+
"subject": "PractitionerRole.practitioner",
127+
"Institution": "PractitionerRole.organization",
128+
"Professional_Role": "PractitionerRole.code",
129+
},
130+
inplace=True,
131+
)
132+
133+
performance_data_df = performance_data_df[
134+
[
135+
"identifier",
136+
"measure",
137+
"subject",
138+
"period.start",
139+
"period.end",
140+
"measureScore.rate",
141+
"measureScore.denominator",
142+
"measureScore.range",
143+
]
144+
]
145+
146+
preferences_data_df = pd.DataFrame(
147+
preferences_rows, columns=["subject", "preferences.json"]
148+
)
149+
preferences_data_df = preferences_data_df[
150+
preferences_data_df["preferences.json"] != {}
151+
]
152+
153+
history_data_df = pd.DataFrame(
154+
history_rows, columns=["subject", "period.start", "history.json"]
155+
)
156+
history_data_df["period.end"] = history_data_df["period.start"]
157+
history_data_df["period.end"] = pd.to_datetime(history_data_df["period.start"])
158+
history_data_df["period.end"] = history_data_df["period.end"] + pd.offsets.MonthEnd(
159+
0
160+
)
161+
history_data_df["period.end"] = history_data_df["period.end"].dt.strftime(
162+
"%Y-%m-%d"
163+
)
164+
history_data_df = history_data_df[
165+
["subject", "period.start", "period.end", "history.json"]
166+
]
167+
168+
comparator_df = comparator_df.drop_duplicates()
169+
comparator_df = comparator_df.melt(
170+
id_vars=[
171+
"measure",
172+
"period.start",
173+
"period.end",
174+
"Institution",
175+
"Professional_Role",
176+
],
177+
value_vars=[
178+
"peer_average_comparator",
179+
"peer_75th_percentile_benchmark",
180+
"peer_90th_percentile_benchmark",
181+
"MPOG_goal",
182+
],
183+
var_name="group.code", # new column for the original column names
184+
value_name="measureScore.rate", # new column for the values
185+
)
186+
comparator_df.rename(
187+
columns={
188+
"Institution": "group.subject",
189+
"Professional_Role": "PractitionerRole.code",
190+
},
191+
inplace=True,
192+
)
193+
comparator_df["identifier"] = [str(uuid.uuid4()) for _ in range(len(comparator_df))]
194+
comparator_df = comparator_df[
195+
[
196+
"identifier",
197+
"measure",
198+
"period.start",
199+
"measureScore.rate",
200+
"period.end",
201+
"group.subject",
202+
"group.code",
203+
"PractitionerRole.code",
204+
]
205+
]
206+
207+
type_mapping = {
208+
"peer_average_comparator": "http://purl.obolibrary.org/obo/PSDO_0000126",
209+
"peer_75th_percentile_benchmark": "http://purl.obolibrary.org/obo/PSDO_0000128",
210+
"peer_90th_percentile_benchmark": "http://purl.obolibrary.org/obo/PSDO_0000129",
211+
"MPOG_goal": "http://purl.obolibrary.org/obo/PSDO_0000094",
212+
}
213+
214+
comparator_df["group.code"] = comparator_df["group.code"].replace(type_mapping)
215+
216+
for filename in input_files:
217+
with open(os.path.join(INPUT_DIR, filename), "r") as file:
218+
data = json.load(file)
219+
subject = data["Performance_data"][1][0]
220+
221+
data_with_new_format = {
222+
"message_instance_id": f"{str(uuid.uuid4())}",
223+
"performance_month": data["performance_month"],
224+
"subject": subject,
225+
"PractitionerRole": [
226+
[
227+
"PractitionerRole.practitioner",
228+
"PractitionerRole.organization",
229+
"PractitionerRole.code",
230+
"type",
231+
],
232+
[subject, data["institution_id"], "resident", "Practitioner"],
233+
],
234+
"performance_measurer_report": [performance_data_df.columns.tolist()]
235+
+ performance_data_df[
236+
performance_data_df["subject"] == subject
237+
].values.tolist(),
238+
"comparator_measurer_report": [comparator_df.columns.tolist()]
239+
+ comparator_df[
240+
comparator_df["group.subject"] == data["institution_id"]
241+
].values.tolist(),
242+
"History": data["History"],
243+
"Preferences": data["Preferences"],
244+
"debug": "no",
245+
}
246+
247+
file_name = f"Provider_{str(subject)}.json"
248+
file_path = output_dir / file_name
249+
250+
# Write JSON file
251+
with open(file_path, "w") as f:
252+
json.dump(data_with_new_format, f, indent=2)
253+
254+
255+
if __name__ == "__main__":
256+
main()

0 commit comments

Comments
 (0)