|
| 1 | +import asyncio |
| 2 | +from collections import defaultdict |
| 3 | +from typing import Callable, Dict, List, Any |
| 4 | + |
| 5 | +from src.common.utils import get_logger |
| 6 | + |
| 7 | +logger = get_logger("EventBus") |
| 8 | + |
| 9 | + |
| 10 | +class EventBus: |
| 11 | + """ |
| 12 | + A simple in-memory asynchronous event bus that provides: |
| 13 | + - publish(topic, message) |
| 14 | + - subscribe(topic, callback) |
| 15 | +
|
| 16 | + In production this can be replaced with Kafka, Redis Streams, |
| 17 | + NATS, RabbitMQ, AWS SNS/SQS, GCP PubSub, etc. |
| 18 | + """ |
| 19 | + |
| 20 | + def __init__(self): |
| 21 | + # topic → list of subscriber callback functions |
| 22 | + self.subscribers: Dict[str, List[Callable]] = defaultdict(list) |
| 23 | + # lock to avoid race conditions when adding/removing subscribers |
| 24 | + self.lock = asyncio.Lock() |
| 25 | + |
| 26 | + async def publish(self, topic: str, message: Any): |
| 27 | + """ |
| 28 | + Publish an event to a topic. |
| 29 | + Invokes all subscribers concurrently. |
| 30 | + """ |
| 31 | + if topic not in self.subscribers: |
| 32 | + logger.warning(f"[EVENT BUS] No subscribers for topic '{topic}'.") |
| 33 | + return |
| 34 | + |
| 35 | + logger.info(f"[EVENT BUS] Publishing to {topic}: {message}") |
| 36 | + |
| 37 | + callbacks = self.subscribers[topic] |
| 38 | + |
| 39 | + # Run all subscribers concurrently |
| 40 | + await asyncio.gather(*[ |
| 41 | + callback(message) for callback in callbacks |
| 42 | + ]) |
| 43 | + |
| 44 | + async def subscribe(self, topic: str, callback: Callable): |
| 45 | + """ |
| 46 | + Register a subscriber callback for a topic. |
| 47 | + """ |
| 48 | + async with self.lock: |
| 49 | + self.subscribers[topic].append(callback) |
| 50 | + logger.info(f"[EVENT BUS] Subscriber added for topic '{topic}'.") |
0 commit comments