Skip to content

Commit 88acb81

Browse files
authored
455 add a new command to the cli for batch processing 2 (#456)
* Adding batch command with detailed stats. removing single command * factored out individual data structure preparation from pipeline.
1 parent 9b6de29 commit 88acb81

File tree

8 files changed

+208
-71
lines changed

8 files changed

+208
-71
lines changed

.python-version

Lines changed: 0 additions & 1 deletion
This file was deleted.

scaffold/api.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from fastapi import FastAPI, Request
22
from fastapi.responses import RedirectResponse
33

4-
from scaffold.pipeline import pipeline
4+
from scaffold.pipeline import run_pipeline
55
from scaffold.startup import startup
66
from scaffold.utils.settings import settings
77

@@ -30,5 +30,6 @@ async def template():
3030
@app.post("/createprecisionfeedback/")
3131
async def createprecisionfeedback(info: Request):
3232
req_info = await info.json()
33+
full_message = run_pipeline(req_info)
3334

34-
return pipeline(req_info)
35+
return full_message

scaffold/bitstomach/bitstomach.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from rdflib import RDF, BNode, Graph, Literal
33

44
from scaffold.bitstomach.signals import SIGNALS
5+
from scaffold.utils import settings
56
from scaffold.utils.namespace import PSDO, SLOWMO
67

78

@@ -33,6 +34,8 @@ def extract_signals(perf_df: pd.DataFrame) -> Graph:
3334

3435

3536
def prepare(req_info):
37+
if settings.settings.performance_month:
38+
req_info["performance_month"] = settings.settings.performance_month
3639
performance_data = req_info["Performance_data"]
3740
performance_df = pd.DataFrame(performance_data[1:], columns=performance_data[0])
3841

scaffold/cli.py

Lines changed: 24 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,35 +7,19 @@
77
import uvicorn
88
from loguru import logger
99

10-
from scaffold.pipeline import pipeline
10+
from scaffold.pipeline import run_pipeline
1111
from scaffold.startup import startup
12+
from scaffold.utils.utils import (
13+
add_candidates,
14+
add_response,
15+
analyse_candidates,
16+
analyse_responses,
17+
extract_number,
18+
)
1219

1320
cli = typer.Typer(no_args_is_help=True)
1421

1522

16-
@cli.command()
17-
def single(
18-
file_path: Annotated[
19-
pathlib.Path, typer.Argument(help="Path to input data in JSON format")
20-
],
21-
) -> None:
22-
startup()
23-
24-
input = orjson.loads(file_path.read_bytes())
25-
result = pipeline(input)
26-
27-
directory = file_path.parent / "messages"
28-
os.makedirs(directory, exist_ok=True)
29-
30-
new_filename = f"{file_path.stem} - message for {input['performance_month']}.json"
31-
32-
output_path = directory / new_filename
33-
34-
output_path.write_bytes(orjson.dumps(result, option=orjson.OPT_INDENT_2))
35-
36-
logger.info(f"Message created at {output_path}")
37-
38-
3923
@cli.command()
4024
def batch(
4125
file_path: Annotated[
@@ -45,11 +29,11 @@ def batch(
4529
max_files: Annotated[
4630
int, typer.Option("--max-files", help="Maximum number of files to process")
4731
] = None,
48-
count_only: Annotated[
32+
stats_only: Annotated[
4933
bool,
5034
typer.Option(
51-
"--count-only",
52-
help="Only simulate processing; count successes and failures",
35+
"--stats-only",
36+
help="Only simulate processing; count successes and failures and additional stats",
5337
),
5438
] = False,
5539
) -> None:
@@ -58,7 +42,7 @@ def batch(
5842
if file_path.is_file() and file_path.suffix == ".json":
5943
input_files = [file_path]
6044
elif file_path.is_dir():
61-
input_files = sorted(file_path.glob("*.json"))
45+
input_files = sorted(file_path.glob("*.json"), key=extract_number)
6246
else:
6347
logger.error(
6448
f"Invalid input: {file_path} is neither a .json file nor a directory containing .json files."
@@ -74,9 +58,10 @@ def batch(
7458
for input_file in input_files:
7559
try:
7660
input_data = orjson.loads(input_file.read_bytes())
77-
result = pipeline(input_data)
7861

79-
if not count_only:
62+
response_data = run_pipeline(input_data)
63+
64+
if not stats_only:
8065
directory = input_file.parent / "messages"
8166
os.makedirs(directory, exist_ok=True)
8267

@@ -87,7 +72,7 @@ def batch(
8772
output_path = directory / new_filename
8873

8974
output_path.write_bytes(
90-
orjson.dumps(result, option=orjson.OPT_INDENT_2)
75+
orjson.dumps(response_data, option=orjson.OPT_INDENT_2)
9176
)
9277
logger.info(f"Message created at {output_path}")
9378
else:
@@ -97,9 +82,17 @@ def batch(
9782
except Exception as e:
9883
logger.error(f"✘ Failed to process {input_file}: {e}")
9984
failure_count += 1
85+
response_data = e.detail
86+
87+
add_response(response_data)
88+
if not stats_only:
89+
add_candidates(response_data, input_data["performance_month"])
10090

10191
logger.info(f"Total files scanned: {len(input_files)}")
10292
logger.info(f"Successful: {success_count}, Failed: {failure_count}")
93+
analyse_responses()
94+
if not stats_only:
95+
analyse_candidates(file_path / "messages" / "candidates.csv")
10396

10497

10598
@cli.command()

scaffold/pictoralist/pictoralist.py

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,12 @@ class Pictoralist:
1818
def __init__(
1919
self,
2020
performance_dataframe,
21-
serialized_perf_df,
2221
selected_candidate,
2322
settings,
24-
message_instance_id,
2523
):
2624
## Setup variables to process selected message
2725
# Needs cleanup to stop redundant var declaration (those passed directly to prepare_selected_message)
2826
self.performance_data = performance_dataframe # Dataframe of recipient perf data (performance_data_df)
29-
self.performance_block = str(
30-
serialized_perf_df
31-
) # Pull un-altered performance (serialized JSON) data to append output message with
3227

3328
# Need refactor
3429
self.selected_measure = str(
@@ -57,7 +52,6 @@ def __init__(
5752
self.acceptable_by.append(
5853
pathway
5954
) # Add string value of rdflib literal to list
60-
self.message_instance_id = message_instance_id
6155
self.base64_image = [] # Initialize as empty key to later fill image into
6256
self.staff_ID = int(
6357
performance_dataframe["staff_number"].iloc[0]
@@ -168,8 +162,10 @@ def fill_missing_months(self):
168162
) # reset col name from index to month
169163

170164
# Forward fill 'measure' and percent-scale version of 'MPOG_goal' columns with the previous valid values
171-
self.performance_data["measure"].fillna(method="ffill", inplace=True)
172-
self.performance_data["goal_percent"].fillna(method="ffill", inplace=True)
165+
self.performance_data["measure"] = self.performance_data["measure"].ffill()
166+
self.performance_data["goal_percent"] = self.performance_data[
167+
"goal_percent"
168+
].ffill()
173169

174170
# Debugging statement
175171
# logger.debug(f"After gap fill, dataframe is:")
@@ -505,11 +501,8 @@ def prepare_selected_message(self):
505501
"performance_month": self.performance_data["month"]
506502
.iloc[-1]
507503
.strftime("%B %Y"), # Becomes string in response, format here
508-
"performance_data": self.performance_block,
509504
"message_generated_datetime": self.init_time,
510505
"message": message,
511506
}
512-
if self.message_instance_id is not None:
513-
full_message["message_instance_id"] = self.message_instance_id
514507

515508
return full_message

scaffold/pipeline.py

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import os
22

3+
import pandas as pd
34
import psutil
45
from fastapi import HTTPException
56
from loguru import logger
@@ -12,36 +13,32 @@
1213
from scaffold.pictoralist.pictoralist import Pictoralist
1314
from scaffold.utils.namespace import PSDO, SLOWMO
1415
from scaffold.utils.settings import settings
16+
from scaffold.utils.utils import set_logger
1517

18+
set_logger()
1619

17-
def pipeline(req_info):
18-
if settings.performance_month:
19-
req_info["performance_month"] = settings.performance_month
20-
21-
preferences = startup.set_preferences(req_info)
2220

21+
def pipeline(preferences: dict, history: dict, performance_df: pd.DataFrame):
2322
cool_new_super_graph = Graph()
2423
cool_new_super_graph += startup.base_graph
2524

2625
# BitStomach
2726
logger.debug("Calling BitStomach from main...")
2827

29-
performance_data_df = bitstomach.prepare(req_info)
30-
# TODO: find a place for measures to live...mabe move these two line into prepare or make a measurees class
28+
# TODO: find a place for measures to live... maybe move these two lines into prepare or make a measures class
3129
measures = set(cool_new_super_graph[: RDF.type : PSDO.performance_measure_content])
3230

33-
performance_data_df.attrs["valid_measures"] = [
34-
m for m in performance_data_df.attrs["valid_measures"] if BNode(m) in measures
31+
performance_df.attrs["valid_measures"] = [
32+
m for m in performance_df.attrs["valid_measures"] if BNode(m) in measures
3533
]
36-
g: Graph = bitstomach.extract_signals(performance_data_df)
34+
g: Graph = bitstomach.extract_signals(performance_df)
3735

3836
performance_content = g.resource(BNode("performance_content"))
3937
if len(list(performance_content[PSDO.motivating_information])) == 0:
4038
cool_new_super_graph.close()
4139
detail = {
4240
"message": "Insufficient significant data found for providing feedback, process aborted.",
43-
"message_instance_id": req_info["message_instance_id"],
44-
"staff_number": performance_data_df.attrs["staff_number"],
41+
"staff_number": performance_df.attrs["staff_number"],
4542
}
4643
raise HTTPException(
4744
status_code=400,
@@ -57,11 +54,11 @@ def pipeline(req_info):
5754

5855
# #Esteemer
5956
logger.debug("Calling Esteemer from main...")
60-
history: dict = req_info.get("History", {})
57+
6158
history = {
6259
key: value
6360
for key, value in history.items()
64-
if key < performance_data_df.attrs["performance_month"]
61+
if key < performance_df.attrs["performance_month"]
6562
}
6663

6764
measures: set[BNode] = set(
@@ -90,11 +87,9 @@ def pipeline(req_info):
9087
if selected_message["message_text"] != "No message selected":
9188
## Initialize and run message and display generation:
9289
pc = Pictoralist(
93-
performance_data_df,
94-
req_info["Performance_data"],
90+
performance_df,
9591
selected_message,
9692
settings,
97-
req_info["message_instance_id"],
9893
)
9994
pc.prep_data_for_graphing() # Setup dataframe of one measure, cleaned for graphing
10095
pc.fill_missing_months() # Fill holes in dataframe where they exist
@@ -120,11 +115,25 @@ def pipeline(req_info):
120115
(
121116
BNode("p1"),
122117
URIRef("http://example.com/slowmo#IsAboutPerformer"),
123-
Literal(int(performance_data_df["staff_number"].iloc[0])),
118+
Literal(int(performance_df["staff_number"].iloc[0])),
124119
)
125120
)
126121
response["candidates"] = utils.candidates_records(cool_new_super_graph)
127122

128123
response.update(full_selected_message)
129124

130125
return response
126+
127+
128+
def run_pipeline(req_info):
129+
preferences = startup.set_preferences(req_info)
130+
history: dict = req_info.get("History", {})
131+
performance_df = bitstomach.prepare(req_info)
132+
try:
133+
full_message = pipeline(preferences, history, performance_df)
134+
full_message["message_instance_id"] = req_info["message_instance_id"]
135+
full_message["performance_data"] = req_info["Performance_data"]
136+
except HTTPException as e:
137+
e.detail["message_instance_id"] = req_info["message_instance_id"]
138+
raise e
139+
return full_message

scaffold/startup.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import csv
22
import json
3-
import sys
43
from io import StringIO
54

65
import matplotlib
@@ -11,14 +10,10 @@
1110

1211
from scaffold.utils.graph_operations import manifest_to_graph
1312
from scaffold.utils.settings import settings
13+
from scaffold.utils.utils import set_logger
14+
15+
set_logger()
1416

15-
logger.remove()
16-
logger.add(
17-
sys.stdout, colorize=True, format="{level}| {message}", level=settings.log_level
18-
)
19-
logger.at_least = (
20-
lambda lvl: logger.level(lvl).no >= logger.level(settings.log_level).no
21-
)
2217
matplotlib.use("Agg")
2318
mpm: dict = {}
2419
default_preferences: dict = {}

0 commit comments

Comments
 (0)