@@ -71,9 +71,13 @@ def broadcast_get_across_cluster(endpoint: str, timeout: float = 1.0, return_raw
7171 )
7272
7373
74- def broadcast_post_across_cluster (endpoint : str , json : dict | None = None ) -> Result :
74+ def broadcast_post_across_cluster (
75+ endpoint : str , json : dict | None = None , params : dict | None = None
76+ ) -> Result :
7577 assert endpoint .startswith ("/unit_api" )
76- return tasks .multicast_post_across_cluster (endpoint , get_all_workers (), json = json )
78+ return tasks .multicast_post_across_cluster (
79+ endpoint , get_all_workers (), json = json , params = params
80+ )
7781
7882
7983def broadcast_delete_across_cluster (endpoint : str , json : dict | None = None ) -> Result :
@@ -91,7 +95,7 @@ def stop_all_jobs_in_experiment(experiment: str) -> ResponseReturnValue:
9195 """Kills all jobs for workers assigned to experiment"""
9296 workers_in_experiment = get_all_workers_in_experiment (experiment )
9397 tasks .multicast_post_across_cluster (
94- f "/unit_api/jobs/stop" , workers_in_experiment , params = {' experiment' : experiment }
98+ "/unit_api/jobs/stop" , workers_in_experiment , params = {" experiment" : experiment }
9599 )
96100
97101 # sometimes the leader isn't part of the experiment, but a profile associated with the experiment is running:
@@ -109,17 +113,17 @@ def stop_all_jobs_on_worker_for_experiment(
109113) -> ResponseReturnValue :
110114 """Kills all jobs for worker assigned to experiment"""
111115 if pioreactor_unit == UNIVERSAL_IDENTIFIER :
112- broadcast_post_across_cluster (f "/unit_api/jobs/stop" , params = {' experiment' : experiment })
116+ broadcast_post_across_cluster ("/unit_api/jobs/stop" , params = {" experiment" : experiment })
113117 else :
114118 tasks .multicast_post_across_cluster (
115- f "/unit_api/jobs/stop" , [pioreactor_unit ], params = {' experiment' : experiment }
119+ "/unit_api/jobs/stop" , [pioreactor_unit ], params = {" experiment" : experiment }
116120 )
117121
118122 return Response (status = 202 )
119123
120124
121125@api .route (
122- "/workers/<pioreactor_unit>/jobs/stop/job_name/<job >/experiments/<experiment>" ,
126+ "/workers/<pioreactor_unit>/jobs/stop/job_name/<job_name >/experiments/<experiment>" ,
123127 methods = ["PATCH" , "POST" ],
124128)
125129@api .route (
@@ -137,7 +141,7 @@ def stop_job_on_unit(pioreactor_unit: str, experiment: str, job_name: str) -> Re
137141 except Exception :
138142 # TODO: make this $broadcastable
139143 tasks .multicast_post_across_cluster (
140- f "/unit_api/jobs/stop" , [pioreactor_unit ], params = {' job_name' : job_name }
144+ "/unit_api/jobs/stop" , [pioreactor_unit ], params = {" job_name" : job_name }
141145 )
142146 abort (500 )
143147
@@ -480,15 +484,15 @@ def get_logs_for_unit(pioreactor_unit: str) -> ResponseReturnValue:
480484def publish_new_log (pioreactor_unit : str , experiment : str ) -> ResponseReturnValue :
481485 body = request .get_json ()
482486
483- topic = f"pioreactor/{ pioreactor_unit } /{ experiment } /logs/ui/info "
487+ topic = f"pioreactor/{ pioreactor_unit } /{ experiment } /logs/ui/{ body [ 'level' ]. lower () } "
484488 client .publish (
485489 topic ,
486490 msg_to_JSON (
487491 msg = body ["message" ],
488- source = "user" ,
489- level = "INFO" ,
492+ source = body [ "source" ] ,
493+ level = body [ "level" ]. upper () ,
490494 timestamp = body ["timestamp" ],
491- task = body ["source " ] or "" ,
495+ task = body ["task " ] or "" ,
492496 ),
493497 )
494498 return Response (status = 202 )
@@ -1261,7 +1265,7 @@ def create_experiment() -> ResponseReturnValue:
12611265@api .route ("/experiments/<experiment>" , methods = ["DELETE" ])
12621266def delete_experiment (experiment : str ) -> ResponseReturnValue :
12631267 row_count = modify_app_db ("DELETE FROM experiments WHERE experiment=?;" , (experiment ,))
1264- broadcast_post_across_cluster (f "/unit_api/jobs/stop" , params = {"experiment" : experiment })
1268+ broadcast_post_across_cluster ("/unit_api/jobs/stop" , params = {"experiment" : experiment })
12651269
12661270 if row_count > 0 :
12671271 return Response (status = 200 )
@@ -2089,7 +2093,7 @@ def remove_worker_from_experiment(experiment: str, pioreactor_unit: str) -> Resp
20892093 )
20902094 if row_count > 0 :
20912095 tasks .multicast_post_across_cluster (
2092- f "/unit_api/jobs/stop" , [pioreactor_unit ], params = {' experiment' : experiment }
2096+ "/unit_api/jobs/stop" , [pioreactor_unit ], params = {" experiment" : experiment }
20932097 )
20942098 publish_to_experiment_log (
20952099 f"Removed { pioreactor_unit } from { experiment } ." ,
@@ -2109,7 +2113,7 @@ def remove_workers_from_experiment(experiment: str) -> ResponseReturnValue:
21092113 "DELETE FROM experiment_worker_assignments WHERE experiment = ?" ,
21102114 (experiment ,),
21112115 )
2112- task = broadcast_post_across_cluster (f "/unit_api/jobs/stop" , params = {' experiment' : experiment })
2116+ task = broadcast_post_across_cluster ("/unit_api/jobs/stop" , params = {" experiment" : experiment })
21132117 publish_to_experiment_log (
21142118 f"Removed all workers from { experiment } ." ,
21152119 experiment = experiment ,
0 commit comments