|
| 1 | +# Event System |
| 2 | + |
| 3 | +Hyx includes an event system that allows you to observe and react to component lifecycle events. |
| 4 | +This is the foundation for telemetry integrations and enables building custom monitoring solutions. |
| 5 | + |
| 6 | +## Overview |
| 7 | + |
| 8 | +The event system follows the **Observer pattern** with these key concepts: |
| 9 | + |
| 10 | +- **Events** - Lifecycle moments (retry attempt, circuit breaker state change, timeout, etc.) |
| 11 | +- **Listeners** - Objects that react to events by implementing handler methods |
| 12 | +- **EventDispatcher** - Routes events to registered listeners |
| 13 | +- **EventManager** - Tracks async listener tasks for graceful shutdown |
| 14 | + |
| 15 | +``` |
| 16 | +βββββββββββββββ ββββββββββββββββββββ βββββββββββββββ |
| 17 | +β Component ββββββΆβ EventDispatcher ββββββΆβ Listeners β |
| 18 | +β (Retry, β β β β (OTel, β |
| 19 | +β Breaker) β β β β Custom) β |
| 20 | +βββββββββββββββ ββββββββββββββββββββ βββββββββββββββ |
| 21 | + β |
| 22 | + βΌ |
| 23 | + ββββββββββββββββββββ |
| 24 | + β EventManager β |
| 25 | + β (task tracking) β |
| 26 | + ββββββββββββββββββββ |
| 27 | +``` |
| 28 | + |
| 29 | +## Listener Interfaces |
| 30 | + |
| 31 | +Each component type defines its own listener interface. Implement only the methods you need. |
| 32 | + |
| 33 | +### RetryListener |
| 34 | + |
| 35 | +```python |
| 36 | +from hyx.retry.events import RetryListener |
| 37 | + |
| 38 | +class MyRetryListener(RetryListener): |
| 39 | + async def on_retry(self, retry, exception, counter, backoff): |
| 40 | + """Called when a retry attempt is made.""" |
| 41 | + print(f"Retry #{counter.current} for {retry.name}: {exception}") |
| 42 | + |
| 43 | + async def on_attempts_exceeded(self, retry): |
| 44 | + """Called when all retry attempts are exhausted.""" |
| 45 | + print(f"All retries exhausted for {retry.name}") |
| 46 | + |
| 47 | + async def on_success(self, retry, counter): |
| 48 | + """Called when the operation succeeds (with or without retries).""" |
| 49 | + print(f"Success for {retry.name} after {counter.current} attempts") |
| 50 | +``` |
| 51 | + |
| 52 | +| Method | Parameters | Description | |
| 53 | +|--------|------------|-------------| |
| 54 | +| `on_retry` | `retry`, `exception`, `counter`, `backoff` | Retry attempt made | |
| 55 | +| `on_attempts_exceeded` | `retry` | All attempts exhausted | |
| 56 | +| `on_success` | `retry`, `counter` | Operation succeeded | |
| 57 | + |
| 58 | +### CircuitBreakerListener (BreakerListener) |
| 59 | + |
| 60 | +```python |
| 61 | +from hyx.circuitbreaker.events import BreakerListener |
| 62 | + |
| 63 | +class MyBreakerListener(BreakerListener): |
| 64 | + async def on_working(self, context, current_state, next_state): |
| 65 | + """Called when breaker transitions to working state.""" |
| 66 | + print(f"{context.name}: {current_state.name} -> working") |
| 67 | + |
| 68 | + async def on_recovering(self, context, current_state, next_state): |
| 69 | + """Called when breaker transitions to recovering state.""" |
| 70 | + print(f"{context.name}: {current_state.name} -> recovering") |
| 71 | + |
| 72 | + async def on_failing(self, context, current_state, next_state): |
| 73 | + """Called when breaker transitions to failing (open) state.""" |
| 74 | + print(f"{context.name}: {current_state.name} -> failing") |
| 75 | + |
| 76 | + async def on_success(self, context, state): |
| 77 | + """Called on successful operation through the breaker.""" |
| 78 | + print(f"{context.name}: success in {state.name} state") |
| 79 | +``` |
| 80 | + |
| 81 | +| Method | Parameters | Description | |
| 82 | +|--------|------------|-------------| |
| 83 | +| `on_working` | `context`, `current_state`, `next_state` | Transitioned to working | |
| 84 | +| `on_recovering` | `context`, `current_state`, `next_state` | Transitioned to recovering | |
| 85 | +| `on_failing` | `context`, `current_state`, `next_state` | Transitioned to failing | |
| 86 | +| `on_success` | `context`, `state` | Operation succeeded | |
| 87 | + |
| 88 | +### TimeoutListener |
| 89 | + |
| 90 | +```python |
| 91 | +from hyx.timeout.events import TimeoutListener |
| 92 | + |
| 93 | +class MyTimeoutListener(TimeoutListener): |
| 94 | + async def on_timeout(self, timeout): |
| 95 | + """Called when an operation exceeds the timeout.""" |
| 96 | + print(f"Timeout exceeded for {timeout.name}") |
| 97 | +``` |
| 98 | + |
| 99 | +| Method | Parameters | Description | |
| 100 | +|--------|------------|-------------| |
| 101 | +| `on_timeout` | `timeout` | Operation timed out | |
| 102 | + |
| 103 | +### BulkheadListener |
| 104 | + |
| 105 | +```python |
| 106 | +from hyx.bulkhead.events import BulkheadListener |
| 107 | + |
| 108 | +class MyBulkheadListener(BulkheadListener): |
| 109 | + async def on_bulkhead_full(self, bulkhead): |
| 110 | + """Called when an operation is rejected due to capacity.""" |
| 111 | + print(f"Bulkhead {bulkhead.name} is full, request rejected") |
| 112 | +``` |
| 113 | + |
| 114 | +| Method | Parameters | Description | |
| 115 | +|--------|------------|-------------| |
| 116 | +| `on_bulkhead_full` | `bulkhead` | Request rejected (capacity exceeded) | |
| 117 | + |
| 118 | +### FallbackListener |
| 119 | + |
| 120 | +```python |
| 121 | +from hyx.fallback.events import FallbackListener |
| 122 | + |
| 123 | +class MyFallbackListener(FallbackListener): |
| 124 | + async def on_fallback(self, fallback, result, *args, **kwargs): |
| 125 | + """Called when the fallback handler is triggered.""" |
| 126 | + reason = "exception" if isinstance(result, Exception) else "predicate" |
| 127 | + print(f"Fallback triggered for {fallback.name}: {reason}") |
| 128 | +``` |
| 129 | + |
| 130 | +| Method | Parameters | Description | |
| 131 | +|--------|------------|-------------| |
| 132 | +| `on_fallback` | `fallback`, `result`, `*args`, `**kwargs` | Fallback was triggered | |
| 133 | + |
| 134 | +## Registering Listeners |
| 135 | + |
| 136 | +There are two ways to register listeners: **globally** (for all components of a type) or **locally** (for a specific component instance). |
| 137 | + |
| 138 | +### Global Registration |
| 139 | + |
| 140 | +Global listeners receive events from all components of that type in your application: |
| 141 | + |
| 142 | +```python |
| 143 | +from hyx.retry.events import register_retry_listener |
| 144 | +from hyx.circuitbreaker.events import register_breaker_listener |
| 145 | +from hyx.timeout.events import register_timeout_listener |
| 146 | +from hyx.bulkhead.events import register_bulkhead_listener |
| 147 | +from hyx.fallback.events import register_fallback_listener |
| 148 | + |
| 149 | +# Register once at application startup |
| 150 | +register_retry_listener(MyRetryListener()) |
| 151 | +register_breaker_listener(MyBreakerListener()) |
| 152 | +register_timeout_listener(MyTimeoutListener()) |
| 153 | +register_bulkhead_listener(MyBulkheadListener()) |
| 154 | +register_fallback_listener(MyFallbackListener()) |
| 155 | +``` |
| 156 | + |
| 157 | +### Local Registration |
| 158 | + |
| 159 | +Local listeners are attached to specific component instances: |
| 160 | + |
| 161 | +```python |
| 162 | +from hyx.retry import retry |
| 163 | + |
| 164 | +listener = MyRetryListener() |
| 165 | + |
| 166 | +@retry(attempts=3, listeners=[listener]) |
| 167 | +async def my_function(): |
| 168 | + ... |
| 169 | +``` |
| 170 | + |
| 171 | +```python |
| 172 | +from hyx.circuitbreaker import consecutive_breaker |
| 173 | + |
| 174 | +listener = MyBreakerListener() |
| 175 | + |
| 176 | +breaker = consecutive_breaker( |
| 177 | + failure_threshold=5, |
| 178 | + recovery_time_secs=30, |
| 179 | + listeners=[listener], |
| 180 | +) |
| 181 | +``` |
| 182 | + |
| 183 | +### Combining Global and Local |
| 184 | + |
| 185 | +Both global and local listeners can be active simultaneously. Events are dispatched to all registered listeners: |
| 186 | + |
| 187 | +```python |
| 188 | +from hyx.retry.events import register_retry_listener |
| 189 | +from hyx.retry import retry |
| 190 | + |
| 191 | +# Global listener for metrics |
| 192 | +register_retry_listener(MetricsListener()) |
| 193 | + |
| 194 | +# Local listener for specific logging |
| 195 | +debug_listener = DebugListener() |
| 196 | + |
| 197 | +@retry(attempts=3, listeners=[debug_listener]) |
| 198 | +async def critical_operation(): |
| 199 | + ... |
| 200 | +``` |
| 201 | + |
| 202 | +## Event Manager |
| 203 | + |
| 204 | +The `EventManager` tracks all async listener tasks, enabling graceful shutdown and testing. |
| 205 | + |
| 206 | +### Basic Usage |
| 207 | + |
| 208 | +```python |
| 209 | +from hyx.events import EventManager |
| 210 | +from hyx.retry import retry |
| 211 | + |
| 212 | +event_manager = EventManager() |
| 213 | + |
| 214 | +@retry(attempts=3, event_manager=event_manager) |
| 215 | +async def my_function(): |
| 216 | + ... |
| 217 | + |
| 218 | +# Run your operations |
| 219 | +await my_function() |
| 220 | + |
| 221 | +# Wait for all listener tasks to complete |
| 222 | +await event_manager.wait_for_tasks() |
| 223 | +``` |
| 224 | + |
| 225 | +### Graceful Shutdown |
| 226 | + |
| 227 | +```python |
| 228 | +import signal |
| 229 | +from hyx.events import EventManager |
| 230 | + |
| 231 | +event_manager = EventManager() |
| 232 | + |
| 233 | +async def shutdown(): |
| 234 | + # Cancel all pending listener tasks |
| 235 | + await event_manager.cancel_tasks() |
| 236 | + |
| 237 | +# Register shutdown handler |
| 238 | +loop = asyncio.get_event_loop() |
| 239 | +loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.create_task(shutdown())) |
| 240 | +``` |
| 241 | + |
| 242 | +### Testing |
| 243 | + |
| 244 | +The EventManager is essential for testing to ensure all events are processed: |
| 245 | + |
| 246 | +```python |
| 247 | +import pytest |
| 248 | +from hyx.events import EventManager |
| 249 | +from hyx.retry import retry |
| 250 | + |
| 251 | +async def test_retry_events(): |
| 252 | + event_manager = EventManager() |
| 253 | + captured_events = [] |
| 254 | + |
| 255 | + class TestListener: |
| 256 | + async def on_retry(self, retry, exception, counter, backoff): |
| 257 | + captured_events.append(("retry", retry.name)) |
| 258 | + |
| 259 | + async def on_success(self, retry, counter): |
| 260 | + captured_events.append(("success", retry.name)) |
| 261 | + |
| 262 | + @retry(attempts=3, listeners=[TestListener()], event_manager=event_manager) |
| 263 | + async def flaky(): |
| 264 | + if len(captured_events) < 2: |
| 265 | + raise ValueError("not yet") |
| 266 | + return "ok" |
| 267 | + |
| 268 | + await flaky() |
| 269 | + await event_manager.wait_for_tasks() # Important! |
| 270 | + |
| 271 | + assert len(captured_events) == 3 # 2 retries + 1 success |
| 272 | +``` |
| 273 | + |
| 274 | +## Listener Factories |
| 275 | + |
| 276 | +For advanced use cases, you can use **listener factories** - callables that create listeners dynamically based on the component: |
| 277 | + |
| 278 | +```python |
| 279 | +from hyx.retry.events import register_retry_listener |
| 280 | + |
| 281 | +async def create_listener(component): |
| 282 | + """Factory that creates a listener with component context.""" |
| 283 | + class DynamicListener: |
| 284 | + async def on_retry(self, retry, exception, counter, backoff): |
| 285 | + # Access component info at creation time |
| 286 | + print(f"Retry for component created at startup: {component.name}") |
| 287 | + |
| 288 | + return DynamicListener() |
| 289 | + |
| 290 | +# Register the factory (not an instance) |
| 291 | +register_retry_listener(create_listener) |
| 292 | +``` |
| 293 | + |
| 294 | +Factories are useful when: |
| 295 | + |
| 296 | +- Listeners need component-specific configuration |
| 297 | +- You want lazy initialization |
| 298 | +- The listener needs to reference the component it's attached to |
| 299 | + |
| 300 | +## Architecture |
| 301 | + |
| 302 | +### EventDispatcher |
| 303 | + |
| 304 | +The `EventDispatcher` is the core routing mechanism. It: |
| 305 | + |
| 306 | +1. Collects local and global listeners |
| 307 | +2. Initializes listener factories on first event |
| 308 | +3. Dispatches events to all listeners in parallel |
| 309 | +4. Tracks tasks via EventManager (if provided) |
| 310 | + |
| 311 | +```python |
| 312 | +from hyx.events import EventDispatcher, ListenerRegistry |
| 313 | + |
| 314 | +# Internal usage (you typically don't need this directly) |
| 315 | +dispatcher = EventDispatcher( |
| 316 | + local_listeners=[listener1, listener2], |
| 317 | + global_listener_registry=registry, |
| 318 | + event_manager=event_manager, |
| 319 | +) |
| 320 | +``` |
| 321 | + |
| 322 | +### ListenerRegistry |
| 323 | + |
| 324 | +Each component type has a global `ListenerRegistry`: |
| 325 | + |
| 326 | +```python |
| 327 | +from hyx.events import ListenerRegistry |
| 328 | + |
| 329 | +# Defined in each component's events module |
| 330 | +_RETRY_LISTENERS: ListenerRegistry["RetryManager", "RetryListener"] = ListenerRegistry() |
| 331 | +``` |
| 332 | + |
| 333 | +### Event Flow |
| 334 | + |
| 335 | +1. Component calls event method (e.g., `self._event_dispatcher.on_retry(...)`) |
| 336 | +2. EventDispatcher creates an async task |
| 337 | +3. Task is registered with EventManager (if present) |
| 338 | +4. All listeners receive the event in parallel via `asyncio.gather` |
| 339 | +5. Errors in listeners are isolated (don't affect the main operation) |
| 340 | + |
| 341 | +## Best Practices |
| 342 | + |
| 343 | +1. **Keep listeners fast** - Events are processed asynchronously but slow listeners can accumulate |
| 344 | +2. **Handle errors gracefully** - Listener errors don't propagate to the main operation |
| 345 | +3. **Use EventManager in tests** - Always call `await event_manager.wait_for_tasks()` before assertions |
| 346 | +4. **Prefer global registration for observability** - Use local listeners only for component-specific behavior |
| 347 | +5. **Don't block in listeners** - Use `asyncio.create_task()` for long-running operations |
0 commit comments