Skip to content

Commit 1774ec5

Browse files
author
Anton
committed
fix: more tests
1 parent 4025160 commit 1774ec5

File tree

3 files changed

+60
-5
lines changed

3 files changed

+60
-5
lines changed

taskiq/receiver/receiver.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,5 +357,6 @@ async def task_idler(self, wait: float) -> None:
357357
# Decrease max_prefetch in runner
358358
task = asyncio.create_task(self.queue.put_first(QUEUE_SKIP))
359359
# Decrease max_tasks
360+
360361
await self.sem.acquire_first()
361362
await task

taskiq/semaphore.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ class DequeSemaphore:
1515
def __init__(self, value: int) -> None:
1616
self._value = value
1717
self._waiters: Deque[asyncio.Future[Any]] = collections.deque()
18+
self._waiters_first: Deque[asyncio.Future[Any]] = collections.deque()
1819

1920
if self._value < 0:
2021
raise ValueError("Value should be >= 0")
@@ -31,8 +32,10 @@ def locked(self) -> bool:
3132
3233
:returns: true or false
3334
"""
34-
return self._value == 0 or (
35-
any(not waiter.cancelled() for waiter in (self._waiters or ()))
35+
return (
36+
self._value == 0
37+
or any(not waiter.cancelled() for waiter in (self._waiters or ()))
38+
or any(not waiter.cancelled() for waiter in (self._waiters_first or ()))
3639
)
3740

3841
def release(self) -> None:
@@ -62,15 +65,18 @@ async def acquire(self, first: bool = False) -> Literal[True]: # noqa: C901
6265
fut: asyncio.Future[Any] = asyncio.Future()
6366

6467
if first:
65-
self._waiters.appendleft(fut)
68+
self._waiters_first.append(fut)
6669
else:
6770
self._waiters.append(fut)
6871

6972
try:
7073
try: # noqa: WPS501, WPS505
7174
await fut
7275
finally:
73-
self._waiters.remove(fut)
76+
if first:
77+
self._waiters_first.remove(fut)
78+
else:
79+
self._waiters.remove(fut)
7480

7581
except asyncio.exceptions.CancelledError:
7682
if not fut.cancelled():
@@ -95,9 +101,15 @@ async def acquire_first(self) -> Literal[True]:
95101
return await self.acquire(True)
96102

97103
def _wakeup_next(self) -> None:
98-
if not self._waiters:
104+
if not self._waiters and not self._waiters_first:
99105
return
100106

107+
for fut in self._waiters_first:
108+
if not fut.done():
109+
self._value -= 1
110+
fut.set_result(True)
111+
return
112+
101113
for fut in self._waiters:
102114
if not fut.done():
103115
self._value -= 1

tests/cli/worker/test_receiver.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from taskiq.message import TaskiqMessage
1515
from taskiq.receiver import Receiver
1616
from taskiq.result import TaskiqResult
17+
from taskiq.task import AsyncTaskiqTask
1718

1819
_T = TypeVar("_T")
1920

@@ -346,3 +347,44 @@ async def task_map(vals: List[int], ctx: Context = Depends()) -> List[int]:
346347

347348
await broker.shutdown()
348349
await listen_task
350+
351+
352+
@pytest.mark.anyio
353+
async def test_tasks_chain_deep() -> None:
354+
""""""
355+
broker = InMemoryQueueBroker()
356+
357+
@broker.task
358+
async def task_run(depth: int, val: Any, ctx: Context = Depends()) -> Any:
359+
if depth == 0:
360+
return val
361+
362+
t = await task_run.kiq(depth - 1, val)
363+
resp = await wait_for_task(t, interval=0.05, ctx=ctx)
364+
return resp.return_value
365+
366+
async def wait_for_task(
367+
task: AsyncTaskiqTask[Any],
368+
interval: float,
369+
ctx: Context,
370+
) -> TaskiqResult[Any]:
371+
while True:
372+
resp_task = asyncio.create_task(
373+
task.wait_result(interval * 0.4, timeout=interval),
374+
)
375+
await ctx.task_idler(interval)
376+
377+
try:
378+
return await resp_task
379+
except TaskiqResultTimeoutError:
380+
continue
381+
382+
receiver = get_receiver(broker, max_async_tasks=1, max_idle_tasks=10)
383+
listen_task = asyncio.create_task(receiver.listen())
384+
385+
task = await task_run.kiq(10, "hello world!")
386+
resp = await task.wait_result(timeout=1)
387+
assert resp.return_value == "hello world!"
388+
389+
await broker.shutdown()
390+
await listen_task

0 commit comments

Comments
 (0)