Skip to content

Commit e8b0c3b

Browse files
author
Anton
committed
refactor: better names
1 parent 5397858 commit e8b0c3b

File tree

7 files changed

+42
-39
lines changed

7 files changed

+42
-39
lines changed

taskiq/cli/worker/args.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class WorkerArgs:
4141
receiver: str = "taskiq.receiver:Receiver"
4242
receiver_arg: List[Tuple[str, str]] = field(default_factory=list)
4343
max_prefetch: int = 0
44-
max_idle_tasks: Optional[int] = None
44+
max_sleeping_tasks: Optional[int] = None
4545
no_propagate_errors: bool = False
4646

4747
@classmethod
@@ -189,11 +189,11 @@ def from_cli( # noqa: WPS213
189189
help="Maximum prefetched tasks per worker process. ",
190190
)
191191
parser.add_argument(
192-
"--max-idle-tasks",
192+
"--max-sleeping-tasks",
193193
type=int,
194-
dest="max_idle_tasks",
194+
dest="max_sleeping_tasks",
195195
default=None,
196-
help="Maximum idle tasks per worker process. ",
196+
help="Maximum sleeping tasks per worker process. ",
197197
)
198198

199199
namespace = parser.parse_args(args)

taskiq/cli/worker/run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
133133
validate_params=not args.no_parse,
134134
max_async_tasks=args.max_async_tasks,
135135
max_prefetch=args.max_prefetch,
136-
max_idle_tasks=args.max_idle_tasks,
136+
max_sleeping_tasks=args.max_sleeping_tasks,
137137
propagate_exceptions=not args.no_propagate_errors,
138138
**receiver_args,
139139
)

taskiq/context.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ def __init__(
1414
self,
1515
message: TaskiqMessage,
1616
broker: AsyncBroker,
17-
task_idler: Callable[[float], Awaitable[None]],
17+
sleep: Callable[[float], Awaitable[None]],
1818
) -> None:
1919
self.message = message
2020
self.broker = broker
2121
self.state: "TaskiqState" = None # type: ignore
2222
self.state = broker.state
23-
self.task_idler = task_idler
23+
self.sleep = sleep

taskiq/receiver/receiver.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from taskiq.message import TaskiqMessage
1616
from taskiq.receiver.params_parser import parse_params
1717
from taskiq.result import TaskiqResult
18-
from taskiq.semaphore import DequeSemaphore
18+
from taskiq.semaphore import PrioritySemaphore
1919
from taskiq.state import TaskiqState
2020
from taskiq.utils import DequeQueue, maybe_awaitable
2121

@@ -48,7 +48,7 @@ def __init__( # noqa: WPS211
4848
validate_params: bool = True,
4949
max_async_tasks: "Optional[int]" = None,
5050
max_prefetch: int = 0,
51-
max_idle_tasks: Optional[int] = None,
51+
max_sleeping_tasks: Optional[int] = None,
5252
propagate_exceptions: bool = True,
5353
) -> None:
5454
self.broker = broker
@@ -62,22 +62,22 @@ def __init__( # noqa: WPS211
6262
self.task_signatures[task.task_name] = inspect.signature(task.original_func)
6363
self.task_hints[task.task_name] = get_type_hints(task.original_func)
6464
self.dependency_graphs[task.task_name] = DependencyGraph(task.original_func)
65-
self.sem: "Optional[DequeSemaphore]" = None
65+
self.sem: "Optional[PrioritySemaphore]" = None
6666
if max_async_tasks is not None and max_async_tasks > 0:
67-
self.sem = DequeSemaphore(max_async_tasks)
67+
self.sem = PrioritySemaphore(max_async_tasks)
6868
else:
6969
logger.warning(
7070
"Setting unlimited number of async tasks "
7171
+ "can result in undefined behavior",
7272
)
73-
self.sem_prefetch = DequeSemaphore(max_prefetch)
73+
self.sem_prefetch = PrioritySemaphore(max_prefetch)
7474
self.queue: DequeQueue[bytes] = DequeQueue()
7575

76-
self.sem_idle: Optional[asyncio.Semaphore] = None
77-
if max_idle_tasks is not None and max_idle_tasks <= 0:
76+
self.sem_sleeping: Optional[asyncio.Semaphore] = None
77+
if max_sleeping_tasks is not None and max_sleeping_tasks <= 0:
7878
raise ValueError("`max_idle_tasks` should be greater then zero or None.")
79-
if max_idle_tasks is not None and max_idle_tasks > 0:
80-
self.sem_idle = asyncio.Semaphore(max_idle_tasks)
79+
if max_sleeping_tasks is not None and max_sleeping_tasks > 0:
80+
self.sem_sleeping = asyncio.Semaphore(max_sleeping_tasks)
8181

8282
async def callback( # noqa: C901, WPS213
8383
self,
@@ -190,7 +190,7 @@ async def run_task( # noqa: C901, WPS210
190190
broker_ctx = self.broker.custom_dependency_context
191191
broker_ctx.update(
192192
{
193-
Context: Context(message, self.broker, self.task_idler),
193+
Context: Context(message, self.broker, self.task_sleeper),
194194
TaskiqState: self.broker.state,
195195
},
196196
)
@@ -344,24 +344,26 @@ def task_cb(task: "asyncio.Task[Any]") -> None:
344344
# https://textual.textualize.io/blog/2023/02/11/the-heisenbug-lurking-in-your-async-code/
345345
task.add_done_callback(task_cb)
346346

347-
async def task_idler(self, wait: float) -> None: # noqa: WPS213, WPS217
347+
async def task_sleeper(self, wait: float) -> None: # noqa: WPS213, WPS217
348348
"""
349-
Temporary increasing `max_async_tasks` for at least `wait` amount of time.
349+
Non-blocking sleep for running tasks.
350+
351+
Temporary increasing `max_async_tasks` for at least `wait` amount of time
350352
351353
:param wait: time
352354
"""
353355
if not self.sem:
354356
await asyncio.sleep(wait)
355357
return
356358

357-
if not self.sem_idle:
358-
logger.warning("`max_idle_tasks` is undefined. Idle is unavailable.")
359+
if not self.sem_sleeping:
360+
logger.warning("`max_sleeping_tasks` is undefined. Sleep is unavailable.")
359361
await asyncio.sleep(wait)
360362
return
361363

362364
start_time = time()
363365
with anyio.move_on_after(wait) as scope:
364-
await self.sem_idle.acquire()
366+
await self.sem_sleeping.acquire()
365367

366368
if scope.cancel_called: # noqa: WPS441
367369
return
@@ -381,4 +383,4 @@ async def task_idler(self, wait: float) -> None: # noqa: WPS213, WPS217
381383
await task
382384

383385
finally:
384-
self.sem_idle.release()
386+
self.sem_sleeping.release()

taskiq/semaphore.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
from typing_extensions import Literal
66

77

8-
class DequeSemaphore:
8+
class PrioritySemaphore:
99
"""
10-
Custom deque based semaphore.
10+
Custom semaphore with prioerities.
1111
1212
https://neopythonic.blogspot.com/2022/10/reasoning-about-asynciosemaphore.html
1313
"""

tests/cli/worker/test_receiver.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def get_receiver(
3434
broker: Optional[AsyncBroker] = None,
3535
no_parse: bool = False,
3636
max_async_tasks: Optional[int] = None,
37-
max_idle_tasks: Optional[int] = None,
37+
max_sleeping_tasks: Optional[int] = None,
3838
) -> Receiver:
3939
"""
4040
Returns receiver with custom broker and args.
@@ -51,7 +51,7 @@ def get_receiver(
5151
executor=ThreadPoolExecutor(max_workers=10),
5252
validate_params=not no_parse,
5353
max_async_tasks=max_async_tasks,
54-
max_idle_tasks=max_idle_tasks,
54+
max_sleeping_tasks=max_sleeping_tasks,
5555
)
5656

5757

@@ -333,13 +333,13 @@ async def task_add_one(val: int) -> int:
333333
@broker.task
334334
async def task_map(vals: List[int], ctx: Context = Depends()) -> List[int]:
335335
tasks = [await task_add_one.kiq(val) for val in vals]
336-
await ctx.task_idler(0.5)
336+
await ctx.sleep(0.5)
337337
resps_tasks = [asyncio.create_task(t.wait_result(timeout=1)) for t in tasks]
338338
resps = await asyncio.gather(*resps_tasks)
339339
res = [r.return_value for r in resps]
340340
return res
341341

342-
receiver = get_receiver(broker, max_async_tasks=1, max_idle_tasks=1)
342+
receiver = get_receiver(broker, max_async_tasks=1, max_sleeping_tasks=1)
343343
listen_task = asyncio.create_task(receiver.listen())
344344

345345
task = await task_map.kiq(list(range(0, 10)))
@@ -349,7 +349,7 @@ async def task_map(vals: List[int], ctx: Context = Depends()) -> List[int]:
349349
await broker.shutdown()
350350
await listen_task
351351

352-
assert receiver.sem_idle._value == 1 # type: ignore
352+
assert receiver.sem_sleeping._value == 1 # type: ignore
353353
assert receiver.sem._value == 1 # type: ignore
354354

355355

@@ -376,14 +376,14 @@ async def wait_for_task(
376376
resp_task = asyncio.create_task(
377377
task.wait_result(interval * 0.4, timeout=interval),
378378
)
379-
await ctx.task_idler(interval)
379+
await ctx.sleep(interval)
380380

381381
try:
382382
return await resp_task
383383
except TaskiqResultTimeoutError:
384384
continue
385385

386-
receiver = get_receiver(broker, max_async_tasks=1, max_idle_tasks=10)
386+
receiver = get_receiver(broker, max_async_tasks=1, max_sleeping_tasks=10)
387387
listen_task = asyncio.create_task(receiver.listen())
388388

389389
task = await task_run.kiq(10, "hello world!")
@@ -393,7 +393,7 @@ async def wait_for_task(
393393
await broker.shutdown()
394394
await listen_task
395395

396-
assert receiver.sem_idle._value == 10 # type: ignore
396+
assert receiver.sem_sleeping._value == 10 # type: ignore
397397
assert receiver.sem._value == 1 # type: ignore
398398

399399

@@ -404,10 +404,10 @@ async def test_tasks_sleep() -> None:
404404

405405
@broker.task
406406
async def task_run(ind: int, ctx: Context = Depends()) -> int:
407-
await ctx.task_idler(0.1)
407+
await ctx.sleep(0.1)
408408
return ind
409409

410-
receiver = get_receiver(broker, max_async_tasks=1, max_idle_tasks=20)
410+
receiver = get_receiver(broker, max_async_tasks=1, max_sleeping_tasks=20)
411411
listen_task = asyncio.create_task(receiver.listen())
412412

413413
with anyio.fail_after(1):
@@ -423,10 +423,11 @@ async def task_run(ind: int, ctx: Context = Depends()) -> int:
423423
await broker.shutdown()
424424
await listen_task
425425

426-
assert receiver.sem_idle._value == 20 # type: ignore
426+
assert receiver.sem_sleeping._value == 20 # type: ignore
427427
assert receiver.sem._value == 1 # type: ignore
428428

429429

430+
@pytest.mark.anyio
430431
async def test_no_result_error() -> None:
431432
broker = InMemoryBroker()
432433
executed = asyncio.Event()

tests/test_semaphore.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,18 @@
33
import anyio
44
import pytest
55

6-
from taskiq.semaphore import DequeSemaphore
6+
from taskiq.semaphore import PrioritySemaphore
77

88

99
@pytest.mark.anyio
1010
async def test_semaphore_exception() -> None:
1111
with pytest.raises(ValueError):
12-
DequeSemaphore(-1)
12+
PrioritySemaphore(-1)
1313

1414

1515
@pytest.mark.anyio
1616
async def test_semaphore() -> None:
17-
sem = DequeSemaphore(1)
17+
sem = PrioritySemaphore(1)
1818

1919
async def c1() -> None:
2020
await sem.acquire()

0 commit comments

Comments
 (0)