Skip to content

Commit aeaebf1

Browse files
authored
464 update data handling (#465)
* moved prepare call inside pipeline. cleaned up attachment of performance month to performance data frame. moved responsibility of cleaning up performance data for a specific provider to the prepare sending as input all performance data in a df with staff number and have it clean up the data frame as first step. * simplified context update, get preferences and get history and moved history and preferences load to startup. * context changed to have a create method which would create and store preferences and history from request or loaded files in context dictionaries. Get preferences and history methods are updated accordingly and context.create is called in api and batch modes.
1 parent 921cd99 commit aeaebf1

File tree

11 files changed

+131
-228
lines changed

11 files changed

+131
-228
lines changed

bulk-up/src/bulk_up/log_to_data.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,9 @@ def add_signal_properties(row, output_message, input_message):
110110
columns=input_message["Performance_data"][0],
111111
)
112112

113-
performance_df = prepare(performance_month, performance_df)
113+
performance_df, performance_month = prepare(
114+
performance_df, performance_df.at[0, "staff_number"], performance_month
115+
)
114116
performance_df = performance_df[
115117
performance_df["measure"] == output_message["selected_candidate"]["measure"]
116118
].tail(12)

bulk-up/src/bulk_up/prepare_csv_inputs.py

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -25,43 +25,17 @@ def extract_number(filename):
2525
writer.writerows(input_data["Performance_data"][1:])
2626

2727

28-
fieldnames = [
29-
"staff_number",
30-
"Social gain",
31-
"Social stayed better",
32-
"Worsening",
33-
"Improving",
34-
"Social loss",
35-
"Social stayed worse",
36-
"Social better",
37-
"Social worse",
38-
"Social approach",
39-
"Goal gain",
40-
"Goal approach",
41-
"Display_Format",
42-
]
4328
with open("preferences.csv", "w", newline="") as file:
44-
writer = csv.DictWriter(file, fieldnames=fieldnames)
29+
writer = csv.DictWriter(file, fieldnames=["staff_number", "preferences"])
4530
writer.writeheader()
4631
for index, input_file in enumerate(input_files):
4732
input_data = orjson.loads(input_file.read_bytes())
48-
if input_data["Preferences"].get("Utilities", {}).get("Message_Format", {}):
49-
preferences = {"staff_number": input_data["Performance_data"][1][0]}
50-
preferences.update(
51-
input_data["Preferences"].get("Utilities", {}).get("Message_Format", {})
52-
)
53-
preferences["Display_Format"] = next(
54-
(
55-
k
56-
for k, v in input_data["Preferences"]
57-
.get("Utilities", {})
58-
.get("Display_Format", {})
59-
.items()
60-
if v == 1
61-
),
62-
None,
63-
)
64-
writer.writerows([preferences])
33+
preferences = {
34+
"staff_number": input_data["Performance_data"][1][0],
35+
"preferences": orjson.dumps(input_data["Preferences"]).decode(),
36+
}
37+
38+
writer.writerows([preferences])
6539

6640
all_keys = set(["staff_number", "month", "history"])
6741
for input_file in input_files:
@@ -75,6 +49,6 @@ def extract_number(filename):
7549
history = {
7650
"staff_number": input_data["Performance_data"][1][0],
7751
"month": key,
78-
"history": value,
52+
"history": orjson.dumps(value).decode(),
7953
}
8054
writer.writerows([history])

scaffold/api.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from fastapi.responses import RedirectResponse
44

55
from scaffold import context
6-
from scaffold.bitstomach.bitstomach import prepare
76
from scaffold.pipeline import pipeline
87
from scaffold.startup import startup
98
from scaffold.utils.settings import settings
@@ -34,17 +33,16 @@ async def template():
3433
@app.post("/createprecisionfeedback/")
3534
async def createprecisionfeedback(info: Request):
3635
req_info = await info.json()
37-
context.update(req_info)
3836

3937
performance_month = get_performance_month(req_info)
40-
performance_df = prepare(
41-
performance_month,
42-
pd.DataFrame(
43-
req_info["Performance_data"][1:], columns=req_info["Performance_data"][0]
44-
),
38+
performance_df = pd.DataFrame(
39+
req_info["Performance_data"][1:], columns=req_info["Performance_data"][0]
4540
)
41+
context.create(req_info, performance_df.at[0, "staff_number"])
4642
try:
47-
full_message = pipeline(performance_df)
43+
full_message = pipeline(
44+
performance_df, performance_df.at[0, "staff_number"], performance_month
45+
)
4846
full_message["message_instance_id"] = req_info["message_instance_id"]
4947
full_message["performance_data"] = req_info["Performance_data"]
5048
except HTTPException as e:

scaffold/bitstomach/bitstomach.py

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import pandas as pd
2-
from rdflib import RDF, BNode, Graph, Literal
2+
from rdflib import RDF, BNode, Graph
33

44
from scaffold.bitstomach.signals import SIGNALS
55
from scaffold.utils.namespace import PSDO, SLOWMO
@@ -14,7 +14,6 @@ def extract_signals(perf_df: pd.DataFrame) -> Graph:
1414
g = Graph()
1515
r = g.resource(BNode("performance_content"))
1616
r.set(RDF.type, PSDO.performance_content)
17-
r.set(SLOWMO.PerformanceMonth, Literal(perf_df.attrs["performance_month"]))
1817
if perf_df.empty:
1918
return g
2019

@@ -32,19 +31,18 @@ def extract_signals(perf_df: pd.DataFrame) -> Graph:
3231
return g
3332

3433

35-
def prepare(performance_month, performance_df):
36-
# we would have multiple staff performance data at this point so this like won't work right
34+
def prepare(performance_df, staff_number, performance_month):
35+
performance_df = performance_df[
36+
performance_df["staff_number"] == staff_number
37+
].reset_index(drop=True)
38+
39+
if not performance_month:
40+
performance_month = performance_df["month"].max()
3741
performance_df.attrs["staff_number"] = int(performance_df.at[0, "staff_number"])
3842

3943
performance_df["goal_comparator_content"] = performance_df["MPOG_goal"]
4044

41-
performance_df.attrs["performance_month"] = (
42-
performance_month if performance_month else performance_df["month"].max()
43-
)
44-
45-
performance_df = performance_df[
46-
performance_df["month"] <= performance_df.attrs["performance_month"]
47-
]
45+
performance_df = performance_df[performance_df["month"] <= performance_month]
4846

4947
performance_df["valid"] = performance_df["denominator"] >= 10
5048

@@ -55,10 +53,7 @@ def prepare(performance_month, performance_df):
5553
performance_df.attrs["measures"] = performance_df["measure"].unique()
5654

5755
performance_df.attrs["valid_measures"] = performance_df[
58-
(
59-
(performance_df["month"] == performance_df.attrs["performance_month"])
60-
& performance_df["valid"]
61-
)
56+
((performance_df["month"] == performance_month) & performance_df["valid"])
6257
]["measure"]
6358

64-
return performance_df
59+
return performance_df, performance_month

scaffold/cli.py

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from loguru import logger
1111

1212
from scaffold import context
13-
from scaffold.bitstomach.bitstomach import prepare
1413
from scaffold.pipeline import pipeline
1514
from scaffold.startup import startup
1615
from scaffold.utils.utils import (
@@ -61,18 +60,18 @@ def batch(
6160
try:
6261
input_data = orjson.loads(input_file.read_bytes())
6362

64-
context.update(input_data)
65-
6663
performance_month = get_performance_month(input_data)
67-
performance_df = prepare(
68-
performance_month,
69-
pd.DataFrame(
70-
input_data["Performance_data"][1:],
71-
columns=input_data["Performance_data"][0],
72-
),
64+
performance_df = pd.DataFrame(
65+
input_data["Performance_data"][1:],
66+
columns=input_data["Performance_data"][0],
7367
)
68+
context.create(input_data, performance_df.at[0, "staff_number"])
7469
try:
75-
full_message = pipeline(performance_df)
70+
full_message = pipeline(
71+
performance_df,
72+
performance_df.at[0, "staff_number"],
73+
performance_month,
74+
)
7675
full_message["message_instance_id"] = input_data["message_instance_id"]
7776
full_message["performance_data"] = input_data["Performance_data"]
7877
except HTTPException as e:
@@ -134,7 +133,6 @@ def batch_csv(
134133
] = False,
135134
):
136135
startup()
137-
context.init()
138136

139137
performance_data = pd.read_csv(performance_data_path, parse_dates=["month"])
140138
success_count = 0
@@ -143,13 +141,8 @@ def batch_csv(
143141
performance_data["staff_number"].drop_duplicates().head(max_files)
144142
):
145143
try:
146-
performance_df = prepare(
147-
performance_month,
148-
performance_data[
149-
performance_data["staff_number"] == provider_id
150-
].reset_index(drop=True),
151-
)
152-
result = pipeline(performance_df)
144+
context.create({}, provider_id)
145+
result = pipeline(performance_data, provider_id, performance_month)
153146
if not stats_only:
154147
directory = performance_data_path.parent / "messages"
155148
os.makedirs(directory, exist_ok=True)

scaffold/context.py

Lines changed: 32 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -1,141 +1,52 @@
1-
import json
1+
from scaffold import startup
22

3-
import numpy as np
4-
import pandas as pd
3+
preferences_dict = {}
4+
history_dict = {}
55

6-
from scaffold import startup
7-
from scaffold.utils.settings import settings
86

9-
preferences: pd.DataFrame = pd.DataFrame()
10-
history: pd.DataFrame = pd.DataFrame()
7+
def create(req_info, staff_number):
8+
global preferences_dict, history_dict
119

10+
history_dict = {}
11+
preferences_dict = {}
1212

13-
def init():
1413
try:
15-
global preferences, history
16-
17-
if settings.preferences is not None:
18-
preferences = pd.read_csv(settings.preferences)
19-
if settings.history is not None:
20-
history = pd.read_csv(settings.history, converters={"history": json.loads})
14+
if req_info.get("Preferences", {}):
15+
preferences_dict = set_preferences(req_info.get("Preferences", {}))
16+
else:
17+
p = startup.preferences.loc[staff_number, "preferences"]
18+
preferences_dict = set_preferences(p)
19+
except Exception:
20+
return set_preferences({})
2121

22-
except Exception as e:
23-
print("context init aborted, see traceback:")
24-
raise e
22+
try:
23+
if req_info.get("History", {}):
24+
history_dict = req_info.get("History", {})
25+
else:
26+
staff_data = startup.history[
27+
startup.history["staff_number"] == staff_number
28+
]
29+
history_dict = staff_data.set_index("month")["history"].to_dict()
30+
except Exception:
31+
pass
2532

2633

27-
def update(req_info):
28-
try:
29-
global preferences, history
30-
staff_number = req_info["Performance_data"][1][0]
34+
def get_preferences():
35+
global preferences_dict
36+
return preferences_dict
3137

32-
preferences_dict = req_info.get("Preferences", {}).get("Utilities", {})
33-
if preferences_dict:
34-
new_row = {"staff_number": staff_number} | preferences_dict[
35-
"Message_Format"
36-
]
37-
display_format = next(
38-
(
39-
k
40-
for k, v in preferences_dict.get("Display_Format", {}).items()
41-
if v == 1
42-
),
43-
"None",
44-
)
45-
new_row["Display_Format"] = display_format
46-
47-
if preferences.empty:
48-
preferences = pd.DataFrame([new_row])
49-
elif not (preferences["staff_number"] == staff_number).any():
50-
preferences = pd.concat(
51-
[preferences, pd.DataFrame([new_row])], ignore_index=True
52-
)
53-
54-
history_dict: dict = req_info.get("History", {})
55-
56-
if history.empty:
57-
history = pd.DataFrame(columns=["staff_number", "month", "history"])
58-
59-
for key, value in history_dict.items():
60-
if (
61-
history.empty
62-
or history[
63-
(history["staff_number"] == staff_number)
64-
& (history["month"] == key)
65-
].empty
66-
):
67-
new_row = pd.DataFrame(
68-
[{"staff_number": staff_number, "month": key, "history": value}]
69-
)
70-
history = pd.concat([history, pd.DataFrame(new_row)], ignore_index=True)
71-
except Exception as e:
72-
print("context update aborted, see traceback:")
73-
raise e
74-
75-
76-
def get_preferences(staff_number):
77-
global preferences
78-
if preferences.empty:
79-
return set_preferences({})
8038

81-
preferences_row = preferences[preferences["staff_number"] == staff_number]
82-
provider_preferences = {}
83-
if not preferences_row.empty:
84-
provider_preferences = {
85-
"Preferences": {"Utilities": {"Message_Format": {}, "Display_Format": {}}}
86-
}
87-
88-
# We'll just use the first row of the CSV
89-
row = preferences_row.iloc[0]
90-
91-
for key in preferences_row.columns:
92-
value = row[key]
93-
if key == "staff_number":
94-
continue # skip or store if you need it
95-
elif key == "Display_Format":
96-
# Example: "Bar chart, Line chart"
97-
provider_preferences["Preferences"]["Utilities"]["Display_Format"] = {
98-
"Bar chart": 0,
99-
"Line chart": 0,
100-
"Text-only": 0,
101-
"System-generated": "0",
102-
}
103-
provider_preferences["Preferences"]["Utilities"]["Display_Format"][
104-
value
105-
] = 1
106-
else:
107-
if isinstance(value, (np.float64, np.int64)):
108-
value = value.item()
109-
provider_preferences["Preferences"]["Utilities"]["Message_Format"][
110-
key
111-
] = value
112-
return set_preferences(provider_preferences)
113-
114-
115-
def get_history(staff_number):
116-
global history
117-
118-
if history.empty:
119-
return {}
120-
history_rows = history[history["staff_number"] == staff_number]
121-
if history_rows.empty:
122-
return {}
123-
124-
provider_history = {}
125-
for index, row in history_rows.iterrows():
126-
history_item = row["history"] # ast.literal_eval(row["history"])
127-
month = row["month"]
128-
provider_history[month] = history_item
129-
130-
return provider_history
39+
def get_history():
40+
global history_dict
41+
return history_dict
13142

13243

13344
def set_preferences(req_info):
134-
preferences_utilities = req_info.get("Preferences", {}).get("Utilities", {})
45+
preferences_utilities = req_info.get("Utilities", {})
13546
input_preferences: dict = preferences_utilities.get("Message_Format", {})
13647
individual_preferences: dict = {}
13748
for key in input_preferences:
138-
individual_preferences[key] = float(input_preferences[key])
49+
individual_preferences[key.lower()] = float(input_preferences[key])
13950

14051
preferences: dict = startup.default_preferences.copy()
14152
preferences.update(individual_preferences)

0 commit comments

Comments
 (0)