-
Notifications
You must be signed in to change notification settings - Fork 14
Description
Make it easier to trigger side effects before/after/during handler execution, e.g. as a result of exceptions raised by handlers.
Could be useful to allow registering custom middlewares that wrap the naiive handler exeuction flow so you can catch errors, modify events going in and out, check auth parameters, dispatch custom events, etc.
import traceback
class HandlerStartedAnalyticsEvent(BaseEvent[None]):
event_id: str # 'uuid-of-event-lkjsdf'
started_at: datetime # 2025-01-01 23:59:59.999+0
event_bus_id: str # '234123452345'
event_bus_name: str # 'BrowserEventBus123'
handler_id: str # '234235345'
handler_name: str # 'some_handler_method'
handler_class: str # 'some_module.some_file.SomeClass.some_handler_method'
class HandlerCompletedAnalyticsEvent(BaseEvent[None]):
event_id: str # 'uuid-of-event-lkjsdf'
completed_at: datetime # 2025-01-01 23:59:59.999+0
error: Exception | None # ValueError('some exception raised by a handler')
traceback_info: str # 'Exception: Some exception ... in xyz_file.py:234 from ...'
event_bus_id: str # '234123452345'
event_bus_name: str # 'BrowserEventBus123'
handler_id: str # '234235345'
handler_name: str # 'some_handler_method'
handler_class: str # 'some_module.some_file.SomeClass.some_handler_method'
analytics_bus = EventBus()
async def error_handling_middleware(event_bus: EventBus, event_result: EventResult) -> EventResult:
global analytics_bus
# dispatch an analytics event before every single handler execution
await analytics_bus.dispatch(HandlerStartedAnalyticsEvent(
event_id=event_result.event_id,
started_at=datetime.now(),
event_bus_id=event_bus.id,
...
))
status, result, error, traceback_str = 'started', None, None, ''
try:
# execute the handler and get the updated result
result = await event_bus.run_single_event_handler(event_result)
status = 'completed'
except Exception as e:
result = None
status = 'failed'
error = e
traceback_str = traceback.format_exc()
# can do other side-effects here, e.g. attempt to reconnect to underlying resources
# GlobalDBConnection.reconnect()
# SomeAPIClient.clear_cache()
# some_file.write_text('some handler failed ...')
finally:
# dispatch an analytics event after every single handler execution, regardless of success/failure
await analytics_bus.dispatch(HandlerCompletedAnalyticsEvent(
event_id=event_result.event_id,
error=error,
traceback=traceback_str,
event_bus_id=event_bus.id,
event_bus_name=event_bus.name,
handler_id=event_result.handler_id,
handler_name=event_result.handler_name,
handler_class=event_result.handler_class,
)
updated_event_result = await event_result.update(status=status, result=result, error=error)
return updated_event_result
event_bus = EventBus(
name='BrowserEventBus123',
middlewares=[error_handling_middleware],
)
This is useful and distinct from event_bus.on('*', error_handling_middleware)
because it allows fine-grained control over the runtime behavior / execution of handlers and error handling.
The middlewares should be defined django-style as a function that takes the event and a second arg it can use to call to execute the handlers and get the handler return value (look through bubus/service.py
/ bubus/models.py
for the correct helper method to execute a single handler and update its result in event_results
that we can pass here to hook).
The current WAL and logging functionality should be entirely moved to a new bubus/middlewares.py
and be optionally imported to enable those features by users that need them.
Update the README for the WAL section demonstrating it:
from bubus.middlewares import WALEventBusMiddlware, LoggerEventBusMiddleware
event_bus = EventBus(
...
middlewares=[WALEventBusMiddlware, LoggerEventBusMiddleware],
)