Skip to content

Commit 3d41144

Browse files
authored
Merge pull request #7 from taskiq-python/feature/post-save
Middleware now react on post-save hook.
2 parents 2a6ae6e + 77f94ef commit 3d41144

File tree

6 files changed

+21
-10
lines changed

6 files changed

+21
-10
lines changed

poetry.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ authors = ["Pavel Kirilin <[email protected]>"]
66

77
[tool.poetry.dependencies]
88
python = "^3.7"
9-
taskiq = "^0"
9+
taskiq = ">=0.0.8, <1"
1010
typing-extensions = "^4.3.0"
1111
pydantic = "^1.6.2"
1212

@@ -22,7 +22,7 @@ isort = "^5.10.1"
2222
yesqa = "^1.4.0"
2323
wemake-python-styleguide = "^0.16.1"
2424
mypy = "^0.971"
25-
pytest-xdist = {version = "^2.5.0", extras = ["psutil"]}
25+
pytest-xdist = { version = "^2.5.0", extras = ["psutil"] }
2626

2727
[tool.mypy]
2828
strict = true

taskiq_pipelines/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,11 @@
11
"""Pipelines for taskiq tasks."""
2+
from taskiq_pipelines.exceptions import AbortPipeline, PipelineError
3+
from taskiq_pipelines.middleware import PipelineMiddleware
4+
from taskiq_pipelines.pipeliner import Pipeline
5+
6+
__all__ = [
7+
"Pipeline",
8+
"PipelineError",
9+
"AbortPipeline",
10+
"PipelineMiddleware",
11+
]

taskiq_pipelines/middleware.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
class PipelineMiddleware(TaskiqMiddleware):
1616
"""Pipeline middleware."""
1717

18-
async def post_execute( # noqa: C901, WPS212
18+
async def post_save( # noqa: C901, WPS212
1919
self,
2020
message: "TaskiqMessage",
2121
result: "TaskiqResult[Any]",

taskiq_pipelines/steps/filter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ async def filter_tasks( # noqa: C901, WPS210, WPS231
5050
tasks_set.remove(task_id)
5151
except LookupError:
5252
continue
53-
await asyncio.sleep(check_interval)
53+
if tasks_set:
54+
await asyncio.sleep(check_interval)
5455

5556
results = await context.broker.result_backend.get_result(parent_task_id)
5657
filtered_results = []
@@ -135,7 +136,6 @@ async def act(
135136
else:
136137
task = await kicker.kiq(item, **self.additional_kwargs)
137138
sub_task_ids.append(task.task_id)
138-
139139
await filter_tasks.kicker().with_task_id(task_id).with_broker(
140140
broker,
141141
).with_labels(

taskiq_pipelines/steps/mapper.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ async def wait_tasks( # noqa: C901, WPS231
5050
tasks_set.remove(task_id)
5151
except LookupError:
5252
continue
53-
await asyncio.sleep(check_interval)
53+
if tasks_set:
54+
await asyncio.sleep(check_interval)
5455

5556
results = []
5657
for task_id in ordered_ids: # noqa: WPS440

0 commit comments

Comments
 (0)