Commit 4d4ae2b

Merge branch 'release/25.5.22'
* release/25.5.22: (43 commits)
  - bump version
  - remove this check. It's flaky + users can make modifications more easily afterwards now
  - more tests need to restart mqtt-to-db since we are changing a struct definition
  - run mqtt publish in another thread so we aren't blocking the main checks
  - new pump logging
  - adding support for negative dosing events
  - adding support for negative dosing events
  - adding echo about moving ui folder
  - changelog and test
  - I don't need some of these pragmas for my use case
  - fix more tests
  - try this
  - improvements to creating mqtt clients; adding logic to handle if the BJ subclass's __init__ fails - we gracefully clean up
  - try this
  - try this
  - revert and simplify this
  - fixes the db is locked errors
  - no error letting you know what happened
  - let's see the consequence of changing the isolation level
  ...
2 parents 9c547b0 + 0ac241f commit 4d4ae2b

51 files changed: +1016 additions, −488 deletions

.github/workflows/ci.yaml

Lines changed: 1 addition & 1 deletion
```diff
@@ -70,7 +70,7 @@ jobs:
       - name: Run tests
         run: |
-          pytest pioreactor/tests/ -vv --timeout 600 --random-order --durations 15
+          pytest pioreactor/tests/ -vv --timeout 240 --random-order --durations 15 --log-level DEBUG
           pytest pioreactor/tests/test_automation_imports.py
         env:
           TESTING: 1
```

CHANGELOG.md

Lines changed: 31 additions & 0 deletions
````diff
@@ -1,3 +1,34 @@
+### 25.5.22
+
+#### Enhancements
+ - new _System logs_ under _Inventory_ to track logs happening outside of experiments in your cluster.
+ - Better organization of logs in the UI. System logs, like calibrations, worker additions, etc. won't show up on the Overview page.
+ - Exported data zips have folders for each dataset requested.
+ - Improvements to the Kalman filter. For users using the growth-rate model with media dosing, you should see improvements to your growth-rate time series. We recommend the following configuration:
+```
+[growth_rate_kalman]
+# obs_std ↑ smooths growth rate, rate_std ↑ more responsive growth rate
+obs_std=1.5
+od_std=0.0025
+rate_std=0.25
+```
+ **Note: the acceleration term is removed**
+ - Added the column `hours_since_experiment_created` to dataset exports that details hours since the experiment was created.
+ - A running pump now fires off an incremental dosing event every N seconds (N=0.5 currently) to tell the software its progress. Previously, we would fire off a single event that represented the total amount moved. This is most noticeable when watching the vial volume change over time (it looks more accurate over a short period).
+ - When a pump runs, it _first_ fires off a dosing_event, which stores information about how much liquid is moved. However, if the pump is stopped early, there was no correction issued to the amount of liquid actually moved. Now, when a pump is stopped early, a _negative_ volume is sent such that the delta between the initial amount and new amount is equal to the dosed amount (so when you sum up the volume changes, you get the actual change, as expected).
+ - Performance optimizations
+ - New image installs only:
+   - updated base OS to the latest 25-05-06 Raspberry Pi OS. The big change is using Linux kernel 6.12.
+
+#### Bug fixes
+ - fixed stir bar not spinning on Pioreactor page (UI) in some cases
+ - alert user if their OD reading is constant before starting the growth-rate calculator, which would break things.
+ - alert user if their software is installed in a non-standard location. If so, try `pio uninstall pioreactor -y`.
+ - Added a warning if the OD calibration is invalid (ex: a constant line)
+ - Fix for Raspberry Pi 5 using upstream Adafruit libraries.
+
+
 ### 25.5.1

 #### Enhancements
````
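The stopped-early pump correction described in the changelog is just event bookkeeping: incremental events fire while the pump runs, and a final (possibly negative) event makes the running sum equal the volume actually moved. A minimal sketch, where the function name and volumes are illustrative rather than the Pioreactor API:

```python
# Illustrative dosing-event bookkeeping; names and volumes are hypothetical,
# not the actual Pioreactor API.

def correction_event(reported_ml: float, actually_moved_ml: float) -> float:
    # Events summing to `reported_ml` were already emitted; return one
    # corrective (possibly negative) event so the sum matches reality.
    return actually_moved_ml - reported_ml

# incremental events fired every N seconds while the pump ran
events = [0.5, 0.5, 0.5, 0.5]

# pump stopped early: only 1.2 mL actually moved, so emit a negative event
events.append(correction_event(sum(events), actually_moved_ml=1.2))

# summing all volume changes now gives the true amount moved
assert abs(sum(events) - 1.2) < 1e-9
```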

config.dev.ini

Lines changed: 2 additions & 5 deletions
```diff
@@ -149,10 +149,9 @@ ws_protocol=ws
 use_tls=0

 [growth_rate_kalman]
-acc_std=0.0008
 obs_std=1.5
-od_std=0.005
-rate_std=0.1
+od_std=0.0025
+rate_std=0.25


 [dosing_automation.config]
@@ -165,7 +164,5 @@ max_subdose=1.0
 [growth_rate_calculating.config]
 # these next two parameters control the length and magnitude
 # of the variance shift that our Kalman filter performs after a dosing event
-ekf_variance_shift_post_dosing_minutes=0.40
-ekf_variance_shift_post_dosing_factor=2500
 ekf_outlier_std_threshold=3.0
 samples_for_od_statistics=35
```
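For reference, the updated `[growth_rate_kalman]` section parses cleanly with Python's stdlib `configparser` (a minimal sketch; Pioreactor's own config loader may differ). The inline values mirror the new defaults above:

```python
# Sketch: reading the new Kalman settings with stdlib configparser.
# Section/key names come from config.dev.ini; the loader is illustrative.
import configparser

config = configparser.ConfigParser()
config.read_string(
    """
[growth_rate_kalman]
obs_std=1.5
od_std=0.0025
rate_std=0.25
"""
)

kalman = config["growth_rate_kalman"]
obs_std = kalman.getfloat("obs_std")    # increasing this smooths the growth-rate estimate
rate_std = kalman.getfloat("rate_std")  # increasing this makes the estimate more responsive
assert (obs_std, rate_std) == (1.5, 0.25)
```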

pioreactor/actions/leader/experiment_profile.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -550,6 +550,7 @@ def _callable() -> None:
                 "level": options.level,
                 "source": parent_job.job_key,
                 "task": parent_job.job_key,
+                "_souce": "app",
             }
             post_into_leader(
                 f"/api/workers/{unit}/experiments/{experiment}/logs", json=body
```

pioreactor/actions/leader/export_experiment_data.py

Lines changed: 41 additions & 20 deletions
```diff
@@ -26,11 +26,24 @@ def source_exists(cursor, table_name_to_check: str) -> bool:
     return cursor.execute(query, (table_name_to_check,)).fetchone() is not None


-def generate_timestamp_to_localtimestamp_clause(timestamp_columns) -> str:
+def generate_timestamp_to_localtimestamp_clause(timestamp_columns: list[str]) -> str:
     if not timestamp_columns:
         return ""

-    clause = ",".join([f"datetime({c}, 'localtime') as {c}_localtime" for c in timestamp_columns])
+    clause = ",".join([f"datetime(T.{c}, 'localtime') as {c}_localtime" for c in timestamp_columns])
+
+    return clause
+
+
+def generate_timestamp_to_relative_time_clause(default_order_by: str) -> str:
+    if not default_order_by:
+        return ""
+
+    START_TIME = "E.created_at"
+
+    clause = (
+        f"(unixepoch({default_order_by}) - unixepoch({START_TIME}))/3600.0 as hours_since_experiment_created"
+    )

     return clause

@@ -73,7 +86,7 @@ def create_experiment_clause(
     existing_placeholders = existing_placeholders | {
         f"experiment{i}": experiment for i, experiment in enumerate(experiments)
     }
-    return f"experiment IN ({quoted_experiments})", existing_placeholders
+    return f"T.experiment IN ({quoted_experiments})", existing_placeholders


@@ -82,15 +95,15 @@ def create_timespan_clause(
     if start_time is not None and end_time is not None:
         existing_placeholders["start_time"] = start_time
         existing_placeholders["end_time"] = end_time
-        return f"{time_column} >= :start_time AND {time_column} <= :end_time", existing_placeholders
+        return f"T.{time_column} >= :start_time AND {time_column} <= :end_time", existing_placeholders

     elif start_time is not None:
         existing_placeholders["start_time"] = start_time
-        return f"{time_column} >= :start_time", existing_placeholders
+        return f"T.{time_column} >= :start_time", existing_placeholders

     elif end_time is not None:
         existing_placeholders["end_time"] = end_time
-        return f"{time_column} <= :end_time", existing_placeholders
+        return f"T.{time_column} <= :end_time", existing_placeholders
     else:
         raise ValueError

@@ -101,20 +114,24 @@ def create_sql_query(
     existing_placeholders: dict[str, str],
     where_clauses: list[str] | None = None,
     order_by_col: str | None = None,
+    has_experiment: bool = False,
 ) -> tuple[str, dict[str, str]]:
     """
     Constructs an SQL query with SELECT, FROM, WHERE, and ORDER BY clauses.
     """
     # Base SELECT and FROM clause
-    query = f"SELECT {', '.join(selects)} FROM ({table_or_subquery})"
+    query = f"SELECT {', '.join(selects)} FROM ({table_or_subquery}) T
+
+    if has_experiment:
+        query += " JOIN experiments E ON E.experiment = T.experiment"

     # Add WHERE clause if provided
     if where_clauses:
         query += f" WHERE {' AND '.join(where_clauses)}"

     # Add ORDER BY clause if provided
     if order_by_col:
-        query += f' ORDER BY "{order_by_col}"'
+        query += f' ORDER BY "T.{order_by_col}"'

     return query, existing_placeholders
```
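The new `hours_since_experiment_created` export column is just a computed SQL expression joined against the `experiments` table. A standalone sketch of how the clause and the join compose (the `od_readings` table and `timestamp` column are illustrative):

```python
# Standalone re-sketch of the relative-time clause added in this commit.
# Table/column names below are examples, not the full dataset registry.

def relative_time_clause(default_order_by: str) -> str:
    # E aliases the experiments table in the final query
    start_time = "E.created_at"
    return (
        f"(unixepoch({default_order_by}) - unixepoch({start_time}))/3600.0 "
        "as hours_since_experiment_created"
    )

clause = relative_time_clause("T.timestamp")
query = (
    f"SELECT T.*, {clause} FROM (od_readings) T "
    "JOIN experiments E ON E.experiment = T.experiment"
)
assert "hours_since_experiment_created" in query
```

SQLite's `unixepoch()` returns seconds since the Unix epoch, so dividing the difference by 3600.0 yields fractional hours since the experiment was created.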

```diff
@@ -143,7 +160,7 @@ def export_experiment_data(
         click.echo("At least one dataset name must be provided.")
         sys.exit(1)

-    logger = create_logger("export_experiment_data")
+    logger = create_logger("export_experiment_data", experiment="$experiment")
     logger.info(
         f"Starting export of dataset{'s' if len(dataset_names) > 1 else ''}: {', '.join(dataset_names)}."
     )
@@ -159,8 +176,6 @@ def export_experiment_data(
         "BASE64", 1, decode_base64
     )  # TODO: until next OS release which implements a native sqlite3 base64 function

-    con.set_trace_callback(logger.debug)
-
     cursor = con.cursor()
     cursor.executescript(
         """
@@ -171,6 +186,7 @@ def export_experiment_data(
         PRAGMA cache_size = -4000;
         """
     )
+    con.set_trace_callback(logger.debug)

     for dataset_name in dataset_names:
         try:
@@ -185,15 +201,15 @@ def export_experiment_data(
         _partition_by_unit = dataset.has_unit and (partition_by_unit or dataset.always_partition_by_unit)
         _partition_by_experiment = dataset.has_experiment and partition_by_experiment
-        filenames: list[str] = []
+        path_to_files: list[Path] = []
         placeholders: dict[str, str] = {}

         order_by_col = dataset.default_order_by
         table_or_subquery = dataset.table or dataset.query
         assert table_or_subquery is not None

         where_clauses: list[str] = []
-        selects = ["*"]
+        selects = ["T.*"]

         if dataset.timestamp_columns:
             selects.append(generate_timestamp_to_localtimestamp_clause(dataset.timestamp_columns))
@@ -202,6 +218,9 @@ def export_experiment_data(
             experiment_clause, placeholders = create_experiment_clause(experiments, placeholders)
             where_clauses.append(experiment_clause)

+        if dataset.has_experiment and dataset.default_order_by:
+            selects.append(generate_timestamp_to_relative_time_clause(dataset.default_order_by))
+
         if dataset.timestamp_columns and (start_time or end_time):
             assert dataset.default_order_by is not None
             timespan_clause, placeholders = create_timespan_clause(
@@ -210,7 +229,7 @@ def export_experiment_data(
             where_clauses.append(timespan_clause)

         query, placeholders = create_sql_query(
-            selects, table_or_subquery, placeholders, where_clauses, order_by_col
+            selects, table_or_subquery, placeholders, where_clauses, order_by_col, dataset.has_experiment
         )
         cursor.execute(query, placeholders)
@@ -243,15 +262,17 @@ def export_experiment_data(
                 )

                 if rows_partition not in parition_to_writer_map:
+                    # create a new csv writer for this partition since it doesn't exist yet
                     filename = f"{dataset_name}-" + "-".join(rows_partition) + f"-{time}.csv"
                     filename = filename.replace(" ", "_")
-                    filenames.append(filename)
-                    path_to_file = Path(Path(output).parent / filename)
+                    path_to_file = Path(output).parent / filename
                     parition_to_writer_map[rows_partition] = csv.writer(
                         stack.enter_context(open(path_to_file, "w")), delimiter=","
                     )
                     parition_to_writer_map[rows_partition].writerow(headers)

+                    path_to_files.append(path_to_file)
+
                 parition_to_writer_map[rows_partition].writerow(row)

                 if count % 10_000 == 0:
@@ -261,10 +282,10 @@ def export_experiment_data(
             if count == 0:
                 logger.warning(f"No data present in {dataset_name}. Check database?")

-            for filename in filenames:
-                path_to_file = Path(Path(output).parent / filename)
-                zf.write(path_to_file, arcname=filename)
-                Path(path_to_file).unlink()
+            zf.mkdir(dataset_name)
+            for path_to_file in path_to_files:
+                zf.write(path_to_file, arcname=f"{dataset_name}/{path_to_file.name}")
+                path_to_file.unlink()

             logger.info("Finished export.")
             return
```
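The export loop now groups the per-partition CSVs into one zip folder per dataset. A minimal stdlib sketch of that layout (file and dataset names are illustrative; the commit additionally calls `zf.mkdir`, available in Python 3.11+, while a slash in `arcname` alone is enough to place a member inside a folder):

```python
# Sketch: one folder per dataset inside the export zip, stdlib only.
# Dataset/file names are illustrative.
import os
import tempfile
import zipfile

tmp = tempfile.mkdtemp()
csv_path = os.path.join(tmp, "od_readings-worker1.csv")
with open(csv_path, "w") as f:
    f.write("timestamp,od\n")

zip_path = os.path.join(tmp, "export.zip")
with zipfile.ZipFile(zip_path, "w") as zf:
    # arcname with a slash places the file inside a per-dataset folder
    zf.write(csv_path, arcname=f"od_readings/{os.path.basename(csv_path)}")

with zipfile.ZipFile(zip_path) as zf:
    names = zf.namelist()
assert "od_readings/od_readings-worker1.csv" in names
```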
