Skip to content

Commit dfe7132

Browse files
authored
Merge pull request #16 from taskiq-python/feature/new-interface
2 parents 7272b57 + f7b752c commit dfe7132

File tree

6 files changed

+789
-816
lines changed

6 files changed

+789
-816
lines changed

poetry.lock

Lines changed: 781 additions & 803 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ keywords = ["taskiq", "tasks", "distributed", "async", "redis", "result_backend"
1919

2020
[tool.poetry.dependencies]
2121
python = "^3.7"
22-
taskiq = "^0"
22+
taskiq = "^0.2.0"
2323
redis = "^4.2.0"
2424

2525
[tool.poetry.dev-dependencies]

taskiq_redis/redis_broker.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
import asyncio
21
import pickle
32
from logging import getLogger
4-
from typing import Any, Callable, Coroutine, Optional, TypeVar
3+
from typing import Any, AsyncGenerator, Callable, Optional, TypeVar
54

65
from redis.asyncio import ConnectionPool, Redis
76
from taskiq.abc.broker import AsyncBroker
@@ -70,19 +69,15 @@ async def kick(self, message: BrokerMessage) -> None:
7069
pickle.dumps(message),
7170
)
7271

73-
async def listen(
74-
self,
75-
callback: Callable[[BrokerMessage], Coroutine[Any, Any, None]],
76-
) -> None:
72+
async def listen(self) -> AsyncGenerator[BrokerMessage, None]:
7773
"""
78-
Listen redis list for new messages.
74+
Listen redis queue for new messages.
7975
80-
This function listens to list calls callback on
81-
new messages.
76+
This function listens to the queue
77+
and yields new messages if they have BrokerMessage type.
8278
83-
:param callback: function to call on new message.
79+
:yields: broker messages.
8480
"""
85-
loop = asyncio.get_event_loop()
8681
async with Redis(connection_pool=self.connection_pool) as redis_conn:
8782
redis_pubsub_channel = redis_conn.pubsub()
8883
await redis_pubsub_channel.subscribe(self.redis_pubsub_channel)
@@ -93,7 +88,7 @@ async def listen(
9388
message["data"],
9489
)
9590
if isinstance(redis_message, BrokerMessage):
96-
loop.create_task(callback(redis_message))
91+
yield redis_message
9792
except (
9893
TypeError,
9994
AttributeError,
File renamed without changes.
File renamed without changes.

0 commit comments

Comments
 (0)