-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathredis_bus.py
More file actions
36 lines (29 loc) · 1.05 KB
/
redis_bus.py
File metadata and controls
36 lines (29 loc) · 1.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import json
import asyncio
import aioredis
from .base import EventBus
class RedisEventBus(EventBus):
"""
Redis Streams based implementation.
Great for lightweight event-driven pipelines.
"""
def __init__(self, redis_url="redis://localhost:6379"):
self.redis_url = redis_url
self.redis = None
async def connect(self):
self.redis = await aioredis.from_url(self.redis_url, decode_responses=True)
async def publish(self, topic: str, message: dict):
await self.redis.xadd(topic, {"data": json.dumps(message)})
async def subscribe(self, topic: str, handler):
last_id = "$"
while True:
streams = await self.redis.xread({topic: last_id}, timeout=5000)
if streams:
_, messages = streams[0]
for msg_id, fields in messages:
last_id = msg_id
data = json.loads(fields["data"])
await handler(data)
async def close(self):
if self.redis:
await self.redis.close()