-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathevent_bus.py
More file actions
50 lines (39 loc) · 1.53 KB
/
event_bus.py
File metadata and controls
50 lines (39 loc) · 1.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import asyncio
import inspect
from collections import defaultdict
from typing import Callable, Dict, List, Any

from src.common.utils import get_logger
logger = get_logger("EventBus")
class EventBus:
    """
    A simple in-memory asynchronous event bus that provides:
    - publish(topic, message)
    - subscribe(topic, callback)

    Subscribers may be coroutine functions or plain callables. Whatever a
    subscriber returns is awaited if (and only if) it is awaitable.

    In production this can be replaced with Kafka, Redis Streams,
    NATS, RabbitMQ, AWS SNS/SQS, GCP PubSub, etc.
    """

    def __init__(self):
        # topic → list of subscriber callback functions
        self.subscribers: Dict[str, List[Callable[[Any], Any]]] = defaultdict(list)
        # lock to avoid race conditions when adding/removing subscribers
        self.lock = asyncio.Lock()

    async def publish(self, topic: str, message: Any) -> None:
        """
        Publish an event to a topic.

        Invokes all subscribers of ``topic`` concurrently and waits until
        every one of them has finished, whether it succeeded or failed.

        Args:
            topic: Name of the topic to publish to.
            message: Payload handed unchanged to each subscriber callback.

        Raises:
            BaseException: If one or more subscribers fail, every failure is
                logged and the exception of the first failing subscriber (in
                subscription order) is re-raised once all subscribers are done.
        """
        # .get() does not create an entry in the defaultdict, and the copy
        # keeps this delivery isolated from later changes to the list.
        callbacks = list(self.subscribers.get(topic, ()))
        if not callbacks:
            logger.warning(f"[EVENT BUS] No subscribers for topic '{topic}'.")
            return

        logger.info(f"[EVENT BUS] Publishing to {topic}: {message}")

        # Run all subscribers concurrently. return_exceptions=True makes
        # gather wait for every subscriber instead of bailing out on the
        # first failure while the remaining ones are still running.
        outcomes = await asyncio.gather(
            *(self._deliver(callback, message) for callback in callbacks),
            return_exceptions=True,
        )

        failures = [
            outcome for outcome in outcomes
            if isinstance(outcome, BaseException)
        ]
        for failure in failures:
            logger.error(
                f"[EVENT BUS] Subscriber for topic '{topic}' failed: {failure!r}"
            )
        if failures:
            # Keep the original contract: a subscriber error reaches the caller.
            raise failures[0]

    @staticmethod
    async def _deliver(callback: Callable[[Any], Any], message: Any) -> None:
        """Call one subscriber and await its result when it is awaitable."""
        outcome = callback(message)
        if inspect.isawaitable(outcome):
            await outcome

    async def subscribe(self, topic: str, callback: Callable[[Any], Any]) -> None:
        """
        Register a subscriber callback for a topic.

        Args:
            topic: Name of the topic to listen on.
            callback: Callable invoked with each message published to
                ``topic``; may be a coroutine function or a plain callable.
        """
        async with self.lock:
            self.subscribers[topic].append(callback)
        logger.info(f"[EVENT BUS] Subscriber added for topic '{topic}'.")