Skip to content

Commit b079229

Browse files
authored
Replaced dependency graph with external library. (#57)
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent 331f282 commit b079229

File tree

8 files changed

+208
-759
lines changed

8 files changed

+208
-759
lines changed

poetry.lock

Lines changed: 197 additions & 165 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ mock = "^4.0.3"
5454
anyio = "^3.6.1"
5555
pytest-xdist = { version = "^2.5.0", extras = ["psutil"] }
5656
types-mock = "^4.0.15"
57+
taskiq_dependencies = "~1.0.0"
5758

5859
[tool.poetry.extras]
5960
zmq = ["pyzmq"]

taskiq/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
"""Distributed task manager."""
2+
from taskiq_dependencies import Depends as TaskiqDepends
3+
24
from taskiq.abc.broker import AsyncBroker, AsyncTaskiqDecoratedTask
35
from taskiq.abc.formatter import TaskiqFormatter
46
from taskiq.abc.middleware import TaskiqMiddleware
@@ -8,7 +10,6 @@
810
from taskiq.brokers.shared_broker import async_shared_broker
911
from taskiq.brokers.zmq_broker import ZeroMQBroker
1012
from taskiq.context import Context
11-
from taskiq.dependencies import TaskiqDepends
1213
from taskiq.events import TaskiqEvents
1314
from taskiq.exceptions import TaskiqError
1415
from taskiq.funcs import gather

taskiq/abc/schedule_source.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,17 @@
88
class ScheduleSource(ABC):
99
"""Abstract class for source of scheduled tasks."""
1010

11-
async def startup(self) -> None:
11+
async def startup(self) -> None: # noqa: B027
1212
"""Action to execute during startup."""
1313

14-
async def shutdown(self) -> None:
14+
async def shutdown(self) -> None: # noqa: B027
1515
"""Actions to execute during shutdown."""
1616

1717
@abstractmethod
1818
async def get_schedules(self) -> List["ScheduledTask"]:
1919
"""Get list of taskiq schedules."""
2020

21-
async def add_schedule(self, schedule: "ScheduledTask") -> None:
21+
async def add_schedule(self, schedule: "ScheduledTask") -> None: # noqa: B027
2222
"""
2323
Add a new schedule.
2424

taskiq/brokers/inmemory_broker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
from collections import OrderedDict
33
from typing import Any, Callable, Coroutine, Optional, TypeVar, get_type_hints
44

5+
from taskiq_dependencies import DependencyGraph
6+
57
from taskiq.abc.broker import AsyncBroker
68
from taskiq.abc.result_backend import AsyncResultBackend, TaskiqResult
79
from taskiq.cli.worker.args import WorkerArgs
810
from taskiq.cli.worker.receiver import Receiver
9-
from taskiq.dependencies import DependencyGraph
1011
from taskiq.events import TaskiqEvents
1112
from taskiq.exceptions import TaskiqError
1213
from taskiq.message import BrokerMessage

taskiq/cli/worker/receiver.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@
66
from time import time
77
from typing import Any, Callable, Dict, get_type_hints
88

9+
from taskiq_dependencies import DependencyGraph
10+
911
from taskiq.abc.broker import AsyncBroker
1012
from taskiq.abc.middleware import TaskiqMiddleware
1113
from taskiq.cli.worker.args import WorkerArgs
1214
from taskiq.cli.worker.log_collector import log_collector
1315
from taskiq.cli.worker.params_parser import parse_params
1416
from taskiq.context import Context
15-
from taskiq.dependencies import DependencyGraph
1617
from taskiq.message import BrokerMessage, TaskiqMessage
1718
from taskiq.result import TaskiqResult
1819
from taskiq.state import TaskiqState
@@ -164,7 +165,7 @@ async def run_task( # noqa: C901, WPS210
164165
dep_ctx = None
165166
if dependency_graph:
166167
# Create a context for dependency resolving.
167-
dep_ctx = dependency_graph.ctx(
168+
dep_ctx = dependency_graph.async_ctx(
168169
{
169170
Context: Context(message, self.broker),
170171
TaskiqState: self.broker.state,

0 commit comments

Comments
 (0)