Skip to content

Commit 779f421

Browse files
author
Anton
committed
fix: replace with simple acquire
1 parent d13711f commit 779f421

File tree

3 files changed

+5
-75
lines changed

3 files changed

+5
-75
lines changed

taskiq/receiver/receiver.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from taskiq.message import TaskiqMessage
1515
from taskiq.receiver.params_parser import parse_params
1616
from taskiq.result import TaskiqResult
17-
from taskiq.semaphore import DequeSemaphore
1817
from taskiq.state import TaskiqState
1918
from taskiq.utils import DequeQueue, maybe_awaitable
2019

@@ -59,15 +58,15 @@ def __init__( # noqa: WPS211
5958
self.task_signatures[task.task_name] = inspect.signature(task.original_func)
6059
self.task_hints[task.task_name] = get_type_hints(task.original_func)
6160
self.dependency_graphs[task.task_name] = DependencyGraph(task.original_func)
62-
self.sem: "Optional[DequeSemaphore]" = None
61+
self.sem: "Optional[asyncio.Semaphore]" = None
6362
if max_async_tasks is not None and max_async_tasks > 0:
64-
self.sem = DequeSemaphore(max_async_tasks)
63+
self.sem = asyncio.Semaphore(max_async_tasks)
6564
else:
6665
logger.warning(
6766
"Setting unlimited number of async tasks "
6867
+ "can result in undefined behavior",
6968
)
70-
self.sem_prefetch = DequeSemaphore(max_prefetch)
69+
self.sem_prefetch = asyncio.Semaphore(max_prefetch)
7170
self.queue: DequeQueue[bytes] = DequeQueue()
7271

7372
self.sem_idle: Optional[asyncio.Semaphore] = None
@@ -310,7 +309,7 @@ def task_cb(task: "asyncio.Task[Any]") -> None:
310309
break
311310
if message is QUEUE_SKIP:
312311
# Decrease max_prefetch
313-
prefetch_dec = asyncio.create_task(self.sem_prefetch.acquire_first())
312+
prefetch_dec = asyncio.create_task(self.sem_prefetch.acquire())
314313
prefetch_dec.add_done_callback(tasks.discard)
315314
tasks.add(prefetch_dec)
316315

@@ -357,5 +356,5 @@ async def task_idler(self, wait: float) -> None:
357356
# Decrease max_prefetch in runner
358357
task = asyncio.create_task(self.queue.put_first(QUEUE_SKIP))
359358
# Decrease max_tasks
360-
await self.sem.acquire_first()
359+
await self.sem.acquire()
361360
await task

taskiq/semaphore.py

Lines changed: 0 additions & 67 deletions
This file was deleted.

tests/cli/worker/test_receiver.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,6 @@ async def task_map(vals: List[int]) -> List[int]:
308308

309309
return [r.return_value for r in resps]
310310

311-
await broker.startup()
312311
receiver = get_receiver(broker, max_async_tasks=1)
313312
listen_task = asyncio.create_task(receiver.listen())
314313

@@ -338,7 +337,6 @@ async def task_map(vals: List[int], ctx: Context = Depends()) -> List[int]:
338337
res = [r.return_value for r in resps]
339338
return res
340339

341-
await broker.startup()
342340
receiver = get_receiver(broker, max_async_tasks=1, max_idle_tasks=1)
343341
listen_task = asyncio.create_task(receiver.listen())
344342

0 commit comments

Comments
 (0)