Skip to content

Commit 03b309c

Browse files
author
Anton
committed
fix: check semaphore values
1 parent dcc00ac commit 03b309c

File tree

2 files changed

+11
-0
lines changed

2 files changed

+11
-0
lines changed

taskiq/receiver/receiver.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,8 @@ def task_cb(task: "asyncio.Task[Any]") -> None:
307307
self.sem_prefetch.release()
308308
message = await queue.get()
309309
if message is QUEUE_DONE:
310+
if self.sem is not None:
311+
self.sem.release()
310312
break
311313
if message is QUEUE_SKIP:
312314
# Decrease max_prefetch

tests/cli/worker/test_receiver.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,9 @@ async def task_map(vals: List[int], ctx: Context = Depends()) -> List[int]:
349349
await broker.shutdown()
350350
await listen_task
351351

352+
assert receiver.sem_idle._value == 1 # type: ignore
353+
assert receiver.sem._value == 1 # type: ignore
354+
352355

353356
@pytest.mark.anyio
354357
async def test_tasks_chain_deep() -> None:
@@ -390,6 +393,9 @@ async def wait_for_task(
390393
await broker.shutdown()
391394
await listen_task
392395

396+
assert receiver.sem_idle._value == 10 # type: ignore
397+
assert receiver.sem._value == 1 # type: ignore
398+
393399

394400
@pytest.mark.anyio
395401
async def test_tasks_sleep() -> None:
@@ -416,3 +422,6 @@ async def task_run(ind: int, ctx: Context = Depends()) -> int:
416422

417423
await broker.shutdown()
418424
await listen_task
425+
426+
assert receiver.sem_idle._value == 20 # type: ignore
427+
assert receiver.sem._value == 1 # type: ignore

0 commit comments

Comments
 (0)