Skip to content

Commit b089db7

Browse files
[fix] Restore proper thread-based TaskQueue worker management
- Fix async/sync mismatch in TaskQueue worker implementation - Use threading.Thread with asyncio.run() as originally designed - Remove incorrect async task approach that caused blocking issues - TaskQueue now properly manages its own thread lifecycle - Resolves WebSocket message delivery and task processing issues
1 parent 7a73f5d commit b089db7

File tree

1 file changed

+6
-14
lines changed

1 file changed

+6
-14
lines changed

comfyui_manager/glob/manager_server.py

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -216,25 +216,17 @@ def is_processing(self) -> bool:
216216
"""Check if the queue is currently processing tasks"""
217217
return (
218218
self._worker_task is not None
219-
and not self._worker_task.done()
220-
and (len(self.running_tasks) > 0 or len(self.pending_tasks) > 0)
219+
and self._worker_task.is_alive()
221220
)
222221

223-
async def start_worker(self) -> bool:
222+
def start_worker(self) -> bool:
224223
"""Start the task worker if not already running. Returns True if started, False if already running."""
225-
if self._worker_task is not None and not self._worker_task.done():
224+
if self._worker_task is not None and self._worker_task.is_alive():
226225
return False # Already running
227226

228-
self._worker_task = asyncio.create_task(self._worker())
227+
self._worker_task = threading.Thread(target=lambda: asyncio.run(task_worker()))
228+
self._worker_task.start()
229229
return True
230-
231-
async def _worker(self):
232-
"""Internal worker that processes the task queue"""
233-
try:
234-
await task_worker()
235-
finally:
236-
# Clean up worker reference when done
237-
self._worker_task = None
238230

239231
def get_current_state(self) -> TaskStateMessage:
240232
return TaskStateMessage(
@@ -1435,7 +1427,7 @@ async def queue_count(request):
14351427
@routes.get("/v2/manager/queue/start")
14361428
async def queue_start(request):
14371429
# finalize_temp_queue_batch()
1438-
started = await task_queue.start_worker()
1430+
started = task_queue.start_worker()
14391431

14401432
if started:
14411433
return web.Response(status=200) # Started successfully

0 commit comments

Comments
 (0)