Skip to content

Commit a64466d

Browse files
authored
457 add csv input processing to scaffold (#458)
* initial csv processing command added * script to generate performance_data, history and preferences in csv format using a bunch of json input files added to bulkup * history and preferences are programmed to be loaded from csv input for batch_csv command.
1 parent 88acb81 commit a64466d

File tree

6 files changed

+266
-12
lines changed

6 files changed

+266
-12
lines changed

README.md

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,27 @@ curl --data "@tests/test_cases/input_message.json" http://localhost:8000/createp
9393
```
9494

9595
Run SCAFFOLD CLI
96-
First install the python app. Then use the following command to run the pipeline on one input file
96+
First install the python app. Then use the following command to run the pipeline on one json input file
9797

9898
```zsh
99-
ENV_PATH=/user/.../dev.env pipeline single '/path/to/input/file.json'
99+
ENV_PATH=/user/.../dev.env pipeline batch '/path/to/input/file.json'
100100
```
101101

102-
or use the following command to run the pipeline api
102+
Use the following command to run the pipeline on some or all json input files in a folder
103+
104+
```zsh
105+
ENV_PATH=/user/.../dev.env pipeline batch '/path/to/input/folder/' --max-files 500
106+
```
107+
Use --max-files if you need to limit the number of files to process.
108+
109+
Use the following command to run the pipeline passing performance_data, history and preferences as separate CSV files
110+
111+
```zsh
112+
ENV_PATH=/user/.../dev.env pipeline batch_csv '/path/to/performance/data/file.csv' '/path/to/preferences/file.csv' '/path/to/history/file.csv' --performance-month {performance month i.e. 2024-05-01} --max-files 500
113+
```
114+
Use --performance-month to set the performance month for batch_csv command.
115+
116+
Use the following command to run the pipeline api
103117

104118
```zsh
105119
ENV_PATH=/user/.../dev.env pipeline web
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import csv
2+
import re
3+
from pathlib import Path
4+
5+
import orjson
6+
7+
8+
def extract_number(filename):
    """Return the first integer that follows an underscore in *filename*.

    Used as a sort key for input files named like ``input_12.json``; files
    with no numeric part sort last (key is ``inf``).
    """
    found = re.search(r"_(\d+)", str(filename))
    return int(found.group(1)) if found else float("inf")
14+
15+
16+
# One-off "bulkup" script: flatten a directory of per-provider JSON request
# files into the three CSV files consumed by `pipeline batch_csv`.
# NOTE(review): the input directory is hard-coded — adjust before running.
file_path = Path("/home/faridsei/dev/test/2024-06-24/2024-05-01/")
input_files = sorted(file_path.glob("*.json"), key=extract_number)

# performance_data.csv: concatenate every file's Performance_data table,
# keeping the header row only from the first file.
with open("performance_data.csv", "w", newline="") as file:
    writer = csv.writer(file)
    for index, input_file in enumerate(input_files):
        input_data = orjson.loads(input_file.read_bytes())
        if index == 0:
            writer.writerows(input_data["Performance_data"])
        else:
            writer.writerows(input_data["Performance_data"][1:])


# preferences.csv: one row per provider that has Message_Format preferences.
fieldnames = [
    "staff_number",
    "Social gain",
    "Social stayed better",
    "Worsening",
    "Improving",
    "Social loss",
    "Social stayed worse",
    "Social better",
    "Social worse",
    "Social approach",
    "Goal gain",
    "Goal approach",
    "Display_Format",
]
with open("preferences.csv", "w", newline="") as file:
    writer = csv.DictWriter(file, fieldnames=fieldnames)
    writer.writeheader()
    for input_file in input_files:
        input_data = orjson.loads(input_file.read_bytes())
        if input_data["Preferences"].get("Utilities", {}).get("Message_Format", {}):
            # staff number comes from the first data row of the performance table
            preferences = {"staff_number": input_data["Performance_data"][1][0]}
            preferences.update(
                input_data["Preferences"].get("Utilities", {}).get("Message_Format", {})
            )
            # Display_Format is one-hot in the JSON; store the selected key (or None)
            preferences["Display_Format"] = next(
                (
                    k
                    for k, v in input_data["Preferences"]
                    .get("Utilities", {})
                    .get("Display_Format", {})
                    .items()
                    if v == 1
                ),
                None,
            )
            writer.writerows([preferences])

# history.csv: union of all history keys across files.
all_keys = set(["staff_number"])
for input_file in input_files:
    input_data = orjson.loads(input_file.read_bytes())
    all_keys.update(input_data["History"].keys())
# FIX: passing the raw set to DictWriter made the column order nondeterministic
# between runs; sort with staff_number first for a stable, diffable layout.
history_fieldnames = ["staff_number"] + sorted(all_keys - {"staff_number"})
with open("history.csv", "w", newline="") as file:
    writer = csv.DictWriter(file, fieldnames=history_fieldnames)
    writer.writeheader()
    for input_file in input_files:
        input_data = orjson.loads(input_file.read_bytes())
        if input_data["History"]:
            history = {"staff_number": input_data["Performance_data"][1][0]}
            history.update(input_data["History"])
            writer.writerows([history])

scaffold/bitstomach/bitstomach.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,22 @@ def extract_signals(perf_df: pd.DataFrame) -> Graph:
3434

3535

3636
def prepare(req_info):
    """Build the performance DataFrame for one request.

    Reads the tabular ``Performance_data`` (first row is the header) from
    *req_info* and delegates to ``prepare_performance_df``.  The performance
    month comes from the request when present, but an environment-level
    ``settings.performance_month`` overrides it.
    """
    # FIX: use .get — requests without an explicit performance_month raised
    # KeyError; prepare_performance_df falls back to the data's max month.
    performance_month = req_info.get("performance_month")
    if settings.settings.performance_month:
        performance_month = settings.settings.performance_month

    performance_data = req_info["Performance_data"]
    performance_df = pd.DataFrame(performance_data[1:], columns=performance_data[0])

    return prepare_performance_df(performance_month, performance_df)
46+
47+
def prepare_performance_df(performance_month, performance_df):
4248
performance_df.attrs["staff_number"] = int(performance_df.at[0, "staff_number"])
4349

4450
performance_df["goal_comparator_content"] = performance_df["MPOG_goal"]
4551

46-
performance_df.attrs["performance_month"] = req_info.get(
47-
"performance_month", performance_df["month"].max()
48-
)
52+
performance_df.attrs["performance_month"] = performance_month if performance_month else performance_df["month"].max()
4953

5054
performance_df = performance_df[
5155
performance_df["month"] <= performance_df.attrs["performance_month"]

scaffold/cli.py

Lines changed: 108 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
11
import os
22
import pathlib
3+
import subprocess
34
from typing import Annotated
45

56
import orjson
7+
import pandas as pd
68
import typer
7-
import uvicorn
89
from loguru import logger
910

10-
from scaffold.pipeline import run_pipeline
11-
from scaffold.startup import startup
11+
from scaffold.bitstomach.bitstomach import prepare_performance_df
12+
from scaffold.pipeline import pipeline, run_pipeline
13+
from scaffold.startup import set_preferences, startup
1214
from scaffold.utils.utils import (
1315
add_candidates,
1416
add_response,
1517
analyse_candidates,
1618
analyse_responses,
1719
extract_number,
20+
get_history,
21+
get_preferences,
1822
)
1923

2024
cli = typer.Typer(no_args_is_help=True)
@@ -96,8 +100,107 @@ def batch(
96100

97101

98102
@cli.command()
99-
def web():
100-
uvicorn.run("scaffold.api:app", reload=False, use_colors=True)
103+
@cli.command()
def batch_csv(
    performance_data_path: Annotated[
        pathlib.Path,
        typer.Argument(help="Path to a CSV file containing performance data"),
    ],
    preferences_path: Annotated[
        pathlib.Path,
        typer.Argument(help="Path to a CSV file containing the preferences"),
    ],
    history_path: Annotated[
        pathlib.Path,
        typer.Argument(help="Path to a CSV file containing the history"),
    ],
    max_files: Annotated[
        int, typer.Option("--max-files", help="Maximum number of files to process")
    ] = None,
    performance_month: Annotated[
        str, typer.Option("--performance-month", help="Performance month")
    ] = None,
    stats_only: Annotated[
        bool,
        typer.Option(
            "--stats-only",
            help="Only simulate processing; count successes and failures and additional stats",
        ),
    ] = False,
):
    """Run the pipeline once per provider found in three CSV inputs.

    Performance data, preferences and history are joined on ``staff_number``;
    unless --stats-only is given, one message JSON per provider is written to
    a ``messages/`` folder next to the performance CSV.
    """
    startup()

    all_performance_data = pd.read_csv(performance_data_path, parse_dates=["month"])
    all_preferences = pd.read_csv(preferences_path)
    all_history = pd.read_csv(history_path)

    if max_files is not None:
        # Limit to the first N distinct providers, keeping all of their rows.
        first_n_staff = (
            all_performance_data["staff_number"].drop_duplicates().head(max_files)
        )
        performance_data = all_performance_data[
            all_performance_data["staff_number"].isin(first_n_staff)
        ].reset_index(drop=True)
    else:
        # FIX: without --max-files, performance_data was never assigned and the
        # loop below raised NameError.
        performance_data = all_performance_data

    success_count = 0
    failure_count = 0
    for provider_id in performance_data["staff_number"].unique().tolist():
        try:
            preferences = set_preferences(
                get_preferences(
                    all_preferences[all_preferences["staff_number"] == provider_id]
                )
            )
            history = get_history(
                all_history[all_history["staff_number"] == provider_id]
            )

            performance_df = prepare_performance_df(
                performance_month,
                performance_data[
                    performance_data["staff_number"] == provider_id
                ].reset_index(drop=True),
            )
            result = pipeline(preferences, history, performance_df)
            if not stats_only:
                directory = performance_data_path.parent / "messages"
                os.makedirs(directory, exist_ok=True)

                new_filename = (
                    f"Provider_{provider_id} - message for {performance_month}.json"
                )
                output_path = directory / new_filename

                output_path.write_bytes(
                    orjson.dumps(result, option=orjson.OPT_INDENT_2)
                )
                logger.info(f"Message created at {output_path}")
            else:
                logger.info(f"✔ Would process: Provider_{provider_id}")

            success_count += 1

        except Exception as e:
            logger.error(f"✘ Failed to process Provider_{provider_id}: {e}")
            failure_count += 1
            # FIX: only HTTPException-like errors carry .detail; a generic
            # exception raised AttributeError inside the handler.
            result = getattr(e, "detail", str(e))

        add_response(result)
        if not stats_only:
            add_candidates(result, performance_month)

    logger.info(f"Successful: {success_count}, Failed: {failure_count}")
    analyse_responses()
    if not stats_only:
        analyse_candidates(performance_data_path.parent / "messages" / "candidates.csv")
198+
@cli.command()
def web(workers: int = 5):
    """Serve the pipeline API via the uvicorn CLI with *workers* processes."""
    command = [
        "uvicorn",
        "scaffold.api:app",
        "--workers",
        str(workers),
        "--use-colors",
    ]
    subprocess.run(command)
101204

102205

103206
if __name__ == "__main__":

scaffold/utils/graph_operations.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
from loguru import logger
66
from rdflib import Graph
77

8+
from scaffold.utils.utils import set_logger
9+
10+
set_logger()
811

912
def manifest_to_graph(manifest_path: str) -> Graph:
1013
g: Graph = Graph()

scaffold/utils/utils.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import ast
12
import re
23
import sys
34

5+
import numpy as np
46
import pandas as pd
57
from loguru import logger
68

@@ -142,3 +144,53 @@ def set_logger():
142144
logger.at_least = (
143145
lambda lvl: logger.level(lvl).no >= logger.level(settings.log_level).no
144146
)
147+
148+
149+
def get_preferences(preferences_row):
    """Convert a preferences DataFrame row into the nested Preferences dict.

    Only the first row of *preferences_row* is used; returns {} when the
    frame is empty.  ``Display_Format`` is expanded into a one-hot mapping,
    every other column (except ``staff_number``) becomes a
    ``Message_Format`` entry with numpy scalars unwrapped to builtins.
    """
    if preferences_row.empty:
        return {}

    first = preferences_row.iloc[0]
    message_format = {}
    display_format = {}

    for column in preferences_row.columns:
        if column == "staff_number":
            # identifier column, not a preference
            continue
        cell = first[column]
        if column == "Display_Format":
            # one-hot encode the selected display format
            display_format = {
                "Bar chart": 0,
                "Line chart": 0,
                "Text-only": 0,
                "System-generated": "0",
            }
            display_format[cell] = 1
        else:
            if isinstance(cell, (np.float64, np.int64)):
                cell = cell.item()
            message_format[column] = cell

    return {
        "Utilities": {
            "Message_Format": message_format,
            "Display_Format": display_format,
        }
    }
177+
178+
179+
def get_history(history_row):
    """Convert a history DataFrame row into a history dict.

    Only the first row of *history_row* is used; returns {} when the frame
    is empty.  String cells are parsed with ``ast.literal_eval`` so that
    list/dict values serialized into the CSV come back as Python objects;
    strings that are not Python literals are skipped, and null cells are
    ignored.
    """
    if history_row.empty:
        return {}

    history = {}
    row = history_row.iloc[0]
    for col in history_row.columns:
        if col == "staff_number":
            continue
        value = row[col]
        if pd.isna(value):
            continue
        if isinstance(value, str):
            try:
                value = ast.literal_eval(value)
            except (ValueError, SyntaxError):
                continue  # not a Python literal; skip (original behavior)
        elif isinstance(value, (np.floating, np.integer)):
            # FIX: literal_eval rejects non-strings, so numeric columns read
            # by pandas were silently dropped; keep them, unwrapped to builtins.
            value = value.item()
        history[col] = value

    return history

0 commit comments

Comments
 (0)