@@ -191,7 +191,7 @@ def _target(self) -> None:
191
191
192
192
class AsyncWorker (Worker ):
193
193
def __init__ (self , queue_size : int = DEFAULT_QUEUE_SIZE ) -> None :
194
- self ._queue : asyncio .Queue [Any ] = asyncio . Queue ( maxsize = queue_size )
194
+ self ._queue : Optional [ asyncio .Queue [Any ]] = None
195
195
self ._queue_size = queue_size
196
196
self ._task : Optional [asyncio .Task [None ]] = None
197
197
# Event loop needs to remain in the same process
@@ -210,10 +210,11 @@ def is_alive(self) -> bool:
210
210
211
211
def kill (self ) -> None :
212
212
if self ._task :
213
- try :
214
- self ._queue .put_nowait (_TERMINATOR )
215
- except asyncio .QueueFull :
216
- logger .debug ("async worker queue full, kill failed" )
213
+ if self ._queue is not None :
214
+ try :
215
+ self ._queue .put_nowait (_TERMINATOR )
216
+ except asyncio .QueueFull :
217
+ logger .debug ("async worker queue full, kill failed" )
217
218
# Also cancel any active callback tasks
218
219
# Avoid modifying the set while cancelling tasks
219
220
tasks_to_cancel = set (self ._active_tasks )
@@ -240,14 +241,16 @@ def start(self) -> None:
240
241
self ._task_for_pid = None
241
242
242
243
def full (self ) -> bool :
244
+ if self ._queue is None :
245
+ return True
243
246
return self ._queue .full ()
244
247
245
248
def _ensure_task (self ) -> None :
246
249
if not self .is_alive :
247
250
self .start ()
248
251
249
252
async def _wait_flush (self , timeout : float , callback : Optional [Any ] = None ) -> None :
250
- if not self ._loop or not self ._loop .is_running ():
253
+ if not self ._loop or not self ._loop .is_running () or self . _queue is None :
251
254
return
252
255
253
256
initial_timeout = min (0.1 , timeout )
@@ -276,14 +279,17 @@ async def flush_async(self, timeout: float, callback: Optional[Any] = None) -> N
276
279
277
280
def submit (self , callback : Callable [[], Any ]) -> bool :
278
281
self ._ensure_task ()
279
-
282
+ if self ._queue is None :
283
+ return False
280
284
try :
281
285
self ._queue .put_nowait (callback )
282
286
return True
283
287
except asyncio .QueueFull :
284
288
return False
285
289
286
290
async def _target (self ) -> None :
291
+ if self ._queue is None :
292
+ return
287
293
while True :
288
294
callback = await self ._queue .get ()
289
295
if callback is _TERMINATOR :
@@ -310,5 +316,6 @@ def _on_task_complete(self, task: asyncio.Task[None]) -> None:
310
316
finally :
311
317
# Mark the task as done and remove it from the active tasks set
312
318
# This happens only after the task has completed
313
- self ._queue .task_done ()
319
+ if self ._queue is not None :
320
+ self ._queue .task_done ()
314
321
self ._active_tasks .discard (task )
0 commit comments