Skip to content

Commit 1f5951d

Browse files
authored
fix _jobs_waiting_for_process counter leak on cancellation (#4821)
1 parent db5dd9b commit 1f5951d

File tree

1 file changed

+18
-16
lines changed

1 file changed

+18
-16
lines changed

livekit-agents/livekit/agents/ipc/proc_pool.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -100,23 +100,25 @@ async def aclose(self) -> None:
100100

101101
async def launch_job(self, info: RunningJobInfo) -> None:
102102
self._jobs_waiting_for_process += 1
103-
if (
104-
self._warmed_proc_queue.empty()
105-
and len(self._spawn_tasks) < self._jobs_waiting_for_process
106-
):
107-
# spawn a new process if there are no idle processes
108-
task = asyncio.create_task(self._proc_spawn_task())
109-
self._spawn_tasks.add(task)
110-
task.add_done_callback(self._spawn_tasks.discard)
111-
112-
if self._warmed_proc_queue.empty():
113-
logger.warning(
114-
"no warmed process available for job, waiting for one to be created",
115-
extra={"job_id": info.job.id},
116-
)
103+
try:
104+
if (
105+
self._warmed_proc_queue.empty()
106+
and len(self._spawn_tasks) < self._jobs_waiting_for_process
107+
):
108+
# spawn a new process if there are no idle processes
109+
task = asyncio.create_task(self._proc_spawn_task())
110+
self._spawn_tasks.add(task)
111+
task.add_done_callback(self._spawn_tasks.discard)
112+
113+
if self._warmed_proc_queue.empty():
114+
logger.warning(
115+
"no warmed process available for job, waiting for one to be created",
116+
extra={"job_id": info.job.id},
117+
)
117118

118-
proc = await self._warmed_proc_queue.get()
119-
self._jobs_waiting_for_process -= 1
119+
proc = await self._warmed_proc_queue.get()
120+
finally:
121+
self._jobs_waiting_for_process -= 1
120122

121123
await proc.launch_job(info)
122124
self.emit("process_job_launched", proc)

0 commit comments

Comments
 (0)