Skip to content

Commit 4390951

Browse files
committed
feat: implement asynchronous Kafka and Redis pub/sub mechanisms with message handling
1 parent 4a485db commit 4390951

File tree

3 files changed

+159
-0
lines changed

3 files changed

+159
-0
lines changed

src/modules/transporter/kafka.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import asyncio
2+
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
3+
from functools import wraps
4+
from typing import Callable, Dict
5+
6+
7+
class AsyncKafkaPubSub:
8+
def __init__(self, bootstrap_servers="ubuntu:9092", group_id="default-py-group"):
9+
self.bootstrap_servers = bootstrap_servers
10+
self.group_id = group_id
11+
self.handlers: Dict[str, Callable[[str], None]] = {}
12+
self._consumer_started = False
13+
self._lock = asyncio.Lock()
14+
self._producer = None
15+
16+
def subscribe(self, topic: str):
17+
print(f"Subscribing to topic: {topic}")
18+
def decorator(func: Callable[[str], None]):
19+
self.handlers[topic] = func
20+
asyncio.create_task(self._ensure_consumer())
21+
return func
22+
return decorator
23+
24+
async def publish(self, topic: str, message: str):
25+
if not self._producer:
26+
self._producer = AIOKafkaProducer(bootstrap_servers=self.bootstrap_servers)
27+
await self._producer.start()
28+
await self._producer.send_and_wait(topic, message.encode())
29+
30+
async def _ensure_consumer(self):
31+
async with self._lock:
32+
if self._consumer_started:
33+
return
34+
self._consumer_started = True
35+
await self._start_consumer()
36+
37+
async def _start_consumer(self):
38+
consumer = AIOKafkaConsumer(
39+
*self.handlers.keys(),
40+
bootstrap_servers=self.bootstrap_servers,
41+
group_id=self.group_id,
42+
auto_offset_reset='latest',
43+
)
44+
await consumer.start()
45+
print(f"Subscribed to: {', '.join(self.handlers.keys())}")
46+
47+
async def listen():
48+
try:
49+
async for msg in consumer:
50+
print(f"Received message on topic '{msg.topic}': {msg.value.decode()}")
51+
topic = msg.topic
52+
data = msg.value.decode()
53+
handler = self.handlers.get(topic)
54+
if handler:
55+
await self._maybe_async(handler, data)
56+
finally:
57+
await consumer.stop()
58+
59+
asyncio.create_task(listen())
60+
61+
async def _maybe_async(self, func: Callable, *args, **kwargs):
62+
result = func(*args, **kwargs)
63+
if asyncio.iscoroutine(result):
64+
await result
65+
66+
67+
kafka_pubsub = AsyncKafkaPubSub()
68+
69+
70+
@kafka_pubsub.subscribe("chat")
71+
def handle_chat(msg):
72+
print(f"[chat kafka] {msg}")
73+

src/modules/transporter/rabbitmq.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import threading
33

44
credentials = pika.PlainCredentials('guest', 'guest')
5+
56
parameters = pika.ConnectionParameters(
67
host='ubuntu',
78
port=5672, # default RabbitMQ port
@@ -39,6 +40,12 @@ def process_message(msg):
3940
print(f"[x] Received: {msg}")
4041

4142

43+
@queue_listener("test")
44+
def test_queue_listener(msg):
45+
# This function is just for testing the queue listener
46+
print(f"[test] Received message: {msg}")
47+
48+
4249
# publish a message to the queue
4350
def publish_message(queue_name, message):
4451
connection = pika.BlockingConnection(parameters)
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
2+
import redis
3+
4+
# Connect to Redis
5+
r = redis.Redis(host='ubuntu', port=6379, decode_responses=True)
6+
7+
# Set a value
8+
r.set('foo', 'bar')
9+
10+
# Get a value
11+
value = r.get('foo')
12+
print(value) # Output: bar
13+
14+
15+
print("Redis client is running...", r.ping())
16+
17+
18+
import asyncio
19+
from redis.asyncio import Redis
20+
from functools import wraps
21+
from typing import Callable, Dict
22+
23+
24+
class AsyncRedisPubSub:
25+
def __init__(self, host="ubuntu", port=6379, db=0):
26+
self.redis = Redis(host=host, port=port, db=db)
27+
self.channel_handlers: Dict[str, Callable[[str], None]] = {}
28+
self._subscriber_started = False
29+
self._lock = asyncio.Lock()
30+
31+
def subscribe(self, channel: str):
32+
def decorator(func: Callable[[str], None]):
33+
self.channel_handlers[channel] = func
34+
asyncio.create_task(self._ensure_subscriber())
35+
return func
36+
return decorator
37+
38+
async def publish(self, channel: str, message: str):
39+
await self.redis.publish(channel, message)
40+
41+
async def _ensure_subscriber(self):
42+
async with self._lock:
43+
if self._subscriber_started:
44+
return
45+
self._subscriber_started = True
46+
await self._start_subscriber()
47+
48+
async def _start_subscriber(self):
49+
pubsub = self.redis.pubsub()
50+
await pubsub.subscribe(*self.channel_handlers.keys())
51+
print(f"Subscribed to: {', '.join(self.channel_handlers.keys())}")
52+
53+
async def listen():
54+
async for message in pubsub.listen():
55+
if message["type"] != "message":
56+
continue
57+
channel = message["channel"].decode()
58+
data = message["data"].decode()
59+
handler = self.channel_handlers.get(channel)
60+
if handler:
61+
await self._maybe_async(handler, data)
62+
63+
asyncio.create_task(listen())
64+
65+
async def _maybe_async(self, func: Callable, *args, **kwargs):
66+
result = func(*args, **kwargs)
67+
if asyncio.iscoroutine(result):
68+
await result
69+
70+
71+
pubsub = AsyncRedisPubSub()
72+
73+
@pubsub.subscribe("chat")
74+
def handle_chat(msg):
75+
print(f"[chat redis] Received: {msg}")
76+
77+
@pubsub.subscribe("news")
78+
def handle_news(msg):
79+
print(f"[news] Breaking: {msg}")

0 commit comments

Comments
 (0)