Skip to content

Commit d0776df

Browse files
author
Anton
committed
fix: timeout
1 parent 670dff8 commit d0776df

File tree

3 files changed

+5
-4
lines changed

3 files changed

+5
-4
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ classifiers = [
2626
keywords = ["taskiq", "tasks", "distributed", "async"]
2727

2828
[tool.poetry.dependencies]
29-
python = "^3.7"
29+
python = "^3.10"
3030
typing-extensions = ">=3.10.0.0"
3131
pydantic = "^1.6.2"
3232
importlib-metadata = "*"

taskiq/receiver/receiver.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -351,9 +351,10 @@ async def task_idler(self, wait: float) -> None:
351351
self.sem.release()
352352

353353
# Wait
354-
await asyncio.sleep(wait - (time() - start_time))
354+
await asyncio.sleep(max(wait - (time() - start_time), 0))
355355

356356
# Decrease max_prefetch in runner
357-
await self.queue.put_first(QUEUE_SKIP)
357+
task = asyncio.create_task(self.queue.put_first(QUEUE_SKIP))
358358
# Decrease max_tasks
359359
await self.sem.acquire_first()
360+
await task

tests/cli/worker/test_receiver.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ async def task_add_one(val: int) -> int:
332332
@broker.task
333333
async def task_map(vals: List[int], ctx: Context = Depends()) -> List[int]:
334334
tasks = [await task_add_one.kiq(val) for val in vals]
335-
await ctx.task_idler(0.1)
335+
await ctx.task_idler(0.5)
336336
resps_tasks = [asyncio.create_task(t.wait_result(timeout=1)) for t in tasks]
337337
resps = await asyncio.gather(*resps_tasks)
338338
res = [r.return_value for r in resps]

0 commit comments

Comments
 (0)