@@ -155,14 +155,21 @@ async def launch_coordinator(self):
155155 stderr = asyncio .subprocess .STDOUT ,
156156 )
157157
158- async def launch_worker (self ):
159- assert self .worker_proc is None , "Worker already running"
158+ async def launch_worker (self , job_timeout = 480 ):
159+ if self .worker_proc and self .worker_proc .returncode is None :
160+ return
160161 config = self .dir / "pod" / "_var" / "worker.json"
161162 assert config .exists (), "Worker config not found"
162163 config_dict = json .loads (config .read_text (encoding = "utf-8" ))
163164 self .worker_url = config_dict ["public_url" ]
164165 self .worker_secret = config_dict ["admin_secret" ]
165- self .worker_log = open (self .log_dir / "worker.log" , "w" , encoding = "utf-8" )
166+ if config_dict ["job_timeout" ] != job_timeout :
167+ config_dict ["job_timeout" ] = job_timeout
168+ config .write_text (json .dumps (config_dict ), encoding = "utf-8" )
169+
170+ if self .worker_log is None :
171+ self .worker_log = open (self .log_dir / "worker.log" , "w" , encoding = "utf-8" )
172+
166173 if await self .check (f"{ self .worker_url } /health" , token = self .worker_secret ):
167174 print (f"Worker running in external process at { self .worker_url } " , file = self .worker_log )
168175 return
@@ -223,15 +230,20 @@ async def create_user(self, username: str) -> dict[str, Any]:
223230 return result
224231
225232 async def set_worker_job_timeout (self , timeout : int ):
226- headers = {
227- "Authorization" : f"Bearer { self .worker_secret } " ,
228- "Content-Type" : "application/json" ,
229- }
230- async with aiohttp .ClientSession (headers = headers ) as session :
231- async with session .post (
232- f"{ self .worker_url } /configure" , json = {"job_timeout" : timeout }
233- ) as response :
234- response .raise_for_status ()
233+ if self .worker_proc is not None :
234+ self .worker_proc .terminate ()
235+ await self .worker_proc .wait ()
236+ await self .launch_worker (job_timeout = timeout )
237+ else : # running in external process which will restart itself automatically
238+ headers = {
239+ "Authorization" : f"Bearer { self .worker_secret } " ,
240+ "Content-Type" : "application/json" ,
241+ }
242+ async with aiohttp .ClientSession (headers = headers ) as session :
243+ async with session .post (
244+ f"{ self .worker_url } /configure" , json = {"job_timeout" : timeout }
245+ ) as response :
246+ response .raise_for_status ()
235247
236248 def __enter__ (self ):
237249 self .loop .run (self .start ())
0 commit comments