Commit 1e4c16e: event bus fixes
Parent: 92fd5ad

5 files changed: +210 -26 lines

backend/app/services/event_bus.py

Lines changed: 20 additions & 11 deletions

```diff
@@ -40,7 +40,12 @@ class Subscription:
 
 class EventBus(LifecycleEnabled):
     """
-    Hybrid event bus with Kafka backing and local in-memory distribution.
+    Distributed event bus for cross-instance communication via Kafka.
+
+    Publishers send events to Kafka. Subscribers receive events from OTHER instances
+    only - self-published messages are filtered out. This design means:
+    - Publishers should update their own state directly before calling publish()
+    - Handlers only run for events from other instances (cache invalidation, etc.)
 
     Supports pattern-based subscriptions using wildcards:
     - execution.* - matches all execution events
@@ -60,6 +65,7 @@ def __init__(self, settings: Settings, logger: logging.Logger) -> None:
         self._consumer_task: Optional[asyncio.Task[None]] = None
         self._lock = asyncio.Lock()
         self._topic = f"{self.settings.KAFKA_TOPIC_PREFIX}{KafkaTopic.EVENT_BUS_STREAM}"
+        self._instance_id = str(uuid4())  # Unique ID for filtering self-published messages
 
     async def _on_start(self) -> None:
         """Start the event bus with Kafka backing."""
@@ -117,32 +123,32 @@ async def _on_stop(self) -> None:
 
     async def publish(self, event_type: str, data: dict[str, Any]) -> None:
         """
-        Publish an event to Kafka and local subscribers.
+        Publish an event to Kafka for cross-instance distribution.
+
+        Local handlers receive events only from OTHER instances via the Kafka listener.
+        Publishers should update their own state directly before calling publish().
 
         Args:
             event_type: Event type (e.g., "execution.123.started")
             data: Event data payload
         """
         event = self._create_event(event_type, data)
 
-        # Publish to Kafka for distributed handling
         if self.producer:
             try:
-                # Serialize and send message asynchronously
                 value = event.model_dump_json().encode("utf-8")
                 key = event_type.encode("utf-8") if event_type else None
+                headers = [("source_instance", self._instance_id.encode("utf-8"))]
 
                 await self.producer.send_and_wait(
                     topic=self._topic,
                     value=value,
                     key=key,
+                    headers=headers,
                 )
             except Exception as e:
                 self.logger.error(f"Failed to publish to Kafka: {e}")
 
-        # Publish to local subscribers for immediate handling
-        await self._distribute_event(event_type, event)
-
     def _create_event(self, event_type: str, data: dict[str, Any]) -> EventBusEvent:
         """Create a standardized event object."""
         return EventBusEvent(
@@ -251,7 +257,7 @@ async def _invoke_handler(self, handler: Callable[[EventBusEvent], Any], event:
             await asyncio.to_thread(handler, event)
 
     async def _kafka_listener(self) -> None:
-        """Listen for Kafka messages and distribute to local subscribers."""
+        """Listen for Kafka messages from OTHER instances and distribute to local subscribers."""
         if not self.consumer:
             return
 
@@ -260,19 +266,22 @@
         try:
             while self.is_running:
                 try:
-                    # Use getone() with timeout
                     msg = await asyncio.wait_for(self.consumer.getone(), timeout=0.1)
 
+                    # Skip messages from this instance - publisher handles its own state
+                    headers = dict(msg.headers) if msg.headers else {}
+                    source = headers.get("source_instance", b"").decode("utf-8")
+                    if source == self._instance_id:
+                        continue
+
                     try:
-                        # Deserialize message - Pydantic parses timestamp string to datetime
                         event_dict = json.loads(msg.value.decode("utf-8"))
                         event = EventBusEvent.model_validate(event_dict)
                         await self._distribute_event(event.event_type, event)
                     except Exception as e:
                         self.logger.error(f"Error processing Kafka message: {e}")
 
                 except asyncio.TimeoutError:
-                    # No message available, continue polling
                     continue
                 except KafkaError as e:
                     self.logger.error(f"Consumer error: {e}")
```
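Aside (not part of the commit): the consumer-side skip is a pure function of the message headers, so it can be sketched and sanity-checked in isolation. `FakeMessage` below is a hypothetical stand-in for the consumed Kafka record, used only for illustration:

```python
from dataclasses import dataclass, field


@dataclass
class FakeMessage:
    # Stand-in for a consumed record; real records carry headers
    # as a sequence of (str, bytes) pairs.
    headers: list[tuple[str, bytes]] = field(default_factory=list)


def is_self_published(msg: FakeMessage, instance_id: str) -> bool:
    # Mirrors the skip logic added to _kafka_listener above.
    headers = dict(msg.headers) if msg.headers else {}
    source = headers.get("source_instance", b"").decode("utf-8")
    return source == instance_id


own = FakeMessage(headers=[("source_instance", b"abc-123")])
other = FakeMessage(headers=[("source_instance", b"def-456")])
legacy = FakeMessage()  # a message published before this change, no header

assert is_self_published(own, "abc-123")
assert not is_self_published(other, "abc-123")
# Headerless messages are processed by every instance, publisher included.
assert not is_self_published(legacy, "abc-123")
```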

backend/app/services/user_settings_service.py

Lines changed: 6 additions & 7 deletions

```diff
@@ -1,7 +1,6 @@
 import logging
 from datetime import datetime, timedelta, timezone
 from typing import Any
-from uuid import uuid4
 
 from cachetools import TTLCache
 from pydantic import TypeAdapter
@@ -39,7 +38,6 @@ def __init__(
         )
         self._event_bus_manager: EventBusManager | None = None
         self._subscription_id: str | None = None
-        self._instance_id: str = str(uuid4())
 
         self.logger.info(
             "UserSettingsService initialized",
@@ -56,14 +54,15 @@ async def get_user_settings(self, user_id: str) -> DomainUserSettings:
         return await self.get_user_settings_fresh(user_id)
 
     async def initialize(self, event_bus_manager: EventBusManager) -> None:
-        """Subscribe to settings update events for cross-instance cache invalidation."""
+        """Subscribe to settings update events for cross-instance cache invalidation.
+
+        Note: EventBus filters out self-published messages, so this handler only
+        runs for events from OTHER instances.
+        """
         self._event_bus_manager = event_bus_manager
         bus = await event_bus_manager.get_event_bus()
 
         async def _handle(evt: EventBusEvent) -> None:
-            # Skip events from this instance - we already updated our cache
-            if evt.payload.get("source_instance") == self._instance_id:
-                return
             uid = evt.payload.get("user_id")
             if uid:
                 await self.invalidate_cache(str(uid))
@@ -111,7 +110,7 @@ async def update_user_settings(
 
         if self._event_bus_manager is not None:
             bus = await self._event_bus_manager.get_event_bus()
-            await bus.publish("user.settings.updated", {"user_id": user_id, "source_instance": self._instance_id})
+            await bus.publish("user.settings.updated", {"user_id": user_id})
 
         self._add_to_cache(user_id, new_settings)
         if (await self.repository.count_events_since_snapshot(user_id)) >= 10:
```
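Aside (not part of the commit): this hunk shows why bus-level filtering is preferable to the payload convention it replaces. Previously, every subscriber had to remember to embed a `source_instance` field in the payload and check it in the handler; forgetting the check meant an instance could consume its own event and invalidate the cache entry it had just written. With the bus dropping self-published messages at the consumer, the guard is enforced once, globally, and individual handlers stay minimal.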

docs/architecture/event-bus.md (new file)

Lines changed: 167 additions & 0 deletions

# Event bus

This document explains how the EventBus provides cross-instance communication for services that need to react to changes happening on other instances. If you've wondered how cache invalidation works across multiple backend replicas, this is where that question gets answered.

## The problem

When running multiple instances of the backend (horizontal scaling), each instance has its own in-memory cache. When Instance A updates a user's settings, Instance B's cache becomes stale. Without coordination, Instance B would return outdated data until its cache TTL expires.

```mermaid
graph LR
    subgraph "Instance A"
        A1[Update settings] --> A2[Update local cache]
    end

    subgraph "Instance B"
        B1[Stale cache] --> B2[Returns old data]
    end

    A2 -.->|"No communication"| B1
```

The EventBus solves this by providing a Kafka-backed pub/sub mechanism for cross-instance events.

## Architecture

The EventBus uses Kafka as a message broker. When a service publishes an event, it goes to Kafka. Each instance has a Kafka listener that receives events from other instances and distributes them to local subscribers.

```mermaid
graph TB
    subgraph "Instance A"
        PA[Publisher] --> KP[Kafka Producer]
        KCA[Kafka Consumer] --> HA[Local Handlers]
    end

    subgraph "Kafka"
        T[event-bus-stream topic]
    end

    subgraph "Instance B"
        PB[Publisher] --> KPB[Kafka Producer]
        KCB[Kafka Consumer] --> HB[Local Handlers]
    end

    KP --> T
    KPB --> T
    T --> KCA
    T --> KCB
```

The key insight is that publishers handle their own state changes directly. They don't need to receive their own events back from Kafka. The EventBus filters out self-published messages so handlers only run for events from other instances.

## Self-filtering mechanism

Each EventBus instance has a unique ID. When publishing to Kafka, this ID is included as a message header:

```python
headers = [("source_instance", self._instance_id.encode("utf-8"))]
await self.producer.send_and_wait(topic=self._topic, value=value, headers=headers)
```

The Kafka listener checks this header and skips messages from itself:

```python
headers = dict(msg.headers) if msg.headers else {}
source = headers.get("source_instance", b"").decode("utf-8")
if source == self._instance_id:
    continue  # Skip self-published messages
```

This design means:

1. Publishers update their own state before calling `publish()`
2. The `publish()` call tells other instances about the change
3. Handlers only run for events from other instances

## Usage pattern

Services that need cross-instance communication follow this pattern:

```python
class MyService:
    async def initialize(self, event_bus_manager: EventBusManager) -> None:
        bus = await event_bus_manager.get_event_bus()

        async def _handle(evt: EventBusEvent) -> None:
            # This only runs for events from OTHER instances
            await self.invalidate_cache(evt.payload["id"])

        await bus.subscribe("my.event.*", _handle)

    async def update_something(self, id: str, data: dict) -> None:
        # 1. Update local state
        self._cache[id] = data

        # 2. Notify other instances
        bus = await self._event_bus_manager.get_event_bus()
        await bus.publish("my.event.updated", {"id": id})
```

## Pattern matching

Subscriptions support wildcard patterns using `fnmatch` syntax:

| Pattern                  | Matches                          |
|--------------------------|----------------------------------|
| `execution.*`            | All execution events             |
| `execution.123.*`        | All events for execution 123     |
| `*.completed`            | All completed events             |
| `user.settings.updated*` | Settings updates with any suffix |
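As a quick illustration (not part of the committed file), the rows above map onto plain `fnmatch` calls. Note that `*` in fnmatch is not segment-aware: it crosses `.` boundaries, which is why `execution.*` also matches `execution.123.started`:

```python
from fnmatch import fnmatch

assert fnmatch("execution.123.started", "execution.*")
assert fnmatch("execution.123.completed", "execution.123.*")
assert fnmatch("execution.123.completed", "*.completed")
assert fnmatch("user.settings.updated", "user.settings.updated*")

# "*" spans dots, so a bare "*" subscribes to everything.
assert fnmatch("execution.123.started", "*")
assert not fnmatch("user.settings.updated", "execution.*")
```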
## Flow diagram

Here's what happens when Instance A updates user settings:

```mermaid
sequenceDiagram
    participant API as Instance A
    participant Cache as Local Cache A
    participant Kafka
    participant ListenerB as Instance B Listener
    participant CacheB as Local Cache B

    API->>Cache: _add_to_cache(user_id, settings)
    API->>Kafka: publish("user.settings.updated", {user_id})
    Note over Kafka: Message includes source_instance header

    Kafka->>API: Listener receives message
    API->>API: source == self, SKIP

    Kafka->>ListenerB: Listener receives message
    ListenerB->>ListenerB: source != self, PROCESS
    ListenerB->>CacheB: invalidate_cache(user_id)
```

## EventBusManager

The `EventBusManager` provides singleton access to the EventBus with proper lifecycle management:

```python
async def get_event_bus(self) -> EventBus:
    async with self._lock:
        if self._event_bus is None:
            self._event_bus = EventBus(self.settings, self.logger)
            await self._event_bus.__aenter__()
        return self._event_bus
```

Services receive the manager via dependency injection and call `get_event_bus()` when needed.
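The lock makes first use safe under concurrency: if two coroutines call `get_event_bus()` during startup, the second waits on the lock and receives the bus the first one created, so only one producer/consumer pair is ever started. As a hypothetical call site (the service name and event type below are illustrative, not from the repository):

```python
class AuditService:
    """Sketch of a service receiving the manager via constructor injection."""

    def __init__(self, event_bus_manager: EventBusManager) -> None:
        self._event_bus_manager = event_bus_manager

    async def record(self, entity_id: str) -> None:
        # First caller anywhere in the process triggers EventBus startup;
        # later callers get the same instance.
        bus = await self._event_bus_manager.get_event_bus()
        await bus.publish("audit.entry.created", {"id": entity_id})
```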
## Key files

| File | Purpose |
|------|---------|
| [`services/event_bus.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/event_bus.py) | EventBus and EventBusManager implementation |
| [`services/user_settings_service.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/user_settings_service.py) | Example usage for cache invalidation |

## Related docs

- [User Settings Events](user-settings-events.md) — event sourcing with cache invalidation via EventBus
- [Event System Design](event-system-design.md) — domain events vs integration events
- [Kafka Topics](kafka-topic-architecture.md) — topic naming and partitioning

docs/architecture/user-settings-events.md

Lines changed: 16 additions & 8 deletions

````diff
@@ -20,23 +20,23 @@ contain the new values in Avro-compatible form.
 The service uses Pydantic's `TypeAdapter` for dict-based operations without reflection or branching:
 
 ```python
---8<-- "backend/app/services/user_settings_service.py:22:24"
+--8<-- "backend/app/services/user_settings_service.py:22:23"
 ```
 
 ### Updating settings
 
 The `update_user_settings` method merges changes into current settings, publishes an event, and manages snapshots:
 
 ```python
---8<-- "backend/app/services/user_settings_service.py:88:120"
+--8<-- "backend/app/services/user_settings_service.py:91:118"
 ```
 
 ### Applying events
 
 When reconstructing settings from events, `_apply_event` merges each event's changes:
 
 ```python
---8<-- "backend/app/services/user_settings_service.py:243:254"
+--8<-- "backend/app/services/user_settings_service.py:212:223"
 ```
 
 The `validate_python` call handles nested dict-to-dataclass conversion, enum parsing, and type coercion automatically.
@@ -63,30 +63,38 @@ while preserving full event history for auditing.
 Settings are cached with TTL to avoid repeated reconstruction:
 
 ```python
---8<-- "backend/app/services/user_settings_service.py:34:40"
+--8<-- "backend/app/services/user_settings_service.py:33:40"
 ```
 
-Cache invalidation happens via event bus subscription:
+Cache invalidation happens via [EventBus](event-bus.md) subscription. The EventBus filters out self-published messages,
+so the handler only runs for events from other instances:
 
 ```python
---8<-- "backend/app/services/user_settings_service.py:58:68"
+--8<-- "backend/app/services/user_settings_service.py:56:70"
 ```
 
-After each update, the service publishes to the event bus, triggering cache invalidation across instances.
+After each update, the service updates its local cache directly, then publishes to the event bus to trigger cache
+invalidation on other instances.
 
 ## Settings history
 
 The `get_settings_history` method returns a list of changes extracted from events:
 
 ```python
---8<-- "backend/app/services/user_settings_service.py:171:189"
+--8<-- "backend/app/services/user_settings_service.py:167:184"
 ```
 
 ## Key files
 
 | File | Purpose |
 |------|---------|
 | [`services/user_settings_service.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/user_settings_service.py) | Settings service with caching and event sourcing |
+| [`services/event_bus.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/event_bus.py) | Cross-instance event distribution |
 | [`domain/user/settings_models.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/domain/user/settings_models.py) | `DomainUserSettings`, `DomainUserSettingsUpdate` dataclasses |
 | [`infrastructure/kafka/events/user.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/infrastructure/kafka/events/user.py) | `UserSettingsUpdatedEvent` definition |
 | [`db/repositories/user_settings_repository.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/db/repositories/user_settings_repository.py) | Snapshot and event queries |
+
+## Related docs
+
+- [Event Bus](event-bus.md) — cross-instance communication with self-filtering
+- [Pydantic Dataclasses](pydantic-dataclasses.md) — TypeAdapter and dict-to-model conversion
````

mkdocs.yml

Lines changed: 1 addition & 0 deletions

```diff
@@ -119,6 +119,7 @@ nav:
     - Model Conversion: architecture/model-conversion.md
     - Event Storage: architecture/event-storage.md
    - Event System Design: architecture/event-system-design.md
+    - Event Bus: architecture/event-bus.md
     - User Settings Events: architecture/user-settings-events.md
     - Frontend Build: architecture/frontend-build.md
     - Svelte 5 Migration: architecture/svelte5-migration.md
```
