@@ -165,6 +165,7 @@ async def resolve_scheduler_request(
165
165
timeout_time : float ,
166
166
results_queue : multiprocessing .Queue ,
167
167
process_id : int ,
168
+ shutdown_event : Optional [MultiprocessingEvent ] = None ,
168
169
):
169
170
info = SchedulerRequestInfo (
170
171
targeted_start_time = start_time ,
@@ -183,7 +184,21 @@ async def resolve_scheduler_request(
183
184
asyncio .create_task (self .send_result (results_queue , request_scheduled_result ))
184
185
185
186
if (wait_time := start_time - time .time ()) > 0 :
186
- await asyncio .sleep (wait_time )
187
+ if shutdown_event is None :
188
+ await asyncio .sleep (wait_time )
189
+ else :
190
+ shutdown_signal_received = \
191
+ await self ._sleep_intermittently_until_timestamp_or_shutdown (
192
+ sleep_until_timestamp = start_time ,
193
+ shutdown_event = shutdown_event ,
194
+ )
195
+ if shutdown_signal_received :
196
+ logger .info (
197
+ "Received shutdown signal "
198
+ "while waiting to start "
199
+ f"|| Process ID { process_id } "
200
+ )
201
+ return
187
202
188
203
info .worker_start = time .time ()
189
204
request_start_result : WorkerProcessResult [RequestT , ResponseT ] = \
@@ -211,6 +226,18 @@ async def resolve_scheduler_request(
211
226
)
212
227
asyncio .create_task (self .send_result (results_queue , result ))
213
228
229
+ async def _sleep_intermittently_until_timestamp_or_shutdown (
230
+ self ,
231
+ sleep_until_timestamp : float ,
232
+ shutdown_event : MultiprocessingEvent ,
233
+ ) -> bool :
234
+ delta = timedelta (seconds = 10 ).total_seconds ()
235
+ while time .time () < sleep_until_timestamp :
236
+ await asyncio .sleep (delta )
237
+ if shutdown_event .is_set ():
238
+ return True
239
+ return False
240
+
214
241
def process_loop_synchronous (
215
242
self ,
216
243
requests_queue : multiprocessing .Queue ,
@@ -240,6 +267,7 @@ async def _process_runner():
240
267
timeout_time = process_request .timeout_time ,
241
268
results_queue = results_queue ,
242
269
process_id = process_id ,
270
+ shutdown_event = shutdown_event ,
243
271
)
244
272
245
273
try :
@@ -271,10 +299,26 @@ async def _process_runner():
271
299
shutdown_event = shutdown_event ,
272
300
process_id = process_id )
273
301
) is not None :
302
+ if shutdown_event and shutdown_event .is_set ():
303
+ logger .error ("This shouldn't happen! "
304
+ "We should catch the "
305
+ "shutdown in the get wrapper" )
306
+ logger .info (f"Shutdown signal received"
307
+ f" in future { process_id } " )
308
+ break
309
+
274
310
dequeued_time = time .time ()
311
+ logger .debug (f"Dequeued Process ID { process_id } || "
312
+ f"Timestamp { dequeued_time } || "
313
+ f"Semaphore { pending ._value } /{ max_concurrency } " )
275
314
276
315
await pending .acquire ()
277
316
317
+ lock_acquired_at = time .time ()
318
+ logger .debug (f"Lock acquired Process ID { process_id } ||"
319
+ f" Timestamp { lock_acquired_at } ||"
320
+ f" Semaphore { pending ._value } /{ max_concurrency } " )
321
+
278
322
def _task_done (_ : asyncio .Task ):
279
323
nonlocal pending
280
324
pending .release ()
@@ -292,6 +336,7 @@ def _task_done(_: asyncio.Task):
292
336
timeout_time = process_request .timeout_time ,
293
337
results_queue = results_queue ,
294
338
process_id = process_id ,
339
+ shutdown_event = shutdown_event ,
295
340
)
296
341
)
297
342
task .add_done_callback (_task_done )
0 commit comments