Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,49 @@ await bus.dispatch(DataEvent())

<br/>

### 🔧 Middleware System

Add cross-cutting concerns like analytics, error handling, and logging using Django-style middleware:

```python
from bubus import EventBus, BaseEvent
from bubus.middleware import EventBusMiddleware

class LoggingMiddleware(EventBusMiddleware):
def __call__(self, get_handler_result):
async def get_handler_result_wrapped_by_middleware(event: BaseEvent):
print(f"Processing {event.event_type}")

try:
result = await get_handler_result(event)
print(f"Handler succeeded")
return result
except Exception as e:
print(f"Handler failed: {e}")
raise

return get_handler_result_wrapped_by_middleware

# Create event bus with middleware
bus = EventBus(middlewares=[LoggingMiddleware()])
```

**Built-in Middleware:**

```python
from bubus.middleware import WALEventBusMiddleware

# WAL middleware for event persistence
bus = EventBus(middlewares=[
WALEventBusMiddleware('./events.jsonl')
])

# Or enable WAL automatically with wal_path parameter
bus = EventBus(wal_path='./events.jsonl') # Automatically adds WAL middleware
```

<br/>

### 📝 Write-Ahead Logging

Persist events automatically to a `jsonl` file for future replay and debugging:
Expand Down
5 changes: 5 additions & 0 deletions bubus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@

from bubus.models import BaseEvent, EventHandler, EventResult, PythonIdentifierStr, PythonIdStr, UUIDStr
from bubus.service import EventBus
from bubus.middleware import EventBusMiddleware, HandlerStartedAnalyticsEvent, HandlerCompletedAnalyticsEvent, WALEventBusMiddleware

__all__ = [
'EventBus',
'BaseEvent',
'EventResult',
'EventHandler',
'EventBusMiddleware',
'HandlerStartedAnalyticsEvent',
'HandlerCompletedAnalyticsEvent',
'WALEventBusMiddleware',
'UUIDStr',
'PythonIdStr',
'PythonIdentifierStr',
Expand Down
121 changes: 121 additions & 0 deletions bubus/middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""Middleware system for event bus with Django-style nested function pattern."""

import asyncio
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any

from bubus.models import BaseEvent, EventHandler, PythonIdStr, get_handler_id, get_handler_name

if TYPE_CHECKING:
from bubus.service import EventBus


# Type alias for middleware functions
EventMiddleware = Callable[['EventBus', EventHandler, 'BaseEvent[Any]', Callable[[], Awaitable[Any]]], Awaitable[Any]]


class HandlerStartedAnalyticsEvent(BaseEvent[None]):
"""Analytics event dispatched when a handler starts execution"""

event_id: str # ID of the event being processed
started_at: datetime
event_bus_id: str
event_bus_name: str
handler_id: str
handler_name: str
handler_class: str


class HandlerCompletedAnalyticsEvent(BaseEvent[None]):
"""Analytics event dispatched when a handler completes execution"""

event_id: str # ID of the event being processed
completed_at: datetime
error: Exception | None = None
traceback_info: str = ''
event_bus_id: str
event_bus_name: str
handler_id: str
handler_name: str
handler_class: str


class EventBusMiddleware:
"""Base class for Django-style EventBus middleware"""

def __call__(self, get_handler_result: Callable[['BaseEvent[Any]'], Awaitable[Any]]) -> Callable[['BaseEvent[Any]'], Awaitable[Any]]:
"""
Django-style middleware pattern.

Args:
get_handler_result: The next middleware in the chain or the actual handler

Returns:
Wrapped function that processes events
"""
async def get_handler_result_wrapped_by_middleware(event: BaseEvent[Any]) -> Any:
return await get_handler_result(event)

return get_handler_result_wrapped_by_middleware


class WALEventBusMiddleware(EventBusMiddleware):
"""Write-Ahead Logging middleware for persisting events to JSONL files"""

def __init__(self, wal_path: Path | str):
self.wal_path = Path(wal_path)

def __call__(self, get_handler_result: Callable[['BaseEvent[Any]'], Awaitable[Any]]) -> Callable[['BaseEvent[Any]'], Awaitable[Any]]:
async def get_handler_result_wrapped_by_middleware(event: BaseEvent[Any]) -> Any:
# Just execute the handler and log completed events to WAL
# This is a simplified implementation - the original EventBus did more complex WAL handling
try:
result = await get_handler_result(event)

# Log completed event to WAL
try:
self.wal_path.parent.mkdir(parents=True, exist_ok=True)

# Use async I/O if available, otherwise sync
try:
import anyio
async with await anyio.open_file(self.wal_path, 'a', encoding='utf-8') as f:
await f.write(event.model_dump_json() + '\n')
except ImportError:
# Fallback to sync I/O
with open(self.wal_path, 'a', encoding='utf-8') as f:
f.write(event.model_dump_json() + '\n')
except Exception:
# Don't let WAL errors break the handler
pass

return result
except Exception:
# Could log error events here too, but keeping it simple
raise

return get_handler_result_wrapped_by_middleware


class AnalyticsEventBusMiddleware(EventBusMiddleware):
"""Analytics middleware that dispatches analytics events for handler execution"""

def __init__(self, analytics_bus: 'EventBus'):
self.analytics_bus = analytics_bus

def __call__(self, get_handler_result: Callable[['BaseEvent[Any]'], Awaitable[Any]]) -> Callable[['BaseEvent[Any]'], Awaitable[Any]]:
async def get_handler_result_wrapped_by_middleware(event: BaseEvent[Any]) -> Any:
# Note: We can't easily access the handler and event_bus from this middleware pattern
# This would need to be refactored to work with the Django pattern
# For now, this is a placeholder implementation
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@claude yes you can, it's just event.event_bus

You should pass the event_result in too so you can get the handler id and stuff

Copy link

@claude claude bot Aug 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude finished @pirate's task —— View job


Tasks Completed ✅

  • Read current middleware implementation to understand context
  • Analyze the suggestion to use event.event_bus and event_result
  • Implement the requested changes
  • Test the changes work correctly (syntax checks passed)
  • Commit and push the changes

Perfect! I've successfully implemented your suggestions and pushed the changes.

Changes Made ✅

Updated AnalyticsEventBusMiddleware in bubus/middleware.py:

  1. Access EventBus via event.event_bus

    • Now properly accessing the event bus through event.event_bus as suggested
    • This gives us access to event_bus.id and event_bus.name
  2. Pass event_result information

    • Now accessing the event_result object from event.event_results[handler_id]
    • This gives us access to handler_name, handler_class, started_at, and other handler metadata
  3. Proper analytics event dispatching

    • HandlerStartedAnalyticsEvent dispatched with complete handler and bus information
    • HandlerCompletedAnalyticsEvent dispatched on success with all metadata
    • Error handling with HandlerCompletedAnalyticsEvent including exception and traceback info
  4. Context-aware implementation

    • Uses _current_handler_id_context.get() to get current handler ID
    • Uses inside_handler_context.get() to ensure we're within handler execution
    • Added proper imports for required context variables and traceback module

The middleware now has full access to both the event bus and event result information, enabling comprehensive analytics tracking! 🎉

Commit: 1fe4f65 - Updated middleware to use event.event_bus and event_result data



try:
result = await get_handler_result(event)
return result
except Exception as e:
# Could dispatch analytics events here if we had access to handler info
raise

return get_handler_result_wrapped_by_middleware
4 changes: 4 additions & 0 deletions bubus/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import inspect
import logging
import os
import traceback
from collections.abc import Awaitable, Callable, Generator
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Generic, Literal, Protocol, Self, TypeAlias, cast, runtime_checkable
Expand Down Expand Up @@ -923,6 +924,9 @@ def log_tree(
log_eventresult_tree(self, indent, is_last, child_events_by_parent)


# Analytics events are now in bubus.middleware module


# Resolve forward references
BaseEvent.model_rebuild()
EventResult.model_rebuild()
68 changes: 48 additions & 20 deletions bubus/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
import contextvars
import inspect
import logging
import traceback
import warnings
import weakref
from collections import defaultdict, deque
from collections.abc import Callable
from collections.abc import Awaitable, Callable
from contextvars import ContextVar
from pathlib import Path
from typing import Any, Literal, TypeVar, cast, overload
Expand Down Expand Up @@ -51,6 +52,8 @@ class QueueShutDown(Exception):

EventPatternType = PythonIdentifierStr | Literal['*'] | type['BaseEvent[Any]']

# EventBusMiddleware will be imported dynamically to avoid circular imports


class CleanShutdownQueue(asyncio.Queue[QueueEntryType]):
"""asyncio.Queue subclass that handles shutdown cleanly without warnings."""
Expand Down Expand Up @@ -264,6 +267,7 @@ def __init__(
wal_path: Path | str | None = None,
parallel_handlers: bool = False,
max_history_size: int | None = 50, # Keep only 50 events in history
middlewares: list[Any] | None = None,
):
self.id = uuid7str()
self.name = name or f'{self.__class__.__name__}_{self.id[-8:]}'
Expand Down Expand Up @@ -317,6 +321,13 @@ def __init__(
self.parallel_handlers = parallel_handlers
self.wal_path = Path(wal_path) if wal_path else None
self._on_idle = None

# Set up middlewares, adding WAL middleware if wal_path is provided
self.middlewares = middlewares or []
if wal_path:
# Import here to avoid circular imports
from bubus.middleware import WALEventBusMiddleware
self.middlewares.append(WALEventBusMiddleware(wal_path))

# Memory leak prevention settings
self.max_history_size = max_history_size
Expand Down Expand Up @@ -949,7 +960,6 @@ async def process_event(self, event: 'BaseEvent[Any]', timeout: float | None = N
await self._execute_handlers(event, handlers=applicable_handlers, timeout=timeout)

await self._default_log_handler(event)
await self._default_wal_handler(event)

# Mark event as complete if all handlers are done
event.event_mark_complete_if_all_handlers_completed()
Expand Down Expand Up @@ -1023,8 +1033,8 @@ async def _execute_handlers(
context = contextvars.copy_context()
for handler_id, handler in applicable_handlers.items():
task = asyncio.create_task(
self._execute_sync_or_async_handler(event, handler, timeout=timeout),
name=f'{self}._execute_sync_or_async_handler({event}, {get_handler_name(handler)})',
self._execute_handler_with_middlewares(event, handler, timeout=timeout),
name=f'{self}._execute_handler_with_middlewares({event}, {get_handler_name(handler)})',
context=context,
)
handler_tasks[handler_id] = (task, handler)
Expand All @@ -1034,20 +1044,50 @@ async def _execute_handlers(
try:
await task
except Exception:
# Error already logged and recorded in _execute_sync_or_async_handler
# Error already logged and recorded in _execute_handler_with_middlewares
pass
else:
# otherwise, execute handlers serially, wait until each one completes before moving on to the next
for handler_id, handler in applicable_handlers.items():
try:
await self._execute_sync_or_async_handler(event, handler, timeout=timeout)
await self._execute_handler_with_middlewares(event, handler, timeout=timeout)
except Exception as e:
# Error already logged and recorded in _execute_sync_or_async_handler
# Error already logged and recorded in _execute_handler_with_middlewares
logger.debug(
f'❌ {self} Handler {get_handler_name(handler)}#{str(id(handler))[-4:]}({event}) failed with {type(e).__name__}: {e}'
)
pass

async def _execute_handler_with_middlewares(
self, event: 'BaseEvent[T_EventResultType]', handler: EventHandler, timeout: float | None = None
) -> Any:
"""Execute a handler through the Django-style middleware chain"""
if not self.middlewares:
# No middlewares, execute handler directly
return await self._execute_sync_or_async_handler(event, handler, timeout)

# Create Django-style middleware chain by wrapping the handler in middleware layers
async def base_handler(event: 'BaseEvent[Any]') -> Any:
return await self._execute_sync_or_async_handler(event, handler, timeout)

# Wrap the handler with each middleware (in reverse order for correct execution)
wrapped_handler = base_handler
for middleware in reversed(self.middlewares):
try:
wrapped_handler = middleware(wrapped_handler)
except Exception as e:
# Log middleware initialization error and re-raise
handler_id = get_handler_id(handler, self)
logger.exception(
f'❌ {self} Error initializing middleware {middleware.__class__.__name__} '
f'for handler {get_handler_name(handler)}#{handler_id[-4:]}({event}) -> {type(e).__name__}({e})',
exc_info=True,
)
raise

# Execute the wrapped handler
return await wrapped_handler(event)

async def _execute_sync_or_async_handler(
self, event: 'BaseEvent[T_EventResultType]', handler: EventHandler, timeout: float | None = None
) -> Any:
Expand Down Expand Up @@ -1256,19 +1296,7 @@ async def _default_log_handler(self, event: 'BaseEvent[Any]') -> None:
# )
pass

async def _default_wal_handler(self, event: 'BaseEvent[Any]') -> None:
"""Persist completed event to WAL file as JSONL"""

if not self.wal_path:
return None

try:
event_json = event.model_dump_json() # pyright: ignore[reportUnknownMemberType]
self.wal_path.parent.mkdir(parents=True, exist_ok=True)
async with await anyio.open_file(self.wal_path, 'a', encoding='utf-8') as f: # pyright: ignore[reportUnknownMemberType]
await f.write(event_json + '\n') # pyright: ignore[reportUnknownMemberType]
except Exception as e:
logger.error(f'❌ {self} Failed to save event {event.event_id} to WAL file: {type(e).__name__} {e}\n{event}')
# WAL functionality is now handled by WALEventBusMiddleware

def cleanup_excess_events(self) -> int:
"""
Expand Down
Loading
Loading