Skip to content

Commit 343ca87

Browse files
[refactor] Remove legacy thread management for TaskQueue
- Add proper async worker management to TaskQueue class - Remove redundant task_worker_thread and task_worker_lock global variables - Replace manual threading with async task management - Update is_processing() logic to use TaskQueue state instead of thread status - Implement automatic worker cleanup when queue processing completes - Simplify queue start endpoint to use TaskQueue.start_worker()
1 parent e0a82bc commit 343ca87

File tree

1 file changed

+34
-24
lines changed

1 file changed

+34
-24
lines changed

comfyui_manager/glob/manager_server.py

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,31 @@ def __init__(self):
210210
self.batch_id = None
211211
self.batch_start_time = None
212212
self.batch_state_before = None
213+
self._worker_task = None
213214

215+
def is_processing(self) -> bool:
216+
"""Check if the queue is currently processing tasks"""
217+
return (
218+
self._worker_task is not None
219+
and not self._worker_task.done()
220+
and (len(self.running_tasks) > 0 or len(self.pending_tasks) > 0)
221+
)
222+
223+
async def start_worker(self) -> bool:
224+
"""Start the task worker if not already running. Returns True if started, False if already running."""
225+
if self._worker_task is not None and not self._worker_task.done():
226+
return False # Already running
227+
228+
self._worker_task = asyncio.create_task(self._worker())
229+
return True
230+
231+
async def _worker(self):
232+
"""Internal worker that processes the task queue"""
233+
try:
234+
await task_worker()
235+
finally:
236+
# Clean up worker reference when done
237+
self._worker_task = None
214238

215239
def get_current_state(self) -> TaskStateMessage:
216240
return TaskStateMessage(
@@ -332,7 +356,7 @@ async def task_done(
332356
ui_id=item.ui_id,
333357
result=result_msg,
334358
kind=item.kind,
335-
status=pydantic_status,
359+
status=status,
336360
timestamp=datetime.fromisoformat(timestamp),
337361
state=self.get_current_state(),
338362
),
@@ -536,9 +560,6 @@ def _extract_batch_operations(self) -> list[BatchOperation]:
536560

537561
task_queue = TaskQueue()
538562

539-
# Legacy variables for compatibility
540-
task_worker_thread = None
541-
task_worker_lock = threading.Lock()
542563

543564

544565
# Note: Model path utilities moved to model_utils.py to avoid duplication
@@ -1385,8 +1406,7 @@ async def queue_count(request):
13851406
"done_count": len(history_client_tasks),
13861407
"in_progress_count": len(running_client_tasks),
13871408
"pending_count": len(pending_client_tasks),
1388-
"is_processing": task_worker_thread is not None
1389-
and task_worker_thread.is_alive(),
1409+
"is_processing": len(running_client_tasks) > 0,
13901410
}
13911411
)
13921412
else:
@@ -1397,32 +1417,22 @@ async def queue_count(request):
13971417
"done_count": task_queue.done_count(),
13981418
"in_progress_count": len(task_queue.running_tasks),
13991419
"pending_count": len(task_queue.pending_tasks),
1400-
"is_processing": task_worker_thread is not None
1401-
and task_worker_thread.is_alive(),
1420+
"is_processing": task_queue.is_processing(),
14021421
}
14031422
)
14041423

14051424

1406-
task_worker_thread: threading.Thread = None
14071425

14081426

14091427
@routes.get("/v2/manager/queue/start")
14101428
async def queue_start(request):
1411-
with task_worker_lock:
1412-
# finalize_temp_queue_batch()
1413-
return _queue_start()
1414-
1415-
1416-
def _queue_start():
1417-
global task_worker_thread
1418-
1419-
if task_worker_thread is not None and task_worker_thread.is_alive():
1420-
return web.Response(status=201) # already in-progress
1421-
1422-
task_worker_thread = threading.Thread(target=lambda: asyncio.run(task_worker()))
1423-
task_worker_thread.start()
1424-
1425-
return web.Response(status=200)
1429+
# finalize_temp_queue_batch()
1430+
started = await task_queue.start_worker()
1431+
1432+
if started:
1433+
return web.Response(status=200) # Started successfully
1434+
else:
1435+
return web.Response(status=201) # Already in-progress
14261436

14271437

14281438
@routes.get("/v2/manager/queue/update_comfyui")

0 commit comments

Comments
 (0)