Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,49 @@ await bus.dispatch(DataEvent())

<br/>

### 🔧 Middleware System

Add cross-cutting concerns like analytics, error handling, and logging using Django-style middleware:

```python
from bubus import EventBus, BaseEvent
from bubus.middleware import EventBusMiddleware

class LoggingMiddleware(EventBusMiddleware):
def __call__(self, get_handler_result):
async def get_handler_result_wrapped_by_middleware(event: BaseEvent):
print(f"Processing {event.event_type}")

try:
result = await get_handler_result(event)
print(f"Handler succeeded")
return result
except Exception as e:
print(f"Handler failed: {e}")
raise

return get_handler_result_wrapped_by_middleware

# Create event bus with middleware
bus = EventBus(middlewares=[LoggingMiddleware()])
```

**Built-in Middleware:**

```python
from bubus.middleware import WALEventBusMiddleware

# WAL middleware for event persistence
bus = EventBus(middlewares=[
WALEventBusMiddleware('./events.jsonl')
])

# Or enable WAL automatically with wal_path parameter
bus = EventBus(wal_path='./events.jsonl') # Automatically adds WAL middleware
```

<br/>

### 📝 Write-Ahead Logging

Persist events automatically to a `jsonl` file for future replay and debugging:
Expand Down
5 changes: 5 additions & 0 deletions bubus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@

from bubus.models import BaseEvent, EventHandler, EventResult, PythonIdentifierStr, PythonIdStr, UUIDStr
from bubus.service import EventBus
from bubus.middleware import EventBusMiddleware, HandlerStartedAnalyticsEvent, HandlerCompletedAnalyticsEvent, WALEventBusMiddleware

__all__ = [
'EventBus',
'BaseEvent',
'EventResult',
'EventHandler',
'EventBusMiddleware',
'HandlerStartedAnalyticsEvent',
'HandlerCompletedAnalyticsEvent',
'WALEventBusMiddleware',
'UUIDStr',
'PythonIdStr',
'PythonIdentifierStr',
Expand Down
175 changes: 175 additions & 0 deletions bubus/middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
"""Middleware system for event bus with Django-style nested function pattern."""

import asyncio
import traceback
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any

from bubus.models import BaseEvent, EventHandler, PythonIdStr, get_handler_id, get_handler_name

if TYPE_CHECKING:
from bubus.service import EventBus


# Type alias for middleware functions
EventMiddleware = Callable[['EventBus', EventHandler, 'BaseEvent[Any]', Callable[[], Awaitable[Any]]], Awaitable[Any]]


class HandlerStartedAnalyticsEvent(BaseEvent[None]):
"""Analytics event dispatched when a handler starts execution"""

event_id: str # ID of the event being processed
started_at: datetime
event_bus_id: str
event_bus_name: str
handler_id: str
handler_name: str
handler_class: str


class HandlerCompletedAnalyticsEvent(BaseEvent[None]):
"""Analytics event dispatched when a handler completes execution"""

event_id: str # ID of the event being processed
completed_at: datetime
error: Exception | None = None
traceback_info: str = ''
event_bus_id: str
event_bus_name: str
handler_id: str
handler_name: str
handler_class: str


class EventBusMiddleware:
"""Base class for Django-style EventBus middleware"""

def __call__(self, get_handler_result: Callable[['BaseEvent[Any]'], Awaitable[Any]]) -> Callable[['BaseEvent[Any]'], Awaitable[Any]]:
"""
Django-style middleware pattern.

Args:
get_handler_result: The next middleware in the chain or the actual handler

Returns:
Wrapped function that processes events
"""
async def get_handler_result_wrapped_by_middleware(event: BaseEvent[Any]) -> Any:
return await get_handler_result(event)

return get_handler_result_wrapped_by_middleware


class WALEventBusMiddleware(EventBusMiddleware):
"""Write-Ahead Logging middleware for persisting events to JSONL files"""

def __init__(self, wal_path: Path | str):
self.wal_path = Path(wal_path)

def __call__(self, get_handler_result: Callable[['BaseEvent[Any]'], Awaitable[Any]]) -> Callable[['BaseEvent[Any]'], Awaitable[Any]]:
async def get_handler_result_wrapped_by_middleware(event: BaseEvent[Any]) -> Any:
# Just execute the handler and log completed events to WAL
# This is a simplified implementation - the original EventBus did more complex WAL handling
try:
result = await get_handler_result(event)

# Log completed event to WAL
try:
self.wal_path.parent.mkdir(parents=True, exist_ok=True)

# Use async I/O if available, otherwise sync
try:
import anyio
async with await anyio.open_file(self.wal_path, 'a', encoding='utf-8') as f:
await f.write(event.model_dump_json() + '\n')
except ImportError:
# Fallback to sync I/O
with open(self.wal_path, 'a', encoding='utf-8') as f:
f.write(event.model_dump_json() + '\n')
except Exception:
# Don't let WAL errors break the handler
pass

return result
except Exception:
# Could log error events here too, but keeping it simple
raise

return get_handler_result_wrapped_by_middleware


class AnalyticsEventBusMiddleware(EventBusMiddleware):
"""Analytics middleware that dispatches analytics events for handler execution"""

def __init__(self, analytics_bus: 'EventBus'):
self.analytics_bus = analytics_bus

def __call__(self, get_handler_result: Callable[['BaseEvent[Any]'], Awaitable[Any]]) -> Callable[['BaseEvent[Any]'], Awaitable[Any]]:
async def get_handler_result_wrapped_by_middleware(event: BaseEvent[Any]) -> Any:
# Access event bus and handler info from the event context
from bubus.models import get_handler_id, get_handler_name
from bubus.service import _current_handler_id_context, inside_handler_context

# We can access the event bus through event.event_bus
event_bus = event.event_bus

# Get handler information from context
handler_id = _current_handler_id_context.get()

# Get the event result object which contains handler information
event_result = None
if handler_id and handler_id in event.event_results:
event_result = event.event_results[handler_id]

# Dispatch started analytics event if we have the context
if event_result and inside_handler_context.get():
started_event = HandlerStartedAnalyticsEvent(
event_id=event.event_id,
started_at=event_result.started_at or datetime.now(UTC),
event_bus_id=event_bus.id,
event_bus_name=event_bus.name,
handler_id=handler_id,
handler_name=event_result.handler_name,
handler_class=event_result.handler_class,
)
self.analytics_bus.dispatch(started_event)

try:
result = await get_handler_result(event)

# Dispatch completed analytics event
if event_result and inside_handler_context.get():
completed_event = HandlerCompletedAnalyticsEvent(
event_id=event.event_id,
completed_at=datetime.now(UTC),
error=None,
traceback_info='',
event_bus_id=event_bus.id,
event_bus_name=event_bus.name,
handler_id=handler_id,
handler_name=event_result.handler_name,
handler_class=event_result.handler_class,
)
self.analytics_bus.dispatch(completed_event)

return result
except Exception as e:
# Dispatch completed analytics event with error
if event_result and inside_handler_context.get():
completed_event = HandlerCompletedAnalyticsEvent(
event_id=event.event_id,
completed_at=datetime.now(UTC),
error=e,
traceback_info=traceback.format_exc(),
event_bus_id=event_bus.id,
event_bus_name=event_bus.name,
handler_id=handler_id,
handler_name=event_result.handler_name,
handler_class=event_result.handler_class,
)
self.analytics_bus.dispatch(completed_event)
raise

return get_handler_result_wrapped_by_middleware
4 changes: 4 additions & 0 deletions bubus/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import inspect
import logging
import os
import traceback
from collections.abc import Awaitable, Callable, Generator
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Generic, Literal, Protocol, Self, TypeAlias, cast, runtime_checkable
Expand Down Expand Up @@ -923,6 +924,9 @@ def log_tree(
log_eventresult_tree(self, indent, is_last, child_events_by_parent)


# Analytics events are now in bubus.middleware module


# Resolve forward references
BaseEvent.model_rebuild()
EventResult.model_rebuild()
Loading
Loading