Skip to content

Commit 6703770

Browse files
single worker time series
1 parent 801212e commit 6703770

File tree

3 files changed

+209
-8
lines changed

3 files changed

+209
-8
lines changed

pioreactorui/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ def _get_app_db_connection():
176176
logger.error("Database is locked, please close any other connections or restart.")
177177
elif "unable to open database file" in str(e):
178178
logger.error(
179-
"Permissions on database are probably incorrect, ownership should be pioreactor:www-data on all sqlite files."
179+
"Permissions on database are probably incorrect, ownership should be pioreactor:www-data on ALL sqlite files AND THE .pioreactor/storage DIR! ."
180180
)
181181
raise e
182182

pioreactorui/api.py

Lines changed: 202 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ def get_logs() -> ResponseReturnValue:
404404
f"""SELECT l.timestamp, level, l.pioreactor_unit, message, task, l.experiment
405405
FROM logs AS l
406406
WHERE ({get_level_string(min_level)})
407-
ORDER BY l.timestamp DESC LIMIT 50 OFFSET {skip};"""
407+
ORDER BY l.timestamp DESC LIMIT 100 OFFSET {skip};"""
408408
)
409409

410410
return jsonify(recent_logs)
@@ -422,7 +422,7 @@ def get_exp_logs(experiment: str) -> ResponseReturnValue:
422422
FROM logs AS l
423423
WHERE (l.experiment=? )
424424
AND ({get_level_string(min_level)})
425-
ORDER BY l.timestamp DESC LIMIT 50 OFFSET {skip};""",
425+
ORDER BY l.timestamp DESC LIMIT 100 OFFSET {skip};""",
426426
(experiment,),
427427
)
428428

@@ -471,7 +471,7 @@ def get_logs_for_unit_and_experiment(pioreactor_unit: str, experiment: str) -> R
471471
WHERE (l.experiment=?)
472472
AND (l.pioreactor_unit=? or l.pioreactor_unit=?)
473473
AND ({get_level_string(min_level)})
474-
ORDER BY l.timestamp DESC LIMIT 50 OFFSET {skip};""",
474+
ORDER BY l.timestamp DESC LIMIT 100 OFFSET {skip};""",
475475
(experiment, pioreactor_unit, UNIVERSAL_IDENTIFIER),
476476
)
477477

@@ -491,7 +491,7 @@ def get_system_logs_for_unit(pioreactor_unit: str) -> ResponseReturnValue:
491491
WHERE (l.experiment="$experiment")
492492
AND (l.pioreactor_unit=? or l.pioreactor_unit=?)
493493
AND ({get_level_string(min_level)})
494-
ORDER BY l.timestamp DESC LIMIT 50 OFFSET {skip};""",
494+
ORDER BY l.timestamp DESC LIMIT 100 OFFSET {skip};""",
495495
(pioreactor_unit, UNIVERSAL_IDENTIFIER),
496496
)
497497

@@ -510,7 +510,7 @@ def get_logs_for_unit(pioreactor_unit: str) -> ResponseReturnValue:
510510
FROM logs AS l
511511
WHERE (l.pioreactor_unit=? or l.pioreactor_unit=?)
512512
AND ({get_level_string(min_level)})
513-
ORDER BY l.timestamp DESC LIMIT 50 OFFSET {skip};""",
513+
ORDER BY l.timestamp DESC LIMIT 100 OFFSET {skip};""",
514514
(pioreactor_unit, UNIVERSAL_IDENTIFIER),
515515
)
516516

@@ -723,6 +723,203 @@ def get_fallback_time_series(experiment: str, data_source: str, column: str) ->
723723
return attach_cache_control(as_json_response(r["json"]))
724724

725725

726+
@api.route(
727+
"/workers/<pioreactor_unit>/experiments/<experiment>/time_series/growth_rates", methods=["GET"]
728+
)
729+
def get_growth_rates_per_unit(pioreactor_unit: str, experiment: str) -> ResponseReturnValue:
730+
args = request.args
731+
filter_mod_n = float(args.get("filter_mod_N", 100.0))
732+
lookback = float(args.get("lookback", 4.0))
733+
734+
growth_rates = query_app_db(
735+
"""
736+
SELECT
737+
json_object('series', json_group_array(unit), 'data', json_group_array(json(data))) as json
738+
FROM (
739+
SELECT pioreactor_unit as unit,
740+
json_group_array(json_object('x', timestamp, 'y', round(rate, 5))) as data
741+
FROM growth_rates
742+
WHERE experiment=? AND pioreactor_unit=?
743+
((ROWID * 0.61803398875) - cast(ROWID * 0.61803398875 as int) < 1.0/?) AND
744+
timestamp > STRFTIME('%Y-%m-%dT%H:%M:%f000Z', 'NOW', ?)
745+
GROUP BY 1
746+
);
747+
""",
748+
(experiment, pioreactor_unit, filter_mod_n, f"-{lookback} hours"),
749+
one=True,
750+
)
751+
752+
assert isinstance(growth_rates, dict)
753+
return attach_cache_control(as_json_response(growth_rates["json"]))
754+
755+
756+
@api.route(
757+
"/workers/<pioreactor_unit>/experiments/<experiment>/time_series/temperature_readings",
758+
methods=["GET"],
759+
)
760+
def get_temperature_readings_per_unit(pioreactor_unit: str, experiment: str) -> ResponseReturnValue:
761+
args = request.args
762+
filter_mod_n = float(args.get("filter_mod_N", 100.0))
763+
lookback = float(args.get("lookback", 4.0))
764+
765+
temperature_readings = query_app_db(
766+
"""
767+
SELECT json_object('series', json_group_array(unit), 'data', json_group_array(json(data))) as json
768+
FROM (
769+
SELECT
770+
pioreactor_unit as unit,
771+
json_group_array(json_object('x', timestamp, 'y', round(temperature_c, 2))) as data
772+
FROM temperature_readings
773+
WHERE experiment=? AND pioreactor_unit=? AND
774+
((ROWID * 0.61803398875) - cast(ROWID * 0.61803398875 as int) < 1.0/?) AND
775+
timestamp > STRFTIME('%Y-%m-%dT%H:%M:%f000Z', 'NOW' , ?)
776+
GROUP BY 1
777+
);
778+
""",
779+
(experiment, pioreactor_unit, filter_mod_n, f"-{lookback} hours"),
780+
one=True,
781+
)
782+
783+
assert isinstance(temperature_readings, dict)
784+
return attach_cache_control(as_json_response(temperature_readings["json"]))
785+
786+
787+
@api.route(
788+
"/workers/<pioreactor_unit>/experiments/<experiment>/time_series/od_readings_filtered",
789+
methods=["GET"],
790+
)
791+
def get_od_readings_filtered_per_unit(pioreactor_unit: str, experiment: str) -> ResponseReturnValue:
792+
args = request.args
793+
filter_mod_n = float(args.get("filter_mod_N", 100.0))
794+
lookback = float(args.get("lookback", 4.0))
795+
796+
filtered_od_readings = query_app_db(
797+
"""
798+
SELECT
799+
json_object('series', json_group_array(unit), 'data', json_group_array(json(data))) as json
800+
FROM (
801+
SELECT
802+
pioreactor_unit as unit,
803+
json_group_array(json_object('x', timestamp, 'y', round(normalized_od_reading, 7))) as data
804+
FROM od_readings_filtered
805+
WHERE experiment=? AND pioreactor_unit=? AND
806+
((ROWID * 0.61803398875) - cast(ROWID * 0.61803398875 as int) < 1.0/?) AND
807+
timestamp > STRFTIME('%Y-%m-%dT%H:%M:%f000Z', 'NOW', ?)
808+
GROUP BY 1
809+
);
810+
""",
811+
(experiment, pioreactor_unit, filter_mod_n, f"-{lookback} hours"),
812+
one=True,
813+
)
814+
815+
assert isinstance(filtered_od_readings, dict)
816+
return attach_cache_control(as_json_response(filtered_od_readings["json"]))
817+
818+
819+
@api.route(
820+
"/workers/<pioreactor_unit>/experiments/<experiment>/time_series/od_readings", methods=["GET"]
821+
)
822+
def get_od_readings_per_unit(pioreactor_unit: str, experiment: str) -> ResponseReturnValue:
823+
args = request.args
824+
filter_mod_n = float(args.get("filter_mod_N", 100.0))
825+
lookback = float(args.get("lookback", 4.0))
826+
827+
raw_od_readings = query_app_db(
828+
"""
829+
SELECT
830+
json_object('series', json_group_array(unit), 'data', json_group_array(json(data))) as json
831+
FROM (
832+
SELECT pioreactor_unit || '-' || channel as unit, json_group_array(json_object('x', timestamp, 'y', round(od_reading, 7))) as data
833+
FROM od_readings
834+
WHERE experiment=? AND pioreactor_unit=? AND
835+
((ROWID * 0.61803398875) - cast(ROWID * 0.61803398875 as int) < 1.0/?) AND
836+
timestamp > STRFTIME('%Y-%m-%dT%H:%M:%f000Z', 'NOW', ?)
837+
GROUP BY 1
838+
);
839+
""",
840+
(experiment, pioreactor_unit, filter_mod_n, f"-{lookback} hours"),
841+
one=True,
842+
)
843+
844+
assert isinstance(raw_od_readings, dict)
845+
return attach_cache_control(as_json_response(raw_od_readings["json"]))
846+
847+
848+
@api.route(
849+
"/workers/<pioreactor_unit>/experiments/<experiment>/time_series/raw_od_readings",
850+
methods=["GET"],
851+
)
852+
def get_od_raw_readings_per_unit(pioreactor_unit: str, experiment: str) -> ResponseReturnValue:
853+
args = request.args
854+
filter_mod_n = float(args.get("filter_mod_N", 100.0))
855+
lookback = float(args.get("lookback", 4.0))
856+
857+
raw_od_readings = query_app_db(
858+
"""
859+
SELECT
860+
json_object('series', json_group_array(unit), 'data', json_group_array(json(data))) as json
861+
FROM (
862+
SELECT pioreactor_unit || '-' || channel as unit, json_group_array(json_object('x', timestamp, 'y', round(od_reading, 7))) as data
863+
FROM raw_od_readings
864+
WHERE experiment=? AND pioreactor_unit=? AND
865+
((ROWID * 0.61803398875) - cast(ROWID * 0.61803398875 as int) < 1.0/?) AND
866+
timestamp > STRFTIME('%Y-%m-%dT%H:%M:%f000Z', 'NOW', ?)
867+
GROUP BY 1
868+
);
869+
""",
870+
(experiment, pioreactor_unit, filter_mod_n, f"-{lookback} hours"),
871+
one=True,
872+
)
873+
874+
assert isinstance(raw_od_readings, dict)
875+
return attach_cache_control(as_json_response(raw_od_readings["json"]))
876+
877+
878+
@api.route(
879+
"/workers/<pioreactor_unit>/experiments/<experiment>/time_series/<data_source>/<column>",
880+
methods=["GET"],
881+
)
882+
def get_fallback_time_series_per_unit(
883+
pioreactor_unit: str, experiment: str, data_source: str, column: str
884+
) -> ResponseReturnValue:
885+
args = request.args
886+
filter_mod_n = float(args.get("filter_mod_N", 100.0))
887+
lookback = float(args.get("lookback", 4.0))
888+
889+
try:
890+
data_source = scrub_to_valid(data_source)
891+
column = scrub_to_valid(column)
892+
r = query_app_db(
893+
f"""
894+
WITH incrementing_data AS (
895+
SELECT pioreactor_unit as unit, timestamp, {column} as data, row_number() OVER () AS row_num
896+
FROM {data_source}
897+
WHERE experiment=? AND
898+
timestamp > STRFTIME('%Y-%m-%dT%H:%M:%f000Z', 'NOW',?) AND
899+
pioreactor_unit=?
900+
{column} IS NOT NULL
901+
)
902+
SELECT
903+
json_object('series', json_group_array(unit), 'data', json_group_array(json(rdata))) as json
904+
FROM (
905+
SELECT unit, json_group_array(json_object('x', timestamp, 'y', round(data, 7))) as rdata
906+
FROM incrementing_data
907+
WHERE ((row_num * 0.61803398875) - cast(row_num * 0.61803398875 as int) < 1.0/?)
908+
GROUP BY 1
909+
);
910+
""",
911+
(experiment, f"-{lookback} hours", pioreactor_unit, filter_mod_n),
912+
one=True,
913+
)
914+
915+
except Exception as e:
916+
publish_to_error_log(str(e), "get_fallback_time_series")
917+
abort(400, str(e))
918+
919+
assert isinstance(r, dict)
920+
return attach_cache_control(as_json_response(r["json"]))
921+
922+
726923
@api.route("/experiments/<experiment>/media_rates", methods=["GET"])
727924
def get_media_rates(experiment: str) -> ResponseReturnValue:
728925
"""

pioreactorui/tasks.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -434,10 +434,14 @@ def multicast_get_across_cluster(
434434
tasks = get_from_worker.map(
435435
((workers[i], endpoint, json[i], timeout, return_raw) for i in range(len(workers)))
436436
)
437-
return {
438-
worker: response for (worker, response) in tasks.get(blocking=True, timeout=30)
437+
unsorted_responses = {
438+
worker: response for (worker, response) in tasks.get(blocking=True, timeout=15)
439439
} # add a timeout so that we don't hold up a thread forever.
440440

441+
return dict(
442+
sorted(unsorted_responses.items())
443+
) # always sort alphabetically for downstream uses.
444+
441445

442446
@huey.task(priority=10)
443447
def patch_to_worker(worker: str, endpoint: str, json: dict | None = None) -> tuple[str, Any]:

0 commit comments

Comments
 (0)