@@ -177,24 +177,23 @@ def run_job_on_unit_in_experiment(
177177
178178 if pioreactor_unit == UNIVERSAL_IDENTIFIER :
179179 # make sure the worker is active, too
180- workers = query_app_db (
180+ assigned_workers = query_app_db (
181181 """
182- SELECT a.pioreactor_unit as worker
182+ SELECT a.pioreactor_unit, w.is_active, w.model_name, w.model_version
183183 FROM experiment_worker_assignments a
184184 JOIN workers w
185185 on w.pioreactor_unit = a.pioreactor_unit
186186 WHERE experiment = ? and w.is_active = 1
187187 """ ,
188188 (experiment ,),
189189 )
190- assert isinstance (workers , list )
191- assigned_workers = [w ["worker" ] for w in workers ]
190+ assert isinstance (assigned_workers , list )
192191
193192 else :
194193 # check if worker is part of experiment
195- okay = query_app_db (
194+ worker = query_app_db (
196195 """
197- SELECT count(1) as count
196+ SELECT a.pioreactor_unit, w.is_active, w.model_name, w.model_version
198197 FROM experiment_worker_assignments a
199198 JOIN workers w
200199 on w.pioreactor_unit = a.pioreactor_unit
@@ -203,20 +202,31 @@ def run_job_on_unit_in_experiment(
203202 (experiment , pioreactor_unit ),
204203 one = True ,
205204 )
206- assert isinstance (okay , dict )
207- if okay [ "count" ] == 0 :
205+ assert isinstance (worker , dict )
206+ if worker is None :
208207 assigned_workers = []
209208 else :
210- assigned_workers = [pioreactor_unit ]
209+ assigned_workers = [worker ]
210+
211+ assert isinstance (assigned_workers , list )
211212
212213 if len (assigned_workers ) == 0 :
213- abort (404 , f"Worker { pioreactor_unit } not found" )
214+ abort (404 , f"Worker(s) { pioreactor_unit } not found or not active. " )
214215
215216 # and we can include experiment in the env since we know these workers are in the experiment!
216- json .env = json .env | {"EXPERIMENT" : experiment , "ACTIVE" : "1" }
217+ # json.env = json.env | {"EXPERIMENT": experiment, "ACTIVE": "1"}
217218
218219 t = tasks .multicast_post_across_cluster (
219- f"/unit_api/jobs/run/job_name/{ job } " , assigned_workers , json = json
220+ f"/unit_api/jobs/run/job_name/{ job } " ,
221+ [worker ["pioreactor_unit" ] for worker in assigned_workers ],
222+ json = [
223+ (
224+ json .env
225+ | {"EXPERIMENT" : experiment , "ACTIVE" : "1" }
226+ | {"MODEL_NAME" : worker ["model_name" ], "MODEL_VERSION" : worker ["model_version" ]}
227+ )
228+ for worker in assigned_workers
229+ ],
220230 )
221231 return create_task_response (t )
222232
@@ -1808,7 +1818,7 @@ def get_list_of_units() -> ResponseReturnValue:
18081818def get_list_of_workers () -> ResponseReturnValue :
18091819 # Get a list of all workers
18101820 all_workers = query_app_db (
1811- "SELECT pioreactor_unit, added_at, is_active FROM workers ORDER BY pioreactor_unit;"
1821+ "SELECT pioreactor_unit, added_at, is_active, model_name, model_version FROM workers ORDER BY pioreactor_unit;"
18121822 )
18131823 return jsonify (all_workers )
18141824
@@ -1922,12 +1932,38 @@ def change_worker_status(pioreactor_unit: str) -> ResponseReturnValue:
19221932 abort (404 , f"Worker { pioreactor_unit } not found" )
19231933
19241934
1935+ @api .route ("/workers/<pioreactor_unit>/model" , methods = ["PUT" ])
1936+ def change_worker_model (pioreactor_unit : str ) -> ResponseReturnValue :
1937+ # Get the new status from the request body
1938+ data = request .json
1939+ model_version , model_name = data .get ("model_version" ), data .get ("model_name" )
1940+
1941+ if not model_version or not model_name :
1942+ return jsonify ({"error" : "Missing model_version or model_name" }), 400
1943+
1944+ # Update the status of the worker in the database
1945+ row_count = modify_app_db (
1946+ "UPDATE workers SET model_name = (?), model_version= (?) WHERE pioreactor_unit = (?)" ,
1947+ (model_name , model_version , pioreactor_unit ),
1948+ )
1949+
1950+ if row_count > 0 :
1951+ publish_to_log (
1952+ f"Set { pioreactor_unit } to { model_name } , { model_version } ." ,
1953+ task = "worker_model" ,
1954+ level = "INFO" ,
1955+ )
1956+ return Response (status = 200 )
1957+ else :
1958+ abort (404 , f"Worker { pioreactor_unit } not found" )
1959+
1960+
19251961@api .route ("/workers/<pioreactor_unit>" , methods = ["GET" ])
19261962def get_worker (pioreactor_unit : str ) -> ResponseReturnValue :
1927- # Query the database for the status of the worker in the given experiment
1963+ # Query the database for a worker
19281964 result = query_app_db (
19291965 """
1930- SELECT pioreactor_unit, added_at, is_active
1966+ SELECT pioreactor_unit, added_at, is_active, model_name, model_version
19311967 FROM workers
19321968 WHERE pioreactor_unit = ?
19331969 """ ,
@@ -2004,7 +2040,7 @@ def get_experiment_assignment_for_worker(pioreactor_unit: str) -> ResponseReturn
20042040 # Get the experiment that a worker is assigned to along with its active status
20052041 result = query_app_db (
20062042 """
2007- SELECT w.pioreactor_unit, w.is_active, a.experiment
2043+ SELECT w.pioreactor_unit, w.is_active, a.experiment, w.model_name, w.model_version
20082044 FROM workers w
20092045 LEFT JOIN experiment_worker_assignments a
20102046 on w.pioreactor_unit = a.pioreactor_unit
@@ -2032,7 +2068,7 @@ def get_experiment_assignment_for_worker(pioreactor_unit: str) -> ResponseReturn
20322068def get_list_of_workers_for_experiment (experiment : str ) -> ResponseReturnValue :
20332069 workers = query_app_db (
20342070 """
2035- SELECT w.pioreactor_unit, is_active
2071+ SELECT w.pioreactor_unit, w. is_active, w.model_name, w.model_version
20362072 FROM experiment_worker_assignments a
20372073 JOIN workers w
20382074 on w.pioreactor_unit = a.pioreactor_unit
0 commit comments