Skip to content

Commit a1daa65

Browse files
authored
asyncio.wait replaced with Semaphore, fixed compatibility issues (#93)
1 parent 6a38f33 commit a1daa65

File tree

14 files changed

+133
-84
lines changed

14 files changed

+133
-84
lines changed

.flake8

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[flake8]
2-
max-complexity = 6
2+
max-complexity = 7
33
inline-quotes = double
44
max-line-length = 88
55
extend-ignore = E203
@@ -90,6 +90,8 @@ ignore =
9090
N802,
9191
; Do not perform function calls in argument defaults.
9292
B008,
93+
; Found too many public instance attributes
94+
WPS230,
9395

9496
; all init files
9597
__init__.py:

taskiq/__main__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from taskiq.abc.cmd import TaskiqCMD
88

99

10-
def main() -> None: # noqa: C901, WPS210 # pragma: no cover
10+
def main() -> None: # noqa: WPS210 # pragma: no cover
1111
"""
1212
Main entrypoint of the taskiq.
1313

taskiq/abc/broker.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from taskiq.message import BrokerMessage
3131
from taskiq.result_backends.dummy import DummyResultBackend
3232
from taskiq.state import TaskiqState
33-
from taskiq.utils import maybe_awaitable
33+
from taskiq.utils import maybe_awaitable, remove_suffix
3434

3535
if TYPE_CHECKING: # pragma: no cover
3636
from taskiq.abc.formatter import TaskiqFormatter
@@ -57,7 +57,7 @@ def default_id_generator() -> str:
5757
return uuid4().hex
5858

5959

60-
class AsyncBroker(ABC): # noqa: WPS230
60+
class AsyncBroker(ABC):
6161
"""
6262
Async broker.
6363
@@ -235,11 +235,7 @@ def inner(
235235
fmodule = func.__module__
236236
if fmodule == "__main__": # pragma: no cover
237237
fmodule = ".".join(
238-
sys.argv[0]
239-
.removesuffix(
240-
".py",
241-
)
242-
.split(
238+
remove_suffix(sys.argv[0], ".py").split(
243239
os.path.sep,
244240
),
245241
)

taskiq/cli/utils.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from pathlib import Path
77
from typing import Any, Generator, List
88

9+
from taskiq.utils import remove_suffix
10+
911
logger = getLogger("taskiq.worker")
1012

1113

@@ -81,6 +83,8 @@ def import_tasks(modules: List[str], pattern: str, fs_discover: bool) -> None:
8183
"""
8284
if fs_discover:
8385
for path in Path(".").rglob(pattern):
84-
modules.append(str(path).removesuffix(".py").replace(os.path.sep, "."))
86+
modules.append(
87+
remove_suffix(str(path), ".py").replace(os.path.sep, "."),
88+
)
8589

8690
import_from_modules(modules)

taskiq/cli/watcher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def __init__(
2424
self.gitignore = parse_gitignore(gpath)
2525
self.callback_kwargs = callback_kwargs
2626

27-
def dispatch(self, event: FileSystemEvent) -> None: # noqa: C901
27+
def dispatch(self, event: FileSystemEvent) -> None:
2828
"""
2929
React to event.
3030

taskiq/cli/worker/args.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class WorkerArgs:
2323
shutdown_timeout: float = 5
2424
reload: bool = False
2525
no_gitignore: bool = False
26-
max_async_tasks: int = 100
26+
max_async_tasks: int = 10
2727

2828
@classmethod
2929
def from_cli( # noqa: WPS213
@@ -128,9 +128,8 @@ def from_cli( # noqa: WPS213
128128
"--max-async-tasks",
129129
type=int,
130130
dest="max_async_tasks",
131-
default=100,
132-
help="Maximum simultaneous async tasks per worker process. "
133-
+ "Infinite if less than 1",
131+
default=10,
132+
help="Maximum simultaneous async tasks per worker process. ",
134133
)
135134

136135
namespace = parser.parse_args(args)

taskiq/cli/worker/async_task_runner.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,4 @@ async def async_listen_messages(
3939
# and it considered to be Hisenbug.
4040
# https://textual.textualize.io/blog/2023/02/11/the-heisenbug-lurking-in-your-async-code/
4141
task.add_done_callback(tasks.discard)
42-
43-
# If we have finite number of maximum simultanious tasks,
44-
# we await them when we reached the limit.
45-
# But we don't await all of them, we await only first completed task,
46-
# and then continue.
47-
if 1 <= cli_args.max_async_tasks <= len(tasks):
48-
_, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
49-
tasks = pending
42+
logger.debug("Received {0} tasks".format(len(tasks)))

taskiq/cli/worker/receiver.py

Lines changed: 51 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ def __init__(self, broker: AsyncBroker, cli_args: WorkerArgs) -> None:
5050
self.executor = ThreadPoolExecutor(
5151
max_workers=cli_args.max_threadpool_threads,
5252
)
53+
self.sem = asyncio.Semaphore(cli_args.max_async_tasks)
5354

5455
async def callback( # noqa: C901, WPS213
5556
self,
@@ -68,61 +69,62 @@ async def callback( # noqa: C901, WPS213
6869
:param raise_err: raise an error if cannot save result in
6970
result_backend.
7071
"""
71-
logger.debug(f"Received message: {message}")
72-
if message.task_name not in self.broker.available_tasks:
73-
logger.warning(
74-
'task "%s" is not found. Maybe you forgot to import it?',
72+
async with self.sem:
73+
logger.debug(f"Received message: {message}")
74+
if message.task_name not in self.broker.available_tasks:
75+
logger.warning(
76+
'task "%s" is not found. Maybe you forgot to import it?',
77+
message.task_name,
78+
)
79+
return
80+
logger.debug(
81+
"Function for task %s is resolved. Executing...",
7582
message.task_name,
7683
)
77-
return
78-
logger.debug(
79-
"Function for task %s is resolved. Executing...",
80-
message.task_name,
81-
)
82-
try:
83-
taskiq_msg = self.broker.formatter.loads(message=message)
84-
except Exception as exc:
85-
logger.warning(
86-
"Cannot parse message: %s. Skipping execution.\n %s",
87-
message,
88-
exc,
89-
exc_info=True,
90-
)
91-
return
92-
for middleware in self.broker.middlewares:
93-
if middleware.__class__.pre_execute != TaskiqMiddleware.pre_execute:
94-
taskiq_msg = await maybe_awaitable(
95-
middleware.pre_execute(
96-
taskiq_msg,
97-
),
84+
try:
85+
taskiq_msg = self.broker.formatter.loads(message=message)
86+
except Exception as exc:
87+
logger.warning(
88+
"Cannot parse message: %s. Skipping execution.\n %s",
89+
message,
90+
exc,
91+
exc_info=True,
9892
)
93+
return
94+
for middleware in self.broker.middlewares:
95+
if middleware.__class__.pre_execute != TaskiqMiddleware.pre_execute:
96+
taskiq_msg = await maybe_awaitable(
97+
middleware.pre_execute(
98+
taskiq_msg,
99+
),
100+
)
99101

100-
logger.info(
101-
"Executing task %s with ID: %s",
102-
taskiq_msg.task_name,
103-
taskiq_msg.task_id,
104-
)
105-
result = await self.run_task(
106-
target=self.broker.available_tasks[message.task_name].original_func,
107-
message=taskiq_msg,
108-
)
109-
for middleware in self.broker.middlewares:
110-
if middleware.__class__.post_execute != TaskiqMiddleware.post_execute:
111-
await maybe_awaitable(middleware.post_execute(taskiq_msg, result))
112-
try:
113-
await self.broker.result_backend.set_result(message.task_id, result)
114-
except Exception as exc:
115-
logger.exception(
116-
"Can't set result in result backend. Cause: %s",
117-
exc,
118-
exc_info=True,
102+
logger.info(
103+
"Executing task %s with ID: %s",
104+
taskiq_msg.task_name,
105+
taskiq_msg.task_id,
106+
)
107+
result = await self.run_task(
108+
target=self.broker.available_tasks[message.task_name].original_func,
109+
message=taskiq_msg,
119110
)
120-
if raise_err:
121-
raise exc
111+
for middleware in self.broker.middlewares:
112+
if middleware.__class__.post_execute != TaskiqMiddleware.post_execute:
113+
await maybe_awaitable(middleware.post_execute(taskiq_msg, result))
114+
try:
115+
await self.broker.result_backend.set_result(message.task_id, result)
116+
except Exception as exc:
117+
logger.exception(
118+
"Can't set result in result backend. Cause: %s",
119+
exc,
120+
exc_info=True,
121+
)
122+
if raise_err:
123+
raise exc
122124

123-
for middleware in self.broker.middlewares:
124-
if middleware.__class__.post_save != TaskiqMiddleware.post_save:
125-
await maybe_awaitable(middleware.post_save(taskiq_msg, result))
125+
for middleware in self.broker.middlewares:
126+
if middleware.__class__.post_save != TaskiqMiddleware.post_save:
127+
await maybe_awaitable(middleware.post_save(taskiq_msg, result))
126128

127129
async def run_task( # noqa: C901, WPS210
128130
self,

taskiq/cli/worker/run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ async def shutdown_broker(broker: AsyncBroker, timeout: float) -> None:
4747
)
4848

4949

50-
def start_listen(args: WorkerArgs) -> None: # noqa: C901, WPS213
50+
def start_listen(args: WorkerArgs) -> None: # noqa: WPS213
5151
"""
5252
This function starts actual listening process.
5353

taskiq/decor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,15 @@ def __call__( # noqa: D102
6060
return self.original_func(*args, **kwargs)
6161

6262
@overload
63-
async def kiq( # noqa: D102
63+
async def kiq(
6464
self: "AsyncTaskiqDecoratedTask[_FuncParams, Coroutine[Any, Any, _T]]",
6565
*args: _FuncParams.args,
6666
**kwargs: _FuncParams.kwargs,
6767
) -> AsyncTaskiqTask[_T]:
6868
...
6969

7070
@overload
71-
async def kiq( # noqa: D102
71+
async def kiq(
7272
self: "AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]",
7373
*args: _FuncParams.args,
7474
**kwargs: _FuncParams.kwargs,

0 commit comments

Comments
 (0)