Skip to content

Commit fb58c11

Browse files
Sobes76rusAnton
andauthored
feat: receiver cli args (#124)
Co-authored-by: Anton <[email protected]>
1 parent a764ef4 commit fb58c11

File tree

2 files changed

+30
-2
lines changed

2 files changed

+30
-2
lines changed

taskiq/cli/worker/args.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,24 @@
11
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
2-
from dataclasses import dataclass
3-
from typing import List, Optional, Sequence
2+
from dataclasses import dataclass, field
3+
from typing import List, Optional, Sequence, Tuple
44

55
from taskiq.cli.common_args import LogLevel
66

77

8+
def receiver_arg_type(string: str) -> Tuple[str, str]:
9+
"""
10+
Parse cli --receiver_arg argument value.
11+
12+
:param string: cli argument value in format key=value.
13+
:raises ValueError: if value not in format.
14+
:return: (key, value) pair.
15+
"""
16+
args = string.split("=", 1)
17+
if len(args) != 2:
18+
raise ValueError(f"Invalid value: {string}")
19+
return args[0], args[1]
20+
21+
822
@dataclass
923
class WorkerArgs:
1024
"""Taskiq worker CLI arguments."""
@@ -25,6 +39,7 @@ class WorkerArgs:
2539
no_gitignore: bool = False
2640
max_async_tasks: int = 100
2741
receiver: str = "taskiq.receiver:Receiver"
42+
receiver_arg: List[Tuple[str, str]] = field(default_factory=list)
2843

2944
@classmethod
3045
def from_cli( # noqa: WPS213
@@ -55,6 +70,17 @@ def from_cli( # noqa: WPS213
5570
"'module.module:variable' format."
5671
),
5772
)
73+
parser.add_argument(
74+
"--receiver_arg",
75+
action="append",
76+
type=receiver_arg_type,
77+
default=[],
78+
help=(
79+
"List of args fot receiver. "
80+
"This string must be specified in "
81+
"`key=value` format."
82+
),
83+
)
5884
parser.add_argument(
5985
"--tasks-pattern",
6086
"-tp",

taskiq/cli/worker/run.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ def start_listen(args: WorkerArgs) -> None: # noqa: WPS210, WPS213
9393
raise ValueError("Unknown broker type. Please use AsyncBroker instance.")
9494

9595
receiver_type = get_receiver_type(args)
96+
receiver_args = dict(args.receiver_arg)
9697

9798
# Here how we manage interruptions.
9899
# We have to remember shutting_down state,
@@ -131,6 +132,7 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
131132
executor=pool,
132133
validate_params=not args.no_parse,
133134
max_async_tasks=args.max_async_tasks,
135+
**receiver_args,
134136
)
135137
loop.run_until_complete(receiver.listen())
136138
except KeyboardInterrupt:

0 commit comments

Comments
 (0)