From b0f1024fd087b45ada0832a04089187caae60064 Mon Sep 17 00:00:00 2001 From: Bamdad Dashtban Date: Thu, 5 Jun 2025 14:49:46 +0100 Subject: [PATCH] adding docs --- documentations/README.md | 162 +++++++ documentations/api-reference.md | 621 ++++++++++++++++++++++++++ documentations/events.md | 443 +++++++++++++++++++ documentations/overview.md | 196 +++++++++ documentations/projections.md | 574 ++++++++++++++++++++++++ documentations/query-system.md | 541 +++++++++++++++++++++++ documentations/storage-adapters.md | 485 +++++++++++++++++++++ documentations/testing.md | 671 +++++++++++++++++++++++++++++ documentations/transactions.md | 653 ++++++++++++++++++++++++++++ 9 files changed, 4346 insertions(+) create mode 100644 documentations/README.md create mode 100644 documentations/api-reference.md create mode 100644 documentations/events.md create mode 100644 documentations/overview.md create mode 100644 documentations/projections.md create mode 100644 documentations/query-system.md create mode 100644 documentations/storage-adapters.md create mode 100644 documentations/testing.md create mode 100644 documentations/transactions.md diff --git a/documentations/README.md b/documentations/README.md new file mode 100644 index 00000000..f47dfbec --- /dev/null +++ b/documentations/README.md @@ -0,0 +1,162 @@ +# Event Store Library Documentation + +Welcome to the comprehensive documentation for the `logicblocks.event.store` library. This documentation provides detailed information about building event-sourced applications using this powerful and flexible event store implementation. + +## 📚 Documentation Structure + +### Getting Started +- **[Overview](./overview.md)** - High-level introduction with architecture diagrams and core concepts + - Library architecture and components + - Event sourcing fundamentals + - Key features and capabilities + - Usage patterns + +### Core Concepts +- **[Events and Event Types](./events.md)** - Deep dive into event modeling + - Event types (NewEvent, StoredEvent) + - Event design patterns + - Best practices for event modeling + - Event versioning strategies + +- **[Storage Adapters](./storage-adapters.md)** - Persistence layer documentation + - In-memory adapter for testing + - PostgreSQL adapter for production + - Creating custom adapters + - Performance optimization + +- **[Projections](./projections.md)** - State derivation from events + - Creating projectors + - Projection patterns + - Projection stores + - Performance optimization + +- **[Query System](./query-system.md)** - Finding and filtering data + - Query syntax and operations + - Advanced filtering + - Performance considerations + - Query patterns + +### Advanced Topics +- **[Transactions](./transactions.md)** - Coordination and consistency + - Transaction management + - Error handling strategies + - Retry patterns + - Distributed transactions + +- **[Testing](./testing.md)** - Testing strategies and utilities + - Test builders and generators + - Unit and integration testing + - Performance testing + - Best practices + +### Reference +- **[API Reference](./api-reference.md)** - Complete API documentation + - All classes and methods + - Type definitions + - Code examples + - Parameter details + +## 🚀 Quick Start + +### Installation +```bash +pip install logicblocks-event-store +``` + +### Basic Example +```python +import asyncio +from logicblocks.event.store import EventStore, adapters +from logicblocks.event.types import NewEvent + +async def main(): + # Create store with in-memory adapter + adapter = 
adapters.InMemoryEventStorageAdapter() + store = EventStore(adapter) + + # Get a stream reference + stream = store.stream(category="users", stream="user-123") + + # Publish an event + await stream.publish(events=[ + NewEvent( + name="user-registered", + payload={"email": "user@example.com"} + ) + ]) + + # Read events + async for event in stream.scan(): + print(f"{event.name}: {event.payload}") + +asyncio.run(main()) +``` + +## 📖 How to Use This Documentation + +1. **New to Event Sourcing?** Start with the [Overview](./overview.md) to understand the core concepts and architecture. + +2. **Building an Application?** Follow this path: + - [Events](./events.md) → [Storage Adapters](./storage-adapters.md) → [Projections](./projections.md) + +3. **Need Specific Information?** Use the [API Reference](./api-reference.md) for detailed class and method documentation. + +4. **Testing Your Application?** Check out [Testing](./testing.md) for utilities and strategies. + +## 🔍 Common Use Cases + +### Event Sourcing +Build applications where all changes are captured as events: +- [Event modeling patterns](./events.md#event-design-patterns) +- [Projection strategies](./projections.md#projection-patterns) +- [Query patterns](./query-system.md#query-patterns) + +### CQRS Implementation +Separate read and write models: +- [Event store for writes](./events.md#working-with-events) +- [Projection store for reads](./projections.md#projection-store) +- [Query system](./query-system.md) + +### Audit Logging +Track all changes with complete history: +- [Event metadata](./events.md#event-metadata) +- [Bi-temporal data](./events.md#event-types) +- [Stream scanning](./api-reference.md#eventstream) + +## 💡 Best Practices Summary + +1. **Event Design** + - Keep events immutable + - Use descriptive event names + - Include only relevant data in payloads + - Version events for evolution + +2. **Performance** + - Use appropriate storage adapters + - Implement efficient projections + - Optimize queries with indexes + - Consider caching strategies + +3. **Testing** + - Use in-memory adapter for unit tests + - Test projections independently + - Verify event ordering + - Performance test with realistic loads + +4. **Production** + - Use PostgreSQL adapter + - Implement monitoring + - Plan for event versioning + - Consider backup strategies + +## 🤝 Contributing + +This library is open source. For contribution guidelines and development setup, see the main project README. + +## 📄 License + +This documentation is part of the logicblocks.event.store project, distributed under the MIT License. + +--- + +*For the latest updates and additional resources, visit the [project repository](https://github.com/logicblocks/event.store).* \ No newline at end of file diff --git a/documentations/api-reference.md b/documentations/api-reference.md new file mode 100644 index 00000000..12ca013f --- /dev/null +++ b/documentations/api-reference.md @@ -0,0 +1,621 @@ +# API Reference + +## Overview + +This document provides a comprehensive reference for all public APIs in the event store library. Each section covers a specific module with detailed information about classes, methods, and parameters. + +## Core Classes + +### EventStore + +The main entry point for interacting with the event store. + +```python +class EventStore: + def __init__(self, adapter: EventStorageAdapter): + """ + Initialize the event store. 
+ + Args: + adapter: Storage adapter instance + """ + + def stream(self, + category: str, + stream: str, + log: str = "default") -> EventStream: + """ + Get a reference to an event stream. + + Args: + category: Category name + stream: Stream name + log: Log name (default: "default") + + Returns: + EventStream instance + """ + + def category(self, + category: str, + log: str = "default") -> EventCategory: + """ + Get a reference to an event category. + + Args: + category: Category name + log: Log name (default: "default") + + Returns: + EventCategory instance + """ + + def log(self, log: str = "default") -> EventLog: + """ + Get a reference to an event log. + + Args: + log: Log name (default: "default") + + Returns: + EventLog instance + """ +``` + +### EventStream + +Represents a single event stream. + +```python +class EventStream: + @property + def identifier(self) -> StreamIdentifier: + """Get the stream identifier.""" + + async def publish(self, + events: List[NewEvent], + expected_position: Optional[int] = None, + write_conditions: Optional[List[WriteCondition]] = None) -> List[StoredEvent]: + """ + Publish events to the stream. + + Args: + events: List of events to publish + expected_position: Expected stream position for optimistic concurrency + write_conditions: Additional conditions that must be met + + Returns: + List of stored events + + Raises: + UnmetWriteConditionError: If conditions are not met + """ + + async def scan(self, + from_position: Optional[int] = None, + to_position: Optional[int] = None, + limit: Optional[int] = None, + event_names: Optional[List[str]] = None) -> AsyncIterator[StoredEvent]: + """ + Scan events from the stream. + + Args: + from_position: Starting position (inclusive) + to_position: Ending position (inclusive) + limit: Maximum number of events to return + event_names: Filter by event names + + Yields: + StoredEvent instances + """ + + async def get_info(self) -> Dict[str, Any]: + """ + Get stream information. + + Returns: + Dictionary with stream metadata + """ +``` + +## Event Types + +### NewEvent + +Represents an event that hasn't been stored yet. + +```python +@dataclass +class NewEvent: + name: str + payload: JSONType + metadata: JSONType = field(default_factory=dict) + occurred_at: Optional[datetime] = None + + def __post_init__(self): + """Validate and set defaults.""" +``` + +### StoredEvent + +Represents an event that has been persisted. + +```python +@dataclass +class StoredEvent: + id: str + stream: StreamIdentifier + name: str + payload: JSONType + metadata: JSONType + position: int + global_position: int + occurred_at: datetime + recorded_at: datetime +``` + +### StreamIdentifier + +Identifies a specific stream. 
+ +```python +@dataclass +class StreamIdentifier: + log: str + category: str + stream: str + + def __str__(self) -> str: + """String representation: log/category/stream""" +``` + +## Storage Adapters + +### EventStorageAdapter (Abstract Base) + +```python +class EventStorageAdapter(ABC): + @abstractmethod + async def append_to_stream(self, + stream: StreamIdentifier, + events: List[NewEvent], + expected_position: Optional[int] = None) -> List[StoredEvent]: + """Append events to a stream.""" + + @abstractmethod + async def scan_stream(self, + stream: StreamIdentifier, + from_position: Optional[int] = None, + to_position: Optional[int] = None, + limit: Optional[int] = None) -> AsyncIterator[StoredEvent]: + """Scan events from a stream.""" + + @abstractmethod + async def scan_category(self, + log: str, + category: str, + from_global_position: Optional[int] = None, + to_global_position: Optional[int] = None, + limit: Optional[int] = None) -> AsyncIterator[StoredEvent]: + """Scan events from a category.""" + + @abstractmethod + async def scan_log(self, + log: str, + from_global_position: Optional[int] = None, + to_global_position: Optional[int] = None, + limit: Optional[int] = None) -> AsyncIterator[StoredEvent]: + """Scan events from a log.""" + + @abstractmethod + async def get_stream_info(self, + stream: StreamIdentifier) -> Dict[str, Any]: + """Get stream information.""" +``` + +### InMemoryEventStorageAdapter + +```python +class InMemoryEventStorageAdapter(EventStorageAdapter): + def __init__(self): + """Initialize in-memory storage.""" +``` + +### PostgresEventStorageAdapter + +```python +class PostgresEventStorageAdapter(EventStorageAdapter): + def __init__(self, + connection_source: ConnectionSource, + serializer: Optional[Serializer] = None, + serialization_constraint: Optional[SerializationConstraint] = None): + """ + Initialize PostgreSQL adapter. + + Args: + connection_source: Function that returns a database connection + serializer: Custom serializer (optional) + serialization_constraint: Write serialization strategy + """ +``` + +## Projections + +### Projector + +Base class for creating projectors. + +```python +class Projector(Generic[TSource, TState, TMetadata]): + @abstractmethod + def initial_state_factory(self) -> TState: + """Create initial state.""" + + @abstractmethod + def initial_metadata_factory(self) -> TMetadata: + """Create initial metadata.""" + + @abstractmethod + def id_factory(self, state: TState, source: TSource) -> str: + """Generate projection ID.""" + + async def project(self, source: EventSource) -> Projection[TState, TMetadata]: + """ + Project events from source. + + Args: + source: Event source to project from + + Returns: + Projection instance + """ + + def project_event(self, + state: TState, + metadata: TMetadata, + event: StoredEvent) -> Tuple[TState, TMetadata]: + """ + Project a single event. 
+
+        Args:
+            state: Current state
+            metadata: Current metadata
+            event: Event to project
+
+        Returns:
+            Tuple of (new_state, new_metadata)
+        """
+```
+
+### Projection
+
+```python
+@dataclass
+class Projection(Generic[TState, TMetadata]):
+    id: str
+    state: TState
+    metadata: TMetadata
+```
+
+### ProjectionStore
+
+```python
+class ProjectionStore:
+    def __init__(self, adapter: ProjectionStorageAdapter):
+        """Initialize projection store."""
+
+    async def save(self, projection: Projection) -> None:
+        """Save a projection."""
+
+    async def get(self, id: str) -> Optional[Projection]:
+        """Get projection by ID."""
+
+    async def query(self, query: Query) -> AsyncIterator[Projection]:
+        """Query projections."""
+
+    async def delete(self, id: str) -> None:
+        """Delete a projection."""
+```
+
+## Query System
+
+### Query
+
+```python
+@dataclass
+class Query:
+    filter: Optional[FilterClause] = None
+    order_by: Optional[List[Tuple[str, str]]] = None
+    limit: Optional[int] = None
+    offset: Optional[int] = None
+```
+
+### where
+
+Factory function for creating filter clauses.
+
+```python
+def where(field: str) -> FieldReference:
+    """
+    Create a field reference for filtering.
+
+    Args:
+        field: Field path (supports dot notation)
+
+    Returns:
+        FieldReference that supports comparison operations
+
+    Example:
+        filter = where("state.status") == "active"
+    """
+```
+
+### FilterClause Operations
+
+```python
+# Equality
+where("field") == value
+where("field") != value
+
+# Comparison
+where("field") > value
+where("field") >= value
+where("field") < value
+where("field") <= value
+
+# Contains
+where("field").contains(value)
+
+# Null checks
+where("field").is_null()
+where("field").is_not_null()
+
+# Logical operations
+filter1 & filter2  # AND
+filter1 | filter2  # OR
+~filter  # NOT
+```
+
+## Transactions
+
+### event_store_transaction
+
+```python
+@asynccontextmanager
+async def event_store_transaction(store: EventStore) -> AsyncIterator[EventStoreTransaction]:
+    """
+    Create a transaction context.
+
+    Args:
+        store: Event store instance
+
+    Yields:
+        Transaction instance
+
+    Example:
+        async with event_store_transaction(store) as tx:
+            await tx.stream("cat", "stream").publish(events)
+    """
+```
+
+### Retry Decorators
+
+```python
+def retry_on_error(max_attempts: int = 3,
+                   backoff_factor: float = 2.0,
+                   max_delay: float = 60.0):
+    """
+    Retry on any error. 
+ + Args: + max_attempts: Maximum retry attempts + backoff_factor: Exponential backoff factor + max_delay: Maximum delay between retries + """ + +def retry_on_unmet_condition_error(max_attempts: int = 3, + backoff_factor: float = 2.0): + """Retry only on write condition errors.""" + +def ignore_on_error(): + """Ignore all errors (use carefully).""" + +def ignore_on_unmet_condition_error(): + """Ignore only write condition errors.""" +``` + +## Write Conditions + +### Built-in Conditions + +```python +def stream_empty() -> WriteCondition: + """Stream must be empty.""" + +def stream_not_empty() -> WriteCondition: + """Stream must not be empty.""" + +def stream_exists() -> WriteCondition: + """Stream must exist.""" + +def position_is(position: int) -> WriteCondition: + """Stream must be at specific position.""" + +def position_less_than(position: int) -> WriteCondition: + """Stream position must be less than value.""" + +def any_condition(*conditions: WriteCondition) -> WriteCondition: + """At least one condition must be true.""" + +def all_conditions(*conditions: WriteCondition) -> WriteCondition: + """All conditions must be true.""" +``` + +## Testing Utilities + +### EventBuilder + +```python +class EventBuilder: + def with_id(self, id: str) -> EventBuilder: + """Set event ID.""" + + def with_name(self, name: str) -> EventBuilder: + """Set event name.""" + + def with_stream(self, category: str, stream: str, log: str = "default") -> EventBuilder: + """Set stream identifier.""" + + def with_payload(self, payload: JSONType) -> EventBuilder: + """Set payload.""" + + def with_metadata(self, metadata: JSONType) -> EventBuilder: + """Set metadata.""" + + def with_position(self, position: int) -> EventBuilder: + """Set stream position.""" + + def with_global_position(self, global_position: int) -> EventBuilder: + """Set global position.""" + + def with_occurred_at(self, occurred_at: datetime) -> EventBuilder: + """Set occurrence time.""" + + def build(self) -> StoredEvent: + """Build the event.""" + + def build_many(self, count: int) -> List[StoredEvent]: + """Build multiple events.""" +``` + +### ProjectionBuilder + +```python +class ProjectionBuilder: + def with_id(self, id: str) -> ProjectionBuilder: + """Set projection ID.""" + + def with_state(self, state: Any) -> ProjectionBuilder: + """Set state.""" + + def with_metadata(self, metadata: Any) -> ProjectionBuilder: + """Set metadata.""" + + def build(self) -> Projection: + """Build the projection.""" +``` + +## Exceptions + +### UnmetWriteConditionError + +```python +class UnmetWriteConditionError(Exception): + """Raised when write conditions are not met.""" +``` + +### OptimisticConcurrencyError + +```python +class OptimisticConcurrencyError(UnmetWriteConditionError): + """Raised when expected version doesn't match.""" +``` + +## Type Definitions + +### JSONType + +```python +JSONType = Union[None, bool, int, float, str, List['JSONType'], Dict[str, 'JSONType']] +``` + +### ConnectionSource + +```python +ConnectionSource = Callable[[], Awaitable[Connection]] +``` + +## Utility Functions + +### Serialization + +```python +class Serializer(ABC): + @abstractmethod + def serialize(self, value: Any) -> str: + """Serialize value to string.""" + + @abstractmethod + def deserialize(self, value: str) -> Any: + """Deserialize string to value.""" +``` + +### Time Utilities + +```python +def now_utc() -> datetime: + """Get current UTC time with timezone.""" + +def ensure_timezone(dt: datetime) -> datetime: + """Ensure datetime has UTC timezone.""" +``` + 
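+As a quick, hedged sketch of how these helpers fit together with `NewEvent`: the event name and payload below are illustrative only, and `now_utc`/`ensure_timezone` are assumed to be in scope from wherever this library exports them.
+
+```python
+from datetime import datetime
+
+from logicblocks.event.types import NewEvent
+
+# Normalize a possibly-naive timestamp from an external system
+# (ensure_timezone is the helper documented above).
+external_timestamp = ensure_timezone(datetime(2024, 1, 1, 12, 0))
+
+event = NewEvent(
+    name="report-generated",  # illustrative event name
+    payload={"source_time": external_timestamp.isoformat()},
+    occurred_at=now_utc(),  # timezone-aware UTC, per the helper above
+)
+```
+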
+## Usage Examples + +### Basic Event Store Usage + +```python +# Initialize +adapter = InMemoryEventStorageAdapter() +store = EventStore(adapter) + +# Publish events +stream = store.stream(category="users", stream="user-123") +await stream.publish(events=[ + NewEvent(name="user-registered", payload={"email": "user@example.com"}) +]) + +# Read events +async for event in stream.scan(): + print(f"{event.name}: {event.payload}") +``` + +### Projection Example + +```python +# Define projector +class UserProjector(Projector[StreamIdentifier, dict, dict]): + def initial_state_factory(self) -> dict: + return {"events": []} + + def initial_metadata_factory(self) -> dict: + return {"version": 0} + + def id_factory(self, state, source) -> str: + return f"user:{source.stream}" + + def user_registered(self, state, event): + state["events"].append(event.name) + return state + +# Use projector +projector = UserProjector() +projection = await projector.project(stream) +``` + +### Query Example + +```python +# Query projections +query = Query( + filter=( + (where("state.status") == "active") & + (where("state.score") > 100) + ), + order_by=[("state.created_at", "desc")], + limit=10 +) + +async for projection in projection_store.query(query): + print(projection.id, projection.state) +``` \ No newline at end of file diff --git a/documentations/events.md b/documentations/events.md new file mode 100644 index 00000000..70a074f2 --- /dev/null +++ b/documentations/events.md @@ -0,0 +1,443 @@ +# Events and Event Types + +## Overview + +Events are the fundamental building blocks of the event store library. They represent facts that have occurred in your system and are immutable once stored. This document covers everything you need to know about working with events. + +## Event Model + +### Event Hierarchy + +```mermaid +graph TD + E[Event Base] --> NE[NewEvent] + E --> SE[StoredEvent] + + NE --> |publish| ES[Event Store] + ES --> |store| SE + + SE --> |fields| F1[id: str] + SE --> |fields| F2[name: str] + SE --> |fields| F3[stream: StreamIdentifier] + SE --> |fields| F4[payload: JSONType] + SE --> |fields| F5[metadata: JSONType] + SE --> |fields| F6[position: int] + SE --> |fields| F7[global_position: int] + SE --> |fields| F8[occurred_at: datetime] + SE --> |fields| F9[recorded_at: datetime] +``` + +### Event Types + +#### NewEvent + +A `NewEvent` represents an event that hasn't been stored yet. It contains: + +- **name**: The event type/name (e.g., "user-registered", "order-placed") +- **payload**: The event data as a JSON-serializable object +- **metadata**: Optional metadata as a JSON-serializable object +- **occurred_at**: Optional timestamp when the event occurred (defaults to now) + +```python +from logicblocks.event.types import NewEvent +from datetime import datetime + +# Basic event +event = NewEvent( + name="user-registered", + payload={ + "user_id": "123", + "email": "user@example.com", + "name": "John Doe" + } +) + +# Event with metadata +event = NewEvent( + name="order-placed", + payload={ + "order_id": "456", + "items": ["item1", "item2"], + "total": 99.99 + }, + metadata={ + "correlation_id": "789", + "causation_id": "012", + "user_agent": "Mozilla/5.0..." + } +) + +# Event with custom occurrence time +event = NewEvent( + name="payment-received", + payload={"amount": 50.00, "currency": "USD"}, + occurred_at=datetime(2024, 1, 1, 12, 0, 0) +) +``` + +#### StoredEvent + +A `StoredEvent` is an event that has been persisted to the event store. 
It includes all fields from `NewEvent` plus: + +- **id**: Unique identifier for the event +- **stream**: The stream identifier where the event is stored +- **position**: Position within the stream (0-based) +- **global_position**: Position within the entire log +- **recorded_at**: Timestamp when the event was recorded + +### Stream Identifiers + +Events are organized into streams using `StreamIdentifier`: + +```python +from logicblocks.event.types import StreamIdentifier + +# Create a stream identifier +stream_id = StreamIdentifier( + log="default", + category="users", + stream="user-123" +) + +# Access components +print(stream_id.log) # "default" +print(stream_id.category) # "users" +print(stream_id.stream) # "user-123" +``` + +## Event Design Patterns + +### 1. Event Naming + +Use consistent, descriptive names that indicate what happened: + +```python +# Good event names +"user-registered" +"order-placed" +"payment-received" +"inventory-updated" +"email-sent" + +# Avoid ambiguous names +"user-event" +"update" +"process" +``` + +### 2. Event Payload Design + +Keep payloads focused and include only relevant data: + +```python +# Good: Focused payload +NewEvent( + name="product-price-changed", + payload={ + "product_id": "SKU123", + "old_price": 19.99, + "new_price": 24.99, + "currency": "USD" + } +) + +# Avoid: Including entire aggregates +NewEvent( + name="product-updated", + payload={ + "product": { + "id": "SKU123", + "name": "Widget", + "description": "...", + "price": 24.99, + # ... 50 more fields + } + } +) +``` + +### 3. Event Metadata + +Use metadata for cross-cutting concerns: + +```python +NewEvent( + name="order-shipped", + payload={ + "order_id": "789", + "tracking_number": "1Z999AA1012345678" + }, + metadata={ + # Correlation across services + "correlation_id": "abc-123", + + # Causation chain + "causation_id": "def-456", + + # User context + "user_id": "user-123", + "tenant_id": "tenant-456", + + # Request context + "request_id": "req-789", + "ip_address": "192.168.1.1", + + # Version information + "schema_version": "1.0" + } +) +``` + +### 4. 
Event Versioning
+
+Handle event evolution gracefully:
+
+```python
+# Version 1
+NewEvent(
+    name="user-profile-updated",
+    payload={
+        "user_id": "123",
+        "name": "John Doe"
+    },
+    metadata={"version": 1}
+)
+
+# Version 2 (added field)
+NewEvent(
+    name="user-profile-updated",
+    payload={
+        "user_id": "123",
+        "name": "John Doe",
+        "email": "john@example.com"  # New field
+    },
+    metadata={"version": 2}
+)
+
+# In your projector, handle both versions
+def user_profile_updated(self, state, event):
+    version = event.metadata.get("version", 1)
+
+    state["name"] = event.payload["name"]
+
+    if version >= 2:
+        state["email"] = event.payload.get("email")
+
+    return state
+```
+
+## Working with Events
+
+### Publishing Events
+
+```python
+from logicblocks.event.store import EventStore
+from logicblocks.event.types import NewEvent
+
+# Single event
+await stream.publish(events=[
+    NewEvent(name="user-logged-in", payload={"user_id": "123"})
+])
+
+# Multiple events (atomic)
+await stream.publish(events=[
+    NewEvent(name="cart-created", payload={"cart_id": "456"}),
+    NewEvent(name="item-added", payload={"item_id": "789", "quantity": 2}),
+    NewEvent(name="item-added", payload={"item_id": "012", "quantity": 1})
+])
+```
+
+### Reading Events
+
+```python
+# Read all events from a stream (scan returns an async iterator)
+events = stream.scan()
+
+# Read with filters
+events = stream.scan(
+    from_position=10,
+    to_position=20,
+    event_names=["order-placed", "order-shipped"]
+)
+
+# Process events
+async for event in events:
+    print(f"Event: {event.name}")
+    print(f"Position: {event.position}")
+    print(f"Payload: {event.payload}")
+```
+
+## Best Practices
+
+### 1. Event Immutability
+
+Never modify events after they're stored:
+
+```python
+# DON'T: Try to modify stored events
+event = await stream.get_event(id="123")
+event.payload["status"] = "updated"  # This won't work!
+
+# DO: Publish a new event
+await stream.publish(events=[
+    NewEvent(
+        name="status-updated",
+        payload={"old_status": "pending", "new_status": "completed"}
+    )
+])
+```
+
+### 2. Event Granularity
+
+Keep events fine-grained and focused:
+
+```python
+# Good: Specific events
+events = [
+    NewEvent(name="order-placed", payload={...}),
+    NewEvent(name="payment-processed", payload={...}),
+    NewEvent(name="order-confirmed", payload={...})
+]
+
+# Avoid: Generic events
+NewEvent(name="order-updated", payload={"action": "placed"})
+```
+
+### 3. Event Ordering
+
+Events within a stream are guaranteed to be ordered:
+
+```python
+# Events published together maintain order
+await stream.publish(events=[
+    NewEvent(name="step-1", payload={}),
+    NewEvent(name="step-2", payload={}),
+    NewEvent(name="step-3", payload={})
+])
+
+# Reading preserves order; collect into a list to index by position
+events = [event async for event in stream.scan()]
+# events[0].name == "step-1"
+# events[1].name == "step-2"
+# events[2].name == "step-3"
+```
+
+### 4. Event Sourcing
+
+Use events as the source of truth:
+
+```python
+# Rebuild state from events
+async def rebuild_user_profile(user_id: str):
+    stream = store.stream(category="users", stream=user_id)
+    events = stream.scan()
+
+    profile = {}
+    async for event in events:
+        if event.name == "user-registered":
+            profile.update(event.payload)
+        elif event.name == "profile-updated":
+            profile.update(event.payload)
+        elif event.name == "email-changed":
+            profile["email"] = event.payload["new_email"]
+
+    return profile
+```
+
+## Common Patterns
+
+### Command Events
+
+Events that represent commands or intentions:
+
+```python
+NewEvent(
+    name="transfer-money-requested",
+    payload={
+        "from_account": "123",
+        "to_account": "456",
+        "amount": 100.00,
+        "currency": "USD"
+    }
+)
+```
+
+### Notification Events
+
+Events that notify about state changes:
+
+```python
+NewEvent(
+    name="account-balance-low",
+    payload={
+        "account_id": "123",
+        "balance": 10.00,
+        "threshold": 50.00
+    }
+)
+```
+
+### Integration Events
+
+Events for system integration:
+
+```python
+NewEvent(
+    name="customer-data-exported",
+    payload={
+        "export_id": "789",
+        "record_count": 1000,
+        "destination": "data-warehouse"
+    },
+    metadata={
+        "integration": "etl-pipeline",
+        "schedule": "daily"
+    }
+)
+```
+
+## Error Handling
+
+### Invalid Events
+
+The library validates events before storage:
+
+```python
+try:
+    # This will fail - name is required
+    event = NewEvent(name="", payload={})
+except ValueError as e:
+    print(f"Invalid event: {e}")
+
+try:
+    # This will fail - payload must be JSON-serializable
+    event = NewEvent(
+        name="test",
+        payload={"func": lambda x: x}  # Functions aren't serializable
+    )
+except TypeError as e:
+    print(f"Serialization error: {e}")
+```
+
+### Handling Missing Events
+
+```python
+# Defensive projection
+def order_shipped(self, state, event):
+    # Handle missing fields gracefully
+    tracking = event.payload.get("tracking_number", "PENDING")
+    carrier = event.payload.get("carrier", "UNKNOWN")
+
+    state["shipping"] = {
+        "tracking": tracking,
+        "carrier": carrier,
+        "shipped_at": event.occurred_at
+    }
+    return state
+```
+
+## Next Steps
+
+- Learn about [Projections](./projections.md) to transform events into useful state
+- Explore [Storage Adapters](./storage-adapters.md) for persistence options
+- See [Query System](./query-system.md) for finding and filtering events
\ No newline at end of file
diff --git a/documentations/overview.md b/documentations/overview.md
new file mode 100644
index 00000000..fd516f52
--- /dev/null
+++ b/documentations/overview.md
@@ -0,0 +1,196 @@
+# Event Store Library Overview
+
+## Introduction
+
+The `logicblocks.event.store` library is a comprehensive eventing infrastructure designed for building event-sourced architectures. It provides a robust foundation for implementing event-driven systems with support for event storage, projections, querying, and more.
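+
+To make this concrete before diving into the architecture, the sketch below shows a minimal write/read cycle; it mirrors the quick-start example in the README and uses the in-memory adapter so it runs standalone.
+
+```python
+import asyncio
+
+from logicblocks.event.store import EventStore, adapters
+from logicblocks.event.types import NewEvent
+
+async def main():
+    store = EventStore(adapters.InMemoryEventStorageAdapter())
+    stream = store.stream(category="orders", stream="order-1")
+
+    # Write side: append an immutable fact to the stream.
+    await stream.publish(events=[
+        NewEvent(name="order-placed", payload={"total": 42.0})
+    ])
+
+    # Read side: replay the stream to derive state.
+    async for event in stream.scan():
+        print(event.name, event.payload)
+
+asyncio.run(main())
+```
+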
+ +## Core Concepts + +### Architecture Overview + +```mermaid +graph TB + %% Core Components + ES[Event Store] + EA[Event Adapter] + EL[Event Log] + EC[Event Category] + EST[Event Stream] + + %% Storage Layer + MA[Memory Adapter] + PA[PostgreSQL Adapter] + SA[Storage Adapter Interface] + + %% Processing Layer + PR[Projector] + PS[Projection Store] + PSA[Projection Storage Adapter] + + %% Query Layer + QS[Query System] + + %% Events + E[Event] + NE[New Event] + SE[Stored Event] + + %% Relationships + ES --> SA + SA --> MA + SA --> PA + + ES --> EL + EL --> EC + EC --> EST + + EST --> E + E --> NE + E --> SE + + PR --> EST + PR --> PS + PS --> PSA + + QS --> ES + QS --> PS + + style ES fill:#f9f,stroke:#333,stroke-width:4px + style PR fill:#9ff,stroke:#333,stroke-width:2px + style QS fill:#ff9,stroke:#333,stroke-width:2px +``` + +### Event Model + +The library uses a hierarchical event model: + +```mermaid +graph TD + L[Log] --> C1[Category: profiles] + L --> C2[Category: orders] + L --> C3[Category: payments] + + C1 --> S1[Stream: user-123] + C1 --> S2[Stream: user-456] + + C2 --> S3[Stream: order-789] + C2 --> S4[Stream: order-012] + + S1 --> E1[Event: profile-created] + S1 --> E2[Event: profile-updated] + S1 --> E3[Event: email-changed] + + S3 --> E4[Event: order-placed] + S3 --> E5[Event: order-shipped] +``` + +### Event Lifecycle + +```mermaid +sequenceDiagram + participant App as Application + participant ES as Event Store + participant SA as Storage Adapter + participant DB as Database + + App->>ES: Create NewEvent + ES->>ES: Validate Event + ES->>ES: Apply Write Conditions + ES->>SA: Store Event + SA->>DB: Persist Event + DB-->>SA: Confirmation + SA-->>ES: StoredEvent + ES-->>App: Event Published +``` + +## Key Features + +### 1. Event Storage +- **Immutable and Append-Only**: Events are never modified once stored +- **Bi-temporal Support**: Tracks both occurrence time and recording time +- **Consistency Guarantees**: Optimistic concurrency control for stream updates +- **Write Conditions**: Extensible pre-condition evaluation system +- **Ordering Guarantees**: Serialized writes ensure consistent event ordering + +### 2. Storage Adapters +- **Pluggable Architecture**: Abstract base class for implementing custom adapters +- **In-Memory Adapter**: For testing and development +- **PostgreSQL Adapter**: Production-ready persistent storage +- **Extensible**: Easy to add support for other databases + +### 3. Projections +- **Event Reduction**: Transform event sequences into meaningful state +- **Metadata Tracking**: Automatic version and timestamp management +- **Projection Store**: Built-in storage and querying for projections +- **Custom Projectors**: Define your own projection logic + +### 4. Query System +- **Flexible Querying**: Rich query language for events and projections +- **Filtering**: Support for various filter operations +- **Ordering**: Control result ordering +- **Pagination**: Built-in support for paginated results + +### 5. 
Transaction Support +- **Retry Logic**: Automatic retry on transient failures +- **Error Handling**: Configurable error handling strategies +- **Batch Operations**: Support for atomic multi-event operations + +## Component Interaction + +```mermaid +graph LR + %% User interaction + U[User Code] --> ES[Event Store] + U --> PR[Projector] + + %% Event flow + ES --> |publish| EST[Event Stream] + EST --> |scan| E[Events] + + %% Projection flow + PR --> |project| EST + PR --> |store| PS[Projection Store] + + %% Query flow + U --> |query| QS[Query System] + QS --> ES + QS --> PS + + %% Storage + ES --> SA[Storage Adapter] + PS --> PSA[Projection Adapter] + + style U fill:#ffd,stroke:#333,stroke-width:2px + style ES fill:#f9f,stroke:#333,stroke-width:2px + style PR fill:#9ff,stroke:#333,stroke-width:2px +``` + +## Usage Patterns + +### 1. Event Sourcing Pattern +``` +Application → NewEvent → EventStore → StoredEvent → Projector → Projection → Query +``` + +### 2. CQRS Pattern +``` +Commands → EventStore (Write Model) +Queries → ProjectionStore (Read Model) +``` + +### 3. Event-Driven Architecture +``` +Service A → Event → EventStore → Event Handler → Service B +``` + +## Next Steps + +For detailed information on specific topics, see: + +- [Events and Event Types](./events.md) - Deep dive into event modeling +- [Storage Adapters](./storage-adapters.md) - Working with different storage backends +- [Projections](./projections.md) - Building and managing projections +- [Query System](./query-system.md) - Querying events and projections +- [Transactions](./transactions.md) - Transaction support and error handling +- [Testing](./testing.md) - Testing utilities and best practices +- [API Reference](./api-reference.md) - Complete API documentation \ No newline at end of file diff --git a/documentations/projections.md b/documentations/projections.md new file mode 100644 index 00000000..7e4bd72c --- /dev/null +++ b/documentations/projections.md @@ -0,0 +1,574 @@ +# Projections + +## Overview + +Projections transform event streams into meaningful state by reducing (folding) a sequence of events into a single value. They are a fundamental concept in event sourcing, allowing you to derive current state from historical events. This document covers how to create and use projections effectively. + +## Concepts + +### What is a Projection? + +```mermaid +graph LR + E1[Event 1] --> P[Projector] + E2[Event 2] --> P + E3[Event 3] --> P + E4[Event N] --> P + P --> S[State] + P --> M[Metadata] + S --> PR[Projection] + M --> PR +``` + +A projection consists of: +- **State**: The derived data from events +- **Metadata**: Information about the projection (version, timestamps, etc.) 
+- **ID**: Unique identifier for the projection + +### Projection Flow + +```mermaid +sequenceDiagram + participant ES as Event Stream + participant P as Projector + participant PS as Projection Store + participant App as Application + + App->>P: project(stream) + P->>ES: scan events + ES-->>P: events + P->>P: reduce events to state + P->>PS: store projection + PS-->>App: projection result +``` + +## Creating Projectors + +### Basic Projector + +```python +from logicblocks.event.projection import Projector +from logicblocks.event.types import StreamIdentifier + +class UserProfileProjector( + Projector[StreamIdentifier, dict, dict] +): + """ + Type parameters: + - StreamIdentifier: Source type + - dict: State type + - dict: Metadata type + """ + + def initial_state_factory(self) -> dict: + """Create initial state.""" + return { + "name": None, + "email": None, + "phone": None, + "address": None, + "created_at": None, + "last_login": None + } + + def initial_metadata_factory(self) -> dict: + """Create initial metadata.""" + return { + "version": 0, + "events_processed": 0 + } + + def id_factory(self, state: dict, source: StreamIdentifier) -> str: + """Generate projection ID.""" + return f"user-profile:{source.stream}" + + # Event handlers - method names match event names with underscores + def user_registered(self, state: dict, event) -> dict: + """Handle user-registered event.""" + state["name"] = event.payload["name"] + state["email"] = event.payload["email"] + state["created_at"] = event.occurred_at + return state + + def profile_updated(self, state: dict, event) -> dict: + """Handle profile-updated event.""" + state.update(event.payload) + return state + + def email_changed(self, state: dict, event) -> dict: + """Handle email-changed event.""" + state["email"] = event.payload["new_email"] + return state + + def user_logged_in(self, state: dict, event) -> dict: + """Handle user-logged-in event.""" + state["last_login"] = event.occurred_at + return state +``` + +### Using a Projector + +```python +from logicblocks.event.store import EventStore + +# Create projector +projector = UserProfileProjector() + +# Project from a stream +stream = store.stream(category="users", stream="user-123") +projection = await projector.project(source=stream) + +# Access projection data +print(projection.id) # "user-profile:user-123" +print(projection.state) # {"name": "John", "email": "john@example.com", ...} +print(projection.metadata) # {"version": 4, "events_processed": 4} +``` + +## Advanced Projector Features + +### Event Name Mapping + +Handle events with special characters or different naming conventions: + +```python +class OrderProjector(Projector[StreamIdentifier, dict, dict]): + def initial_state_factory(self) -> dict: + return {"status": "pending", "items": [], "total": 0} + + # Map event names to handler methods + @handles("order-placed") + def handle_order_placed(self, state: dict, event) -> dict: + state["status"] = "placed" + state["items"] = event.payload["items"] + return state + + @handles("payment-received") + def handle_payment(self, state: dict, event) -> dict: + state["status"] = "paid" + state["payment_amount"] = event.payload["amount"] + return state +``` + +### Metadata Tracking + +Track additional information about projections: + +```python +class TrackedProjector(Projector[StreamIdentifier, dict, dict]): + def initial_metadata_factory(self) -> dict: + return { + "version": 0, + "events_processed": 0, + "last_event_id": None, + "last_event_position": -1, + "projection_time": None + } 
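+
+    # The project_event override below runs once per event, so these counters
+    # are advanced in lockstep with the state returned by the event handlers.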
+ + def project_event(self, state: dict, metadata: dict, event) -> tuple[dict, dict]: + """Override to track metadata for each event.""" + # Update state using appropriate handler + new_state = super().project_event(state, metadata, event)[0] + + # Update metadata + metadata["version"] += 1 + metadata["events_processed"] += 1 + metadata["last_event_id"] = event.id + metadata["last_event_position"] = event.position + metadata["projection_time"] = datetime.now() + + return new_state, metadata +``` + +### Conditional Projection + +Project only specific events or conditions: + +```python +class ConditionalProjector(Projector[StreamIdentifier, dict, dict]): + def should_handle_event(self, event) -> bool: + """Filter events before processing.""" + # Only process events from specific time period + if event.occurred_at < datetime(2024, 1, 1): + return False + + # Only process specific event types + allowed_events = ["important-event", "critical-update"] + return event.name in allowed_events + + def project(self, source, **kwargs): + """Override project to filter events.""" + filtered_source = FilteredEventSource( + source, + filter_fn=self.should_handle_event + ) + return super().project(filtered_source, **kwargs) +``` + +## Projection Patterns + +### 1. Aggregate Projection + +Combine multiple related entities: + +```python +class CustomerOrdersProjector(Projector[StreamIdentifier, dict, dict]): + def initial_state_factory(self) -> dict: + return { + "customer_id": None, + "total_orders": 0, + "total_spent": 0.0, + "orders": [], + "loyalty_tier": "bronze" + } + + def order_placed(self, state: dict, event) -> dict: + state["customer_id"] = event.payload["customer_id"] + state["total_orders"] += 1 + state["total_spent"] += event.payload["total"] + state["orders"].append({ + "order_id": event.payload["order_id"], + "amount": event.payload["total"], + "date": event.occurred_at + }) + + # Update loyalty tier + if state["total_spent"] > 1000: + state["loyalty_tier"] = "gold" + elif state["total_spent"] > 500: + state["loyalty_tier"] = "silver" + + return state +``` + +### 2. Snapshot Projection + +Create point-in-time snapshots: + +```python +class InventorySnapshotProjector(Projector[StreamIdentifier, dict, dict]): + def __init__(self, snapshot_date: datetime): + self.snapshot_date = snapshot_date + super().__init__() + + def initial_state_factory(self) -> dict: + return { + "items": {}, + "total_value": 0.0, + "snapshot_date": self.snapshot_date + } + + def should_handle_event(self, event) -> bool: + # Only process events up to snapshot date + return event.occurred_at <= self.snapshot_date + + def item_added(self, state: dict, event) -> dict: + item_id = event.payload["item_id"] + quantity = event.payload["quantity"] + price = event.payload["price"] + + if item_id not in state["items"]: + state["items"][item_id] = {"quantity": 0, "price": price} + + state["items"][item_id]["quantity"] += quantity + state["total_value"] += quantity * price + return state +``` + +### 3. 
Time-Window Projection + +Project events within a specific time window: + +```python +class DailyActivityProjector(Projector[StreamIdentifier, dict, dict]): + def __init__(self, date: date): + self.date = date + self.start = datetime.combine(date, time.min) + self.end = datetime.combine(date, time.max) + super().__init__() + + def initial_state_factory(self) -> dict: + return { + "date": self.date, + "events_by_hour": {h: 0 for h in range(24)}, + "total_events": 0, + "unique_users": set() + } + + def should_handle_event(self, event) -> bool: + return self.start <= event.occurred_at <= self.end + + def user_action(self, state: dict, event) -> dict: + hour = event.occurred_at.hour + state["events_by_hour"][hour] += 1 + state["total_events"] += 1 + state["unique_users"].add(event.payload.get("user_id")) + return state +``` + +## Projection Store + +Store and query projections: + +```python +from logicblocks.event.projection.store import ProjectionStore +from logicblocks.event.query import where, Query + +# Create projection store +projection_store = ProjectionStore(adapter=storage_adapter) + +# Store projection +projection = await projector.project(source=stream) +await projection_store.save(projection) + +# Query projections +# Find by ID +projection = await projection_store.get(id="user-profile:user-123") + +# Query with filters +results = await projection_store.query( + Query( + filter=where("state.loyalty_tier") == "gold", + order_by=[("state.total_spent", "desc")], + limit=10 + ) +) + +# Update projection +projection.state["last_updated"] = datetime.now() +await projection_store.save(projection) +``` + +## Performance Optimization + +### 1. Incremental Projections + +Update projections incrementally instead of rebuilding: + +```python +class IncrementalProjector: + async def update_projection(self, stream, projection_store): + # Get existing projection + projection_id = f"profile:{stream.identifier.stream}" + existing = await projection_store.get(projection_id) + + if existing: + # Project only new events + last_position = existing.metadata.get("last_position", -1) + events = stream.scan(from_position=last_position + 1) + else: + # Full projection for new streams + events = stream.scan() + + # Continue projection from existing state + state = existing.state if existing else self.initial_state_factory() + metadata = existing.metadata if existing else self.initial_metadata_factory() + + async for event in events: + state, metadata = self.project_event(state, metadata, event) + + # Save updated projection + projection = Projection( + id=projection_id, + state=state, + metadata=metadata + ) + await projection_store.save(projection) +``` + +### 2. 
Cached Projections
+
+Cache frequently accessed projections:
+
+```python
+import time
+
+class CachedProjectionService:
+    def __init__(self, event_store, projector, store, category, cache_ttl=60):
+        self.event_store = event_store
+        self.projector = projector
+        self.store = store
+        self.category = category
+        self.cache_ttl = cache_ttl
+        self._cache = {}
+        self._cache_times = {}
+
+    async def get_projection(self, stream_id: str) -> Projection:
+        # Check cache
+        if stream_id in self._cache:
+            cache_time = self._cache_times[stream_id]
+            if time.time() - cache_time < self.cache_ttl:
+                return self._cache[stream_id]
+
+        # Load or create projection
+        projection_id = f"cached:{stream_id}"
+        projection = await self.store.get(projection_id)
+
+        if not projection:
+            # Create new projection from the underlying event stream
+            stream = self.event_store.stream(
+                category=self.category, stream=stream_id
+            )
+            projection = await self.projector.project(stream)
+            await self.store.save(projection)
+
+        # Update cache
+        self._cache[stream_id] = projection
+        self._cache_times[stream_id] = time.time()
+
+        return projection
+```
+
+### 3. Parallel Projections
+
+Project multiple streams in parallel:
+
+```python
+import asyncio
+
+async def project_multiple_streams(projector, streams):
+    """Project multiple streams concurrently."""
+    tasks = [
+        projector.project(stream)
+        for stream in streams
+    ]
+
+    projections = await asyncio.gather(*tasks)
+    return projections
+
+# Usage
+streams = [
+    store.stream(category="users", stream=f"user-{i}")
+    for i in range(100)
+]
+
+projections = await project_multiple_streams(
+    UserProfileProjector(),
+    streams
+)
+```
+
+## Testing Projections
+
+### Unit Testing Projectors
+
+```python
+import pytest
+from logicblocks.event.testing import EventBuilder
+from logicblocks.event.types import NewEvent
+
+class TestUserProfileProjector:
+    @pytest.fixture
+    def projector(self):
+        return UserProfileProjector()
+
+    def test_initial_state(self, projector):
+        state = projector.initial_state_factory()
+        assert state["name"] is None
+        assert state["email"] is None
+
+    def test_user_registered_event(self, projector):
+        # Create test event
+        event = EventBuilder() \
+            .with_name("user-registered") \
+            .with_payload({
+                "name": "John Doe",
+                "email": "john@example.com"
+            }) \
+            .build()
+
+        # Project event
+        state = projector.initial_state_factory()
+        new_state = projector.user_registered(state, event)
+
+        # Verify state
+        assert new_state["name"] == "John Doe"
+        assert new_state["email"] == "john@example.com"
+
+    @pytest.mark.asyncio
+    async def test_full_projection(self, projector, event_store):
+        # Create events
+        stream = event_store.stream(category="users", stream="test-user")
+        await stream.publish(events=[
+            NewEvent(
+                name="user-registered",
+                payload={"name": "Test User", "email": "test@example.com"}
+            ),
+            NewEvent(
+                name="email-changed",
+                payload={"new_email": "newemail@example.com"}
+            )
+        ])
+
+        # Project
+        projection = await projector.project(stream)
+
+        # Verify
+        assert projection.state["name"] == "Test User"
+        assert projection.state["email"] == "newemail@example.com"
+        assert projection.metadata["events_processed"] == 2
+```
+
+## Best Practices
+
+### 1. Idempotent Projections
+
+Ensure projections produce the same result when run multiple times:
+
+```python
+def balance_updated(self, state: dict, event) -> dict:
+    # Idempotent: set to specific value
+    state["balance"] = event.payload["new_balance"]
+    return state
+
+    # NOT idempotent: relative change
+    # state["balance"] += event.payload["change"]
+```
+
+### 2. 
Error Handling + +Handle missing or invalid data gracefully: + +```python +def safe_projection(self, state: dict, event) -> dict: + try: + # Defensive data access + user_data = event.payload.get("user", {}) + state["name"] = user_data.get("name", "Unknown") + state["age"] = int(user_data.get("age", 0)) + except (ValueError, TypeError) as e: + # Log error but don't fail projection + logger.warning(f"Error projecting event {event.id}: {e}") + state["errors"] = state.get("errors", []) + state["errors"].append({ + "event_id": event.id, + "error": str(e) + }) + + return state +``` + +### 3. Projection Versioning + +Handle projection schema changes: + +```python +class VersionedProjector(Projector): + PROJECTION_VERSION = 2 + + def initial_metadata_factory(self) -> dict: + return { + "projection_version": self.PROJECTION_VERSION, + "schema_version": 1 + } + + def migrate_projection(self, projection): + """Migrate old projections to new schema.""" + version = projection.metadata.get("projection_version", 1) + + if version < 2: + # Migrate from v1 to v2 + projection.state["new_field"] = "default_value" + projection.metadata["projection_version"] = 2 + + return projection +``` + +## Next Steps + +- Explore the [Query System](./query-system.md) for finding projections +- Learn about [Transactions](./transactions.md) for coordinating updates +- See [Testing](./testing.md) for projection testing strategies \ No newline at end of file diff --git a/documentations/query-system.md b/documentations/query-system.md new file mode 100644 index 00000000..e9c498c9 --- /dev/null +++ b/documentations/query-system.md @@ -0,0 +1,541 @@ +# Query System + +## Overview + +The event store library provides a powerful and flexible query system for finding events and projections. The query system supports filtering, ordering, pagination, and complex conditions. This document covers how to effectively use the query capabilities. 
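+
+As a taste of what follows, a single `Query` can combine filtering, ordering, and pagination in one request; the field names below are illustrative:
+
+```python
+from logicblocks.event.query import Query, where
+
+query = Query(
+    # Both clauses must match (& combines filters with logical AND)
+    filter=(where("state.status") == "active") & (where("state.score") > 100),
+    order_by=[("state.created_at", "desc")],
+    limit=20,
+    offset=0,
+)
+```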
+ +## Query Architecture + +```mermaid +graph TD + Q[Query] --> F[Filter Clauses] + Q --> O[Ordering] + Q --> P[Pagination] + + F --> EQ[Equality] + F --> COMP[Comparison] + F --> CONT[Contains] + F --> LOG[Logical] + + LOG --> AND[AND] + LOG --> OR[OR] + LOG --> NOT[NOT] + + Q --> ES[Event Store] + Q --> PS[Projection Store] + + ES --> R1[Event Results] + PS --> R2[Projection Results] +``` + +## Basic Queries + +### Querying Events + +```python +from logicblocks.event.query import Query, where + +# Query events from a stream +stream = store.stream(category="users", stream="user-123") + +# Get all events +events = await stream.scan() + +# Filter by event name +events = await stream.scan( + event_names=["user-registered", "profile-updated"] +) + +# Filter by position range +events = await stream.scan( + from_position=10, + to_position=20 +) + +# Limit results +events = await stream.scan(limit=100) +``` + +### Querying Categories + +```python +# Query all events in a category +category = store.category("users") + +# Scan with filters +events = await category.scan( + event_names=["user-registered"], + limit=50 +) + +# Time-based queries +from datetime import datetime, timedelta + +yesterday = datetime.now() - timedelta(days=1) +events = await category.scan( + from_occurred_at=yesterday, + to_occurred_at=datetime.now() +) +``` + +## Advanced Filtering + +### Using the Where Clause + +The `where` function creates flexible filter conditions: + +```python +from logicblocks.event.query import where + +# Equality +filter = where("payload.status") == "active" + +# Comparison +filter = where("payload.amount") > 100 +filter = where("payload.age") >= 18 +filter = where("payload.score") < 50 +filter = where("payload.discount") <= 0.2 + +# Contains +filter = where("payload.tags").contains("important") +filter = where("payload.email").contains("@example.com") + +# Null checks +filter = where("payload.deleted_at").is_null() +filter = where("payload.description").is_not_null() +``` + +### Combining Filters + +```python +# AND conditions +filter = ( + (where("payload.status") == "active") & + (where("payload.age") >= 18) & + (where("payload.country") == "US") +) + +# OR conditions +filter = ( + (where("payload.priority") == "high") | + (where("payload.urgent") == True) +) + +# Complex combinations +filter = ( + (where("payload.status") == "active") & + ( + (where("payload.role") == "admin") | + (where("payload.role") == "moderator") + ) +) + +# NOT conditions +filter = ~(where("payload.status") == "deleted") +``` + +### Nested Field Access + +Query nested fields using dot notation: + +```python +# Nested object fields +filter = where("payload.user.profile.email") == "john@example.com" +filter = where("payload.address.city") == "New York" +filter = where("metadata.correlation_id") == "abc-123" + +# Array access (if supported by storage) +filter = where("payload.items[0].name") == "Widget" +filter = where("payload.tags").contains("featured") +``` + +## Projection Queries + +### Basic Projection Queries + +```python +from logicblocks.event.projection.store import ProjectionStore + +projection_store = ProjectionStore(adapter) + +# Get by ID +projection = await projection_store.get("user-profile:user-123") + +# Query all projections +projections = await projection_store.query(Query()) + +# Query with filter +query = Query( + filter=where("state.status") == "active" +) +projections = await projection_store.query(query) +``` + +### Complex Projection Queries + +```python +# Multi-field filtering +query = Query( + 
filter=( + (where("state.total_orders") > 10) & + (where("state.loyalty_tier") == "gold") & + (where("state.last_order_date") > datetime(2024, 1, 1)) + ) +) + +# Query with ordering +query = Query( + filter=where("state.status") == "active", + order_by=[ + ("state.created_at", "desc"), + ("state.name", "asc") + ] +) + +# Pagination +query = Query( + filter=where("state.category") == "electronics", + order_by=[("state.price", "desc")], + limit=20, + offset=40 # Skip first 40 results +) +``` + +## Query Patterns + +### 1. Time-Based Queries + +```python +# Events in the last hour +one_hour_ago = datetime.now() - timedelta(hours=1) +recent_events = await stream.scan( + from_occurred_at=one_hour_ago +) + +# Events in a specific date range +start_date = datetime(2024, 1, 1) +end_date = datetime(2024, 1, 31) +january_events = await category.scan( + from_occurred_at=start_date, + to_occurred_at=end_date +) + +# Projections updated recently +query = Query( + filter=where("metadata.last_updated") > datetime.now() - timedelta(minutes=5) +) +``` + +### 2. Search Patterns + +```python +# Case-insensitive search (if supported) +query = Query( + filter=where("state.email").icontains("@EXAMPLE.com") +) + +# Prefix search +query = Query( + filter=where("state.sku").startswith("PROD-") +) + +# Range queries +query = Query( + filter=( + (where("state.price") >= 10.00) & + (where("state.price") <= 100.00) + ) +) +``` + +### 3. Aggregation Patterns + +While the query system doesn't directly support aggregation, you can implement it in your application: + +```python +async def count_by_status(projection_store): + """Count projections by status.""" + all_projections = await projection_store.query(Query()) + + counts = {} + async for projection in all_projections: + status = projection.state.get("status", "unknown") + counts[status] = counts.get(status, 0) + 1 + + return counts + +async def get_top_customers(projection_store, limit=10): + """Get top customers by total spent.""" + query = Query( + filter=where("state.total_spent") > 0, + order_by=[("state.total_spent", "desc")], + limit=limit + ) + + return await projection_store.query(query) +``` + +## Performance Optimization + +### 1. Efficient Filtering + +```python +# Use indexes effectively +# Filter on indexed fields first +query = Query( + filter=( + # Indexed field first + (where("state.category") == "electronics") & + # Then more specific filters + (where("state.brand") == "Apple") & + (where("state.in_stock") == True) + ) +) + +# Avoid complex operations on large datasets +# BAD: Loading all data then filtering in memory +all_projections = await projection_store.query(Query()) +filtered = [p for p in all_projections if complex_condition(p)] + +# GOOD: Filter at query level +query = Query(filter=build_filter_from_condition()) +filtered = await projection_store.query(query) +``` + +### 2. Pagination Strategy + +```python +async def paginate_results(store, page_size=100): + """Efficiently paginate through large result sets.""" + offset = 0 + + while True: + query = Query( + order_by=[("id", "asc")], # Consistent ordering + limit=page_size, + offset=offset + ) + + results = await store.query(query) + result_count = 0 + + async for item in results: + result_count += 1 + yield item + + if result_count < page_size: + break + + offset += page_size + +# Usage +async for projection in paginate_results(projection_store): + process_projection(projection) +``` + +### 3. 
Cursor-Based Pagination + +For better performance with large datasets: + +```python +async def cursor_paginate(store, page_size=100): + """Cursor-based pagination for better performance.""" + last_id = None + + while True: + if last_id: + filter_clause = where("id") > last_id + else: + filter_clause = None + + query = Query( + filter=filter_clause, + order_by=[("id", "asc")], + limit=page_size + ) + + results = await store.query(query) + result_count = 0 + + async for item in results: + result_count += 1 + last_id = item.id + yield item + + if result_count < page_size: + break +``` + +## Query Builders + +Create reusable query builders for common patterns: + +```python +class QueryBuilder: + """Fluent query builder.""" + + def __init__(self): + self._filters = [] + self._order_by = [] + self._limit = None + self._offset = None + + def where(self, field, operator, value): + if operator == "==": + self._filters.append(where(field) == value) + elif operator == ">": + self._filters.append(where(field) > value) + elif operator == "contains": + self._filters.append(where(field).contains(value)) + return self + + def and_where(self, field, operator, value): + return self.where(field, operator, value) + + def order_by(self, field, direction="asc"): + self._order_by.append((field, direction)) + return self + + def limit(self, limit): + self._limit = limit + return self + + def offset(self, offset): + self._offset = offset + return self + + def build(self): + filter_clause = None + if self._filters: + filter_clause = self._filters[0] + for f in self._filters[1:]: + filter_clause = filter_clause & f + + return Query( + filter=filter_clause, + order_by=self._order_by, + limit=self._limit, + offset=self._offset + ) + +# Usage +query = (QueryBuilder() + .where("state.status", "==", "active") + .and_where("state.priority", ">", 5) + .order_by("state.created_at", "desc") + .limit(10) + .build()) +``` + +## Testing Queries + +### Unit Testing + +```python +import pytest +from logicblocks.event.query import Query, where + +class TestQueries: + def test_equality_filter(self): + filter = where("status") == "active" + assert filter.field == "status" + assert filter.operator == "==" + assert filter.value == "active" + + def test_combined_filters(self): + filter = ( + (where("age") >= 18) & + (where("country") == "US") + ) + assert filter.operator == "AND" + assert len(filter.clauses) == 2 + + @pytest.mark.asyncio + async def test_projection_query(self, projection_store): + # Create test projections + await projection_store.save( + Projection( + id="test-1", + state={"status": "active", "score": 100} + ) + ) + await projection_store.save( + Projection( + id="test-2", + state={"status": "inactive", "score": 50} + ) + ) + + # Query active projections + query = Query(filter=where("state.status") == "active") + results = list(await projection_store.query(query)) + + assert len(results) == 1 + assert results[0].id == "test-1" +``` + +## Best Practices + +### 1. Use Specific Filters + +```python +# GOOD: Specific filters reduce data transfer +query = Query( + filter=( + (where("state.category") == "electronics") & + (where("state.brand") == "Apple") & + (where("state.model") == "iPhone 15") + ) +) + +# BAD: Loading all data and filtering in memory +all_items = await store.query(Query()) +iphones = [i for i in all_items if is_iphone_15(i)] +``` + +### 2. 
Index Considerations

```python
# Design queries around your indexes
# If you have an index on (category, created_at)
query = Query(
    filter=where("state.category") == "orders",
    order_by=[("state.created_at", "desc")]
)
```

### 3. Query Monitoring

```python
import time
import logging

logger = logging.getLogger(__name__)

async def monitored_query(store, query):
    """Execute query with performance monitoring."""
    start_time = time.time()
    result_count = 0

    try:
        results = await store.query(query)
        async for item in results:
            result_count += 1
            yield item
    finally:
        duration = time.time() - start_time
        logger.info(
            f"Query completed: {result_count} results in {duration:.2f}s",
            extra={
                "query_filter": str(query.filter),
                "result_count": result_count,
                "duration": duration
            }
        )
```

## Next Steps

- Learn about [Transactions](./transactions.md) for coordinated operations
- Explore [Testing](./testing.md) for query testing strategies
- See [API Reference](./api-reference.md) for complete query API
\ No newline at end of file
diff --git a/documentations/storage-adapters.md b/documentations/storage-adapters.md
new file mode 100644
index 00000000..35a7ce36
--- /dev/null
+++ b/documentations/storage-adapters.md
@@ -0,0 +1,485 @@
# Storage Adapters

## Overview

Storage adapters provide the persistence layer for the event store. The library uses an adapter pattern that allows you to switch between storage backends without changing your application code. This document covers the available adapters and how to create custom ones.

## Architecture

```mermaid
classDiagram
    class EventStorageAdapter {
        <<interface>>
        +append_to_stream()
        +scan_stream()
        +scan_category()
        +scan_log()
        +get_stream_info()
    }

    class InMemoryEventStorageAdapter {
        -events: dict
        -locks: dict
        +append_to_stream()
        +scan_stream()
        +scan_category()
        +scan_log()
    }

    class PostgresEventStorageAdapter {
        -connection_source
        -serializer
        +append_to_stream()
        +scan_stream()
        +scan_category()
        +scan_log()
    }

    class EventStore {
        -adapter: EventStorageAdapter
        +stream()
        +category()
        +log()
    }

    EventStorageAdapter <|-- InMemoryEventStorageAdapter
    EventStorageAdapter <|-- PostgresEventStorageAdapter
    EventStore --> EventStorageAdapter
```

## Available Adapters

### In-Memory Adapter

The in-memory adapter keeps events in process memory and is a good fit for:
- Unit testing
- Development
- Prototyping
- Temporary event storage

```python
from logicblocks.event.store import EventStore
from logicblocks.event.store.adapters import InMemoryEventStorageAdapter

# Create adapter and store
adapter = InMemoryEventStorageAdapter()
store = EventStore(adapter)

# Use the store normally
stream = store.stream(category="users", stream="user-123")
await stream.publish(events=[...])
```

**Characteristics:**
- ✅ Fast performance
- ✅ No external dependencies
- ✅ Thread-safe
- ❌ Data lost on restart
- ❌ Not suitable for production

### PostgreSQL Adapter

The PostgreSQL adapter provides durable, production-grade storage on top of PostgreSQL:

```python
from logicblocks.event.store.adapters import PostgresEventStorageAdapter
from logicblocks.event.persistence import ConnectionSource
import asyncpg

# Create connection source
async def create_connection():
    return await asyncpg.connect(
        host="localhost",
        port=5432,
        user="eventstore",
        password="password",
        database="events"
    )

connection_source = 
ConnectionSource(create_connection) + +# Create adapter with connection source +adapter = PostgresEventStorageAdapter( + connection_source=connection_source +) + +# Create store +store = EventStore(adapter) +``` + +**Characteristics:** +- ✅ Durable storage +- ✅ ACID compliance +- ✅ Production ready +- ✅ Supports concurrent access +- ✅ Built-in indexing +- ❌ Requires PostgreSQL + +#### Database Schema + +The PostgreSQL adapter uses the following schema: + +```sql +-- Events table +CREATE TABLE events ( + id UUID PRIMARY KEY, + log VARCHAR(255) NOT NULL, + category VARCHAR(255) NOT NULL, + stream VARCHAR(255) NOT NULL, + name VARCHAR(255) NOT NULL, + payload JSONB NOT NULL, + metadata JSONB, + position INTEGER NOT NULL, + global_position BIGSERIAL, + occurred_at TIMESTAMPTZ NOT NULL, + recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + -- Unique constraint for position within stream + CONSTRAINT unique_stream_position + UNIQUE (log, category, stream, position) +); + +-- Indexes for efficient querying +CREATE INDEX idx_events_stream + ON events (log, category, stream, position); +CREATE INDEX idx_events_category + ON events (log, category, global_position); +CREATE INDEX idx_events_log + ON events (log, global_position); +CREATE INDEX idx_events_name + ON events (name); +CREATE INDEX idx_events_occurred_at + ON events (occurred_at); +``` + +#### Connection Management + +```python +# Using connection pooling +async def create_pool(): + return await asyncpg.create_pool( + host="localhost", + port=5432, + user="eventstore", + password="password", + database="events", + min_size=10, + max_size=20 + ) + +pool = await create_pool() +connection_source = ConnectionSource(lambda: pool.acquire()) + +# With environment variables +import os + +async def create_connection(): + return await asyncpg.connect( + host=os.getenv("DB_HOST", "localhost"), + port=int(os.getenv("DB_PORT", 5432)), + user=os.getenv("DB_USER"), + password=os.getenv("DB_PASSWORD"), + database=os.getenv("DB_NAME", "events") + ) +``` + +## Adapter Configuration + +### Serialization + +Both adapters support custom serialization: + +```python +from logicblocks.event.types.conversion import Serializer +import json +from datetime import datetime + +class CustomSerializer(Serializer): + def serialize(self, value): + # Custom serialization logic + if isinstance(value, datetime): + return value.isoformat() + return json.dumps(value) + + def deserialize(self, value): + # Custom deserialization logic + return json.loads(value) + +# Use with PostgreSQL adapter +adapter = PostgresEventStorageAdapter( + connection_source=connection_source, + serializer=CustomSerializer() +) +``` + +### Write Serialization + +Control concurrent writes with serialization constraints: + +```python +from logicblocks.event.store import constraints + +# Serialize at stream level (default) +adapter = PostgresEventStorageAdapter( + connection_source=connection_source, + serialization_constraint=constraints.stream_serialization() +) + +# Serialize at category level +adapter = PostgresEventStorageAdapter( + connection_source=connection_source, + serialization_constraint=constraints.category_serialization() +) + +# Serialize at log level +adapter = PostgresEventStorageAdapter( + connection_source=connection_source, + serialization_constraint=constraints.log_serialization() +) + +# No serialization (parallel writes) +adapter = PostgresEventStorageAdapter( + connection_source=connection_source, + serialization_constraint=constraints.no_serialization() +) +``` + +## Creating 
Custom Adapters + +To create a custom storage adapter, inherit from `EventStorageAdapter`: + +```python +from logicblocks.event.store.adapters import EventStorageAdapter +from logicblocks.event.types import StoredEvent, NewEvent, StreamIdentifier +from typing import List, Optional, AsyncIterator +import uuid +from datetime import datetime + +class CustomStorageAdapter(EventStorageAdapter): + def __init__(self): + # Initialize your storage backend + self.storage = YourStorageBackend() + + async def append_to_stream( + self, + stream: StreamIdentifier, + events: List[NewEvent], + expected_position: Optional[int] = None + ) -> List[StoredEvent]: + """Append events to a stream.""" + stored_events = [] + + # Get current stream position + current_position = await self._get_stream_position(stream) + + # Check expected position if provided + if expected_position is not None and expected_position != current_position: + raise OptimisticConcurrencyError( + f"Expected position {expected_position}, " + f"but stream at position {current_position}" + ) + + # Store each event + for event in events: + position = current_position + 1 + global_position = await self._get_next_global_position() + + stored_event = StoredEvent( + id=str(uuid.uuid4()), + stream=stream, + name=event.name, + payload=event.payload, + metadata=event.metadata, + position=position, + global_position=global_position, + occurred_at=event.occurred_at or datetime.now(), + recorded_at=datetime.now() + ) + + await self.storage.save(stored_event) + stored_events.append(stored_event) + current_position = position + + return stored_events + + async def scan_stream( + self, + stream: StreamIdentifier, + from_position: Optional[int] = None, + to_position: Optional[int] = None, + limit: Optional[int] = None + ) -> AsyncIterator[StoredEvent]: + """Scan events from a stream.""" + query = self.storage.query() + query.filter( + log=stream.log, + category=stream.category, + stream=stream.stream + ) + + if from_position is not None: + query.filter(position__gte=from_position) + + if to_position is not None: + query.filter(position__lte=to_position) + + if limit is not None: + query.limit(limit) + + async for event_data in query.execute(): + yield self._deserialize_event(event_data) + + async def scan_category( + self, + log: str, + category: str, + # ... other parameters + ) -> AsyncIterator[StoredEvent]: + """Scan events from a category.""" + # Implementation similar to scan_stream + pass + + async def scan_log( + self, + log: str, + # ... other parameters + ) -> AsyncIterator[StoredEvent]: + """Scan events from entire log.""" + # Implementation similar to scan_stream + pass + + async def get_stream_info( + self, + stream: StreamIdentifier + ) -> dict: + """Get information about a stream.""" + count = await self.storage.count( + log=stream.log, + category=stream.category, + stream=stream.stream + ) + + return { + "stream": stream, + "event_count": count, + "position": count - 1 if count > 0 else -1 + } +``` + +### Adapter Requirements + +Your custom adapter must implement: + +1. **`append_to_stream`**: Atomically append events to a stream +2. **`scan_stream`**: Read events from a specific stream +3. **`scan_category`**: Read events from all streams in a category +4. **`scan_log`**: Read events from the entire log +5. **`get_stream_info`**: Get metadata about a stream + +### Best Practices for Custom Adapters + +1. **Atomicity**: Ensure all events in a batch are stored atomically +2. **Ordering**: Maintain strict ordering within streams +3. 
**Concurrency**: Handle concurrent writes appropriately +4. **Performance**: Implement efficient indexing for scans +5. **Error Handling**: Provide clear error messages + +## Testing Storage Adapters + +The library provides test utilities for adapter implementations: + +```python +from logicblocks.event.testing import EventStorageAdapterTestCase +import pytest + +class TestCustomAdapter(EventStorageAdapterTestCase): + @pytest.fixture + async def adapter(self): + # Create and return your adapter instance + adapter = CustomStorageAdapter() + yield adapter + # Cleanup if needed + await adapter.cleanup() + + # The base class provides comprehensive tests + # for adapter compliance +``` + +## Performance Considerations + +### In-Memory Adapter + +- **Memory Usage**: O(n) where n is number of events +- **Write Performance**: O(1) amortized +- **Read Performance**: O(n) for scans, can be optimized with indexing + +### PostgreSQL Adapter + +- **Write Performance**: Depends on disk I/O and indexes +- **Read Performance**: Optimized with indexes +- **Batch Operations**: Use transactions for better performance + +```python +# Batch writes with transaction +async with adapter.transaction() as tx: + for i in range(1000): + await stream.publish( + events=[NewEvent(name=f"event-{i}", payload={})], + transaction=tx + ) +``` + +### Optimization Tips + +1. **Use appropriate indexes** for your query patterns +2. **Batch writes** when possible +3. **Use connection pooling** for PostgreSQL +4. **Consider partitioning** for very large event stores +5. **Monitor query performance** and optimize as needed + +## Migration Between Adapters + +To migrate from one adapter to another: + +```python +async def migrate_events(source_store, target_store): + # Get all categories + source_log = source_store.log() + + # Scan all events + events = source_log.scan() + + # Group by stream + streams = {} + async for event in events: + stream_id = event.stream + if stream_id not in streams: + streams[stream_id] = [] + streams[stream_id].append(event) + + # Replay to target store + for stream_id, stream_events in streams.items(): + target_stream = target_store.stream( + category=stream_id.category, + stream=stream_id.stream + ) + + # Convert StoredEvents back to NewEvents + new_events = [ + NewEvent( + name=e.name, + payload=e.payload, + metadata=e.metadata, + occurred_at=e.occurred_at + ) + for e in stream_events + ] + + await target_stream.publish(events=new_events) +``` + +## Next Steps + +- Learn about [Projections](./projections.md) for event processing +- Explore the [Query System](./query-system.md) for finding events +- See [Transactions](./transactions.md) for advanced write patterns \ No newline at end of file diff --git a/documentations/testing.md b/documentations/testing.md new file mode 100644 index 00000000..73a8bb4d --- /dev/null +++ b/documentations/testing.md @@ -0,0 +1,671 @@ +# Testing + +## Overview + +Testing event-sourced systems requires specialized approaches and utilities. The event store library provides comprehensive testing support including builders, data generators, and test base classes. This document covers testing strategies, utilities, and best practices. 
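

Before reaching for the individual utilities, it helps to see how little a complete test needs. The sketch below is a minimal unit test, assuming only the in-memory adapter and the event types documented in these pages:

```python
import pytest

from logicblocks.event.store import EventStore
from logicblocks.event.store.adapters import InMemoryEventStorageAdapter
from logicblocks.event.types import NewEvent

@pytest.mark.asyncio
async def test_publishes_and_reads_back_an_event():
    # The in-memory adapter needs no external services, so a fresh
    # store per test keeps tests fast and isolated
    store = EventStore(InMemoryEventStorageAdapter())
    stream = store.stream(category="users", stream="user-1")

    await stream.publish(events=[
        NewEvent(name="user-registered", payload={"email": "a@example.com"})
    ])

    events = [event async for event in stream.scan()]
    assert [e.name for e in events] == ["user-registered"]
```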
+ +## Testing Architecture + +```mermaid +graph TD + T[Testing Tools] --> B[Builders] + T --> G[Generators] + T --> M[Mocks] + T --> TC[Test Cases] + + B --> EB[Event Builder] + B --> PB[Projection Builder] + B --> SB[Stream Builder] + + G --> RD[Random Data] + G --> FX[Fixtures] + + TC --> UT[Unit Tests] + TC --> IT[Integration Tests] + TC --> AT[Adapter Tests] +``` + +## Test Utilities + +### Event Builder + +Build test events fluently: + +```python +from logicblocks.event.testing import EventBuilder +from datetime import datetime + +# Basic event +event = (EventBuilder() + .with_name("user-registered") + .with_payload({"email": "test@example.com"}) + .build()) + +# Complex event +event = (EventBuilder() + .with_id("test-event-123") + .with_name("order-placed") + .with_stream("orders", "order-456") + .with_payload({ + "items": [ + {"sku": "ABC123", "quantity": 2}, + {"sku": "XYZ789", "quantity": 1} + ], + "total": 99.99 + }) + .with_metadata({ + "user_id": "user-123", + "ip_address": "192.168.1.1" + }) + .with_position(42) + .with_global_position(1000) + .with_occurred_at(datetime(2024, 1, 1, 12, 0, 0)) + .build()) + +# Build multiple events +events = (EventBuilder() + .with_name("item-added") + .with_payload_template({"item_id": "item-{}", "quantity": 1}) + .build_many(5)) # Creates 5 events with item-0 through item-4 +``` + +### Projection Builder + +Build test projections: + +```python +from logicblocks.event.testing import ProjectionBuilder + +# Simple projection +projection = (ProjectionBuilder() + .with_id("user-profile:user-123") + .with_state({ + "name": "Test User", + "email": "test@example.com" + }) + .build()) + +# Complex projection with metadata +projection = (ProjectionBuilder() + .with_id("order-summary:2024-01") + .with_state({ + "total_orders": 150, + "total_revenue": 12500.00, + "average_order_value": 83.33 + }) + .with_metadata({ + "version": 2, + "last_event_position": 500, + "updated_at": datetime.now() + }) + .build()) +``` + +### Data Generators + +Generate random test data: + +```python +from logicblocks.event.testing import generators + +# Random event data +random_event = generators.random_event() +random_payload = generators.random_payload() +random_metadata = generators.random_metadata() + +# Random identifiers +stream_id = generators.random_stream_identifier() +event_id = generators.random_event_id() + +# Random values +email = generators.random_email() +user_id = generators.random_user_id() +amount = generators.random_amount(min=10.0, max=1000.0) +timestamp = generators.random_timestamp( + start=datetime(2024, 1, 1), + end=datetime(2024, 12, 31) +) + +# Batch generation +events = generators.random_events(count=100) +users = generators.random_users(count=50) +``` + +## Unit Testing + +### Testing Event Handlers + +```python +import pytest +from unittest.mock import Mock, AsyncMock + +class TestOrderHandlers: + @pytest.fixture + def handler(self): + return OrderEventHandler() + + def test_order_placed_handler(self, handler): + # Create test event + event = (EventBuilder() + .with_name("order-placed") + .with_payload({ + "order_id": "123", + "customer_id": "456", + "total": 99.99 + }) + .build()) + + # Test handler + result = handler.handle_order_placed(event) + + # Verify + assert result.order_id == "123" + assert result.status == "pending" + assert result.total == 99.99 +``` + +### Testing Projectors + +```python +class TestUserProjector: + @pytest.fixture + def projector(self): + return UserProfileProjector() + + def test_initial_state(self, projector): + 
state = projector.initial_state_factory()
        assert state == {
            "name": None,
            "email": None,
            "created_at": None
        }

    def test_user_registered_projection(self, projector):
        # Initial state
        state = projector.initial_state_factory()

        # Create event
        event = (EventBuilder()
            .with_name("user-registered")
            .with_payload({
                "name": "John Doe",
                "email": "john@example.com"
            })
            .with_occurred_at(datetime(2024, 1, 1))
            .build())

        # Project
        new_state = projector.user_registered(state, event)

        # Verify
        assert new_state["name"] == "John Doe"
        assert new_state["email"] == "john@example.com"
        assert new_state["created_at"] == datetime(2024, 1, 1)

    @pytest.mark.asyncio
    async def test_full_projection_flow(self, projector):
        # Create event sequence
        events = [
            (EventBuilder()
                .with_name("user-registered")
                .with_payload({"name": "Jane", "email": "jane@example.com"})
                .with_position(0)
                .build()),
            (EventBuilder()
                .with_name("profile-updated")
                .with_payload({"bio": "Software Engineer"})
                .with_position(1)
                .build()),
            (EventBuilder()
                .with_name("email-changed")
                .with_payload({"new_email": "jane.doe@example.com"})
                .with_position(2)
                .build())
        ]

        # Mock source. typing.AsyncIterator cannot be instantiated, so wrap
        # the list in a real async generator instead; each call to scan()
        # returns a fresh iterator (this assumes project() iterates
        # source.scan(...))
        async def scan_events(*args, **kwargs):
            for event in events:
                yield event

        source = AsyncMock()
        source.scan = Mock(side_effect=scan_events)

        # Project
        projection = await projector.project(source)

        # Verify final state
        assert projection.state["name"] == "Jane"
        assert projection.state["email"] == "jane.doe@example.com"
        assert projection.state["bio"] == "Software Engineer"
        assert projection.metadata["events_processed"] == 3
```

## Integration Testing

### Testing with In-Memory Store

```python
@pytest.fixture
async def event_store():
    """Create in-memory event store for testing."""
    adapter = InMemoryEventStorageAdapter()
    store = EventStore(adapter)
    yield store
    # Cleanup if needed

class TestEventStoreIntegration:
    @pytest.mark.asyncio
    async def test_publish_and_scan(self, event_store):
        # Create stream
        stream = event_store.stream(
            category="test",
            stream="integration"
        )

        # Publish events
        events_to_publish = [
            NewEvent(name="test-1", payload={"value": 1}),
            NewEvent(name="test-2", payload={"value": 2}),
            NewEvent(name="test-3", payload={"value": 3})
        ]

        published = await stream.publish(events=events_to_publish)

        # Verify published events
        assert len(published) == 3
        assert all(e.position is not None for e in published)

        # Scan events
        scanned_events = []
        async for event in stream.scan():
            scanned_events.append(event)

        # Verify scanned events
        assert len(scanned_events) == 3
        assert scanned_events[0].name == "test-1"
        assert scanned_events[1].payload["value"] == 2
        assert scanned_events[2].position == 2
```

### Testing Transactions

```python
class TestTransactions:
    @pytest.mark.asyncio
    async def test_transaction_commit(self, event_store):
        async with event_store_transaction(event_store) as tx:
            stream1 = tx.stream(category="accounts", stream="acc-1")
            stream2 = tx.stream(category="accounts", stream="acc-2")

            await stream1.publish(events=[
                NewEvent(name="debit", payload={"amount": 100})
            ])
            await stream2.publish(events=[
                NewEvent(name="credit", payload={"amount": 100})
            ])

        # Verify both events were committed
        events1 = list(await event_store.stream(
            category="accounts", stream="acc-1"
        ).scan())
        events2 = list(await event_store.stream(
            category="accounts", stream="acc-2"
        ).scan())

        assert len(events1) == 1
        assert len(events2) == 1
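

    # A further sketch, assuming positions are assigned contiguously from 0
    # (as in the integration test above): publishes made through the same
    # transactional stream handle should commit in order.
    @pytest.mark.asyncio
    async def test_transaction_preserves_order(self, event_store):
        async with event_store_transaction(event_store) as tx:
            stream = tx.stream(category="accounts", stream="acc-3")

            await stream.publish(events=[
                NewEvent(name="account-opened", payload={})
            ])
            await stream.publish(events=[
                NewEvent(name="credit", payload={"amount": 50})
            ])

        events = list(await event_store.stream(
            category="accounts", stream="acc-3"
        ).scan())

        assert [e.name for e in events] == ["account-opened", "credit"]
        assert [e.position for e in events] == [0, 1]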
+ @pytest.mark.asyncio + async def test_transaction_rollback(self, event_store): + with pytest.raises(ValueError): + async with event_store_transaction(event_store) as tx: + stream = tx.stream(category="test", stream="rollback") + + await stream.publish(events=[ + NewEvent(name="event-1", payload={}) + ]) + + # Force error + raise ValueError("Test error") + + # Verify no events were committed + events = list(await event_store.stream( + category="test", stream="rollback" + ).scan()) + assert len(events) == 0 +``` + +## Component Testing + +### Testing Storage Adapters + +```python +from logicblocks.event.testing import EventStorageAdapterTestCase + +class TestPostgresAdapter(EventStorageAdapterTestCase): + @pytest.fixture + async def adapter(self, postgres_connection): + """Provide adapter instance for base tests.""" + adapter = PostgresEventStorageAdapter( + connection_source=lambda: postgres_connection + ) + yield adapter + + # Cleanup + await postgres_connection.execute("TRUNCATE TABLE events") + + # Base class provides comprehensive adapter tests + # Add custom tests for PostgreSQL-specific features + + @pytest.mark.asyncio + async def test_postgres_json_querying(self, adapter): + """Test PostgreSQL-specific JSON queries.""" + stream = StreamIdentifier("log", "category", "stream") + + # Publish event with nested JSON + await adapter.append_to_stream( + stream=stream, + events=[ + NewEvent( + name="test", + payload={ + "user": { + "name": "John", + "tags": ["admin", "developer"] + } + } + ) + ] + ) + + # Query using PostgreSQL JSON operators + # (Implementation depends on adapter design) +``` + +### Testing Projections + +```python +class TestProjectionStore: + @pytest.fixture + async def projection_store(self, storage_adapter): + return ProjectionStore(adapter=storage_adapter) + + @pytest.mark.asyncio + async def test_save_and_retrieve(self, projection_store): + # Create projection + projection = (ProjectionBuilder() + .with_id("test-projection") + .with_state({"count": 42}) + .build()) + + # Save + await projection_store.save(projection) + + # Retrieve + retrieved = await projection_store.get("test-projection") + + # Verify + assert retrieved is not None + assert retrieved.state["count"] == 42 + + @pytest.mark.asyncio + async def test_query_projections(self, projection_store): + # Create test projections + projections = [ + (ProjectionBuilder() + .with_id(f"user-{i}") + .with_state({ + "status": "active" if i % 2 == 0 else "inactive", + "score": i * 10 + }) + .build()) + for i in range(10) + ] + + # Save all + for projection in projections: + await projection_store.save(projection) + + # Query active users + query = Query( + filter=where("state.status") == "active", + order_by=[("state.score", "desc")] + ) + + results = [] + async for projection in projection_store.query(query): + results.append(projection) + + # Verify + assert len(results) == 5 + assert results[0].state["score"] == 80 + assert all(p.state["status"] == "active" for p in results) +``` + +## Performance Testing + +### Load Testing + +```python +import asyncio +import time + +class TestPerformance: + @pytest.mark.asyncio + async def test_high_volume_writes(self, event_store): + """Test writing large number of events.""" + stream = event_store.stream(category="perf", stream="test") + + # Generate events + events = [ + NewEvent( + name="perf-test", + payload={"index": i, "data": "x" * 100} + ) + for i in range(1000) + ] + + # Measure write time + start_time = time.time() + await stream.publish(events=events) + 
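# A single publish call hands the whole 1000-event batch to the
        # adapter at once, so this timing measures bulk-append throughput
        # rather than per-call overhead.
        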
write_duration = time.time() - start_time + + # Verify and measure read time + start_time = time.time() + read_events = [] + async for event in stream.scan(): + read_events.append(event) + read_duration = time.time() - start_time + + # Assertions + assert len(read_events) == 1000 + assert write_duration < 5.0 # Should write 1000 events in < 5s + assert read_duration < 2.0 # Should read 1000 events in < 2s + + # Report metrics + print(f"Write: {1000/write_duration:.2f} events/sec") + print(f"Read: {1000/read_duration:.2f} events/sec") + + @pytest.mark.asyncio + async def test_concurrent_writes(self, event_store): + """Test concurrent writes to different streams.""" + async def write_to_stream(stream_id: str, count: int): + stream = event_store.stream( + category="concurrent", + stream=stream_id + ) + + events = [ + NewEvent(name=f"event-{i}", payload={"i": i}) + for i in range(count) + ] + + await stream.publish(events=events) + + # Launch concurrent writes + start_time = time.time() + tasks = [ + write_to_stream(f"stream-{i}", 100) + for i in range(10) + ] + await asyncio.gather(*tasks) + duration = time.time() - start_time + + # Verify + for i in range(10): + stream = event_store.stream( + category="concurrent", + stream=f"stream-{i}" + ) + events = list(await stream.scan()) + assert len(events) == 100 + + print(f"Concurrent writes: {1000/duration:.2f} events/sec") +``` + +## Test Fixtures + +### Common Fixtures + +```python +@pytest.fixture +async def sample_events(): + """Generate sample events for testing.""" + return [ + (EventBuilder() + .with_name("user-registered") + .with_payload({"user_id": "123", "email": "user@example.com"}) + .build()), + (EventBuilder() + .with_name("user-verified") + .with_payload({"user_id": "123"}) + .build()), + (EventBuilder() + .with_name("subscription-created") + .with_payload({"user_id": "123", "plan": "premium"}) + .build()) + ] + +@pytest.fixture +async def populated_store(event_store, sample_events): + """Event store with pre-populated data.""" + stream = event_store.stream(category="users", stream="user-123") + await stream.publish(events=[ + NewEvent( + name=e.name, + payload=e.payload, + metadata=e.metadata + ) + for e in sample_events + ]) + return event_store + +@pytest.fixture +def mock_time(): + """Mock time for deterministic tests.""" + with patch('datetime.datetime') as mock_datetime: + mock_datetime.now.return_value = datetime(2024, 1, 1, 12, 0, 0) + mock_datetime.side_effect = lambda *args, **kw: datetime(*args, **kw) + yield mock_datetime +``` + +## Testing Best Practices + +### 1. Test Isolation + +```python +class TestWithIsolation: + @pytest.fixture(autouse=True) + async def setup_teardown(self): + """Ensure test isolation.""" + # Setup + self.test_id = str(uuid.uuid4()) + + yield + + # Teardown - clean up any test data + # This is automatic with in-memory adapter + # For persistent storage, implement cleanup + + def get_test_stream_id(self, suffix=""): + """Generate unique stream IDs for each test.""" + return f"test-{self.test_id}{suffix}" +``` + +### 2. 
Deterministic Tests + +```python +def test_deterministic_projection(): + """Use fixed data for deterministic results.""" + # Use builders with specific values + events = [ + (EventBuilder() + .with_id("fixed-id-1") + .with_occurred_at(datetime(2024, 1, 1)) + .with_position(0) + .build()), + (EventBuilder() + .with_id("fixed-id-2") + .with_occurred_at(datetime(2024, 1, 2)) + .with_position(1) + .build()) + ] + + # Test with known inputs and outputs + projector = MyProjector() + state = projector.initial_state_factory() + + for event in events: + state = projector.project_event(state, {}, event)[0] + + # Verify exact state + assert state == { + "expected": "exact_value", + "count": 2 + } +``` + +### 3. Property-Based Testing + +```python +from hypothesis import given, strategies as st + +class TestProperties: + @given( + events=st.lists( + st.builds( + lambda name, value: NewEvent( + name=name, + payload={"value": value} + ), + name=st.sampled_from(["add", "subtract", "multiply"]), + value=st.integers(min_value=1, max_value=100) + ), + min_size=1, + max_size=50 + ) + ) + @pytest.mark.asyncio + async def test_event_ordering_preserved(self, event_store, events): + """Events maintain order regardless of content.""" + stream = event_store.stream( + category="property", + stream="test" + ) + + # Publish events + await stream.publish(events=events) + + # Read back + read_events = list(await stream.scan()) + + # Verify ordering + assert len(read_events) == len(events) + for i, (original, read) in enumerate(zip(events, read_events)): + assert read.name == original.name + assert read.payload == original.payload + assert read.position == i +``` + +## Next Steps + +- See [API Reference](./api-reference.md) for testing API details +- Review [Best Practices](./best-practices.md) for production testing +- Explore [Examples](./examples.md) for real-world test scenarios \ No newline at end of file diff --git a/documentations/transactions.md b/documentations/transactions.md new file mode 100644 index 00000000..1bcbde53 --- /dev/null +++ b/documentations/transactions.md @@ -0,0 +1,653 @@ +# Transactions + +## Overview + +The event store library provides comprehensive transaction support for coordinating complex operations, handling errors gracefully, and ensuring consistency. This document covers transaction patterns, error handling strategies, and best practices for building reliable event-sourced systems. 
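

To see what a transaction buys you, consider the failure window when two related writes are made as separate appends. This is a sketch of the problem only; `store` is an `EventStore` as elsewhere in these docs, and the account streams mirror the example in the next section:

```python
# WITHOUT a transaction: two independent appends
source = store.stream(category="accounts", stream="account-123")
target = store.stream(category="accounts", stream="account-456")

await source.publish(events=[
    NewEvent(name="money-withdrawn", payload={"amount": 100.00})
])
# If the process crashes here, the withdrawal is recorded but the
# matching deposit never is, and the two streams no longer balance.
await target.publish(events=[
    NewEvent(name="money-deposited", payload={"amount": 100.00})
])
```

Transactions close this window by making the two appends succeed or fail together.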
+ +## Transaction Architecture + +```mermaid +graph TD + T[Transaction] --> C[Context Manager] + T --> R[Retry Logic] + T --> E[Error Handling] + + C --> B[Begin] + C --> CM[Commit] + C --> RB[Rollback] + + R --> EX[Exponential Backoff] + R --> MR[Max Retries] + + E --> CE[Condition Errors] + E --> TE[Transient Errors] + E --> PE[Permanent Errors] + + T --> ES[Event Store] + ES --> S[Success/Failure] +``` + +## Basic Transactions + +### Using Transaction Context + +```python +from logicblocks.event.store import event_store_transaction +from logicblocks.event.types import NewEvent + +# Basic transaction +async with event_store_transaction(store) as tx: + stream1 = tx.stream(category="accounts", stream="account-123") + stream2 = tx.stream(category="accounts", stream="account-456") + + # All operations are atomic + await stream1.publish(events=[ + NewEvent( + name="money-withdrawn", + payload={"amount": 100.00} + ) + ]) + + await stream2.publish(events=[ + NewEvent( + name="money-deposited", + payload={"amount": 100.00} + ) + ]) + + # Automatically commits on success + # Automatically rolls back on error +``` + +### Manual Transaction Control + +```python +# Manual transaction management +tx = await store.begin_transaction() + +try: + # Perform operations + await tx.stream("users", "user-123").publish(events=[...]) + await tx.stream("logs", "audit").publish(events=[...]) + + # Commit if successful + await tx.commit() +except Exception as e: + # Rollback on error + await tx.rollback() + raise +``` + +## Error Handling + +### Write Condition Errors + +Handle optimistic concurrency control: + +```python +from logicblocks.event.store import UnmetWriteConditionError +from logicblocks.event.store.conditions import stream_empty, stream_not_empty + +try: + # Ensure stream exists before appending + await stream.publish( + events=[NewEvent(name="update", payload={})], + expected_version=5 # Specific version expected + ) +except UnmetWriteConditionError as e: + # Handle version mismatch + print(f"Stream version mismatch: {e}") + + # Retry with updated expectations + current_version = await stream.version() + await stream.publish( + events=[NewEvent(name="update", payload={})], + expected_version=current_version + ) +``` + +### Retry Strategies + +```python +from logicblocks.event.store import ( + retry_on_error, + retry_on_unmet_condition_error +) + +# Retry on any error with exponential backoff +@retry_on_error(max_attempts=3, backoff_factor=2.0) +async def process_with_retry(): + stream = store.stream(category="orders", stream="order-123") + await stream.publish(events=[ + NewEvent(name="order-processed", payload={}) + ]) + +# Retry only on condition errors +@retry_on_unmet_condition_error(max_attempts=5) +async def create_if_not_exists(): + stream = store.stream(category="users", stream="user-456") + + # Try to create only if stream is empty + await stream.publish( + events=[NewEvent(name="user-created", payload={})], + write_conditions=[stream_empty()] + ) +``` + +### Ignore Strategies + +```python +from logicblocks.event.store import ( + ignore_on_error, + ignore_on_unmet_condition_error +) + +# Ignore all errors (use carefully!) 
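
# Reserve this for best-effort side effects, such as optional audit or
# metrics events, where losing the write is acceptable.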
+@ignore_on_error() +async def optional_logging(): + audit_stream = store.stream(category="audit", stream="optional") + await audit_stream.publish(events=[ + NewEvent(name="action-logged", payload={}) + ]) + +# Ignore only condition errors +@ignore_on_unmet_condition_error() +async def create_once(): + stream = store.stream(category="config", stream="settings") + await stream.publish( + events=[NewEvent(name="initialized", payload={})], + write_conditions=[stream_empty()] + ) +``` + +## Transaction Patterns + +### 1. Saga Pattern + +Coordinate multi-step processes: + +```python +class OrderSaga: + def __init__(self, store): + self.store = store + self.saga_id = str(uuid.uuid4()) + + async def execute(self, order_data): + """Execute order saga with compensation.""" + completed_steps = [] + + try: + # Step 1: Reserve inventory + await self._reserve_inventory(order_data) + completed_steps.append("inventory_reserved") + + # Step 2: Charge payment + await self._charge_payment(order_data) + completed_steps.append("payment_charged") + + # Step 3: Create shipment + await self._create_shipment(order_data) + completed_steps.append("shipment_created") + + # Step 4: Confirm order + await self._confirm_order(order_data) + + except Exception as e: + # Compensate in reverse order + await self._compensate(completed_steps, order_data) + raise + + async def _compensate(self, steps, order_data): + """Compensate completed steps.""" + if "shipment_created" in steps: + await self._cancel_shipment(order_data) + + if "payment_charged" in steps: + await self._refund_payment(order_data) + + if "inventory_reserved" in steps: + await self._release_inventory(order_data) + + async def _reserve_inventory(self, order_data): + stream = self.store.stream( + category="inventory", + stream=order_data["product_id"] + ) + await stream.publish(events=[ + NewEvent( + name="inventory-reserved", + payload={ + "saga_id": self.saga_id, + "quantity": order_data["quantity"] + } + ) + ]) +``` + +### 2. 
Process Manager Pattern + +Orchestrate complex workflows: + +```python +class OrderProcessManager: + def __init__(self, store, projection_store): + self.store = store + self.projection_store = projection_store + + async def handle_event(self, event): + """Route events to appropriate handlers.""" + handlers = { + "order-placed": self._handle_order_placed, + "payment-received": self._handle_payment_received, + "inventory-checked": self._handle_inventory_checked, + "shipment-ready": self._handle_shipment_ready + } + + handler = handlers.get(event.name) + if handler: + await handler(event) + + async def _handle_order_placed(self, event): + order_id = event.payload["order_id"] + + # Start payment process + payment_stream = self.store.stream( + category="payments", + stream=f"payment-{order_id}" + ) + + await payment_stream.publish(events=[ + NewEvent( + name="payment-requested", + payload={ + "order_id": order_id, + "amount": event.payload["total"] + } + ) + ]) + + # Check inventory + for item in event.payload["items"]: + inventory_stream = self.store.stream( + category="inventory", + stream=item["product_id"] + ) + + await inventory_stream.publish(events=[ + NewEvent( + name="inventory-check-requested", + payload={ + "order_id": order_id, + "product_id": item["product_id"], + "quantity": item["quantity"] + } + ) + ]) + + async def _handle_payment_received(self, event): + # Update order status + order_id = event.payload["order_id"] + order_stream = self.store.stream( + category="orders", + stream=order_id + ) + + await order_stream.publish(events=[ + NewEvent( + name="order-paid", + payload={"payment_id": event.payload["payment_id"]} + ) + ]) +``` + +### 3. Event Sourcing with Snapshots + +Optimize projection rebuilding: + +```python +class SnapshotTransaction: + def __init__(self, store, projection_store): + self.store = store + self.projection_store = projection_store + + async def update_with_snapshot(self, stream_id, projector): + """Update projection with periodic snapshots.""" + + # Get last snapshot + snapshot = await self.projection_store.get( + f"snapshot:{stream_id}" + ) + + if snapshot: + # Continue from snapshot + last_position = snapshot.metadata["last_position"] + state = snapshot.state + metadata = snapshot.metadata + else: + # Start from beginning + last_position = -1 + state = projector.initial_state_factory() + metadata = projector.initial_metadata_factory() + + # Get events since snapshot + stream = self.store.stream(stream=stream_id) + events = stream.scan(from_position=last_position + 1) + + # Project new events + event_count = 0 + async for event in events: + state, metadata = projector.project_event( + state, metadata, event + ) + event_count += 1 + last_position = event.position + + # Save new snapshot if threshold reached + if event_count > 100: # Snapshot every 100 events + snapshot = Projection( + id=f"snapshot:{stream_id}", + state=state, + metadata={ + **metadata, + "last_position": last_position, + "snapshot_time": datetime.now() + } + ) + await self.projection_store.save(snapshot) + + # Save current projection + projection = Projection( + id=f"current:{stream_id}", + state=state, + metadata=metadata + ) + await self.projection_store.save(projection) +``` + +## Advanced Transaction Features + +### Distributed Transactions + +Coordinate across multiple stores: + +```python +class DistributedTransaction: + def __init__(self, stores): + self.stores = stores # Dict of name -> store + self.operations = [] + + async def __aenter__(self): + self.transactions = {} + for 
name, store in self.stores.items(): + self.transactions[name] = await store.begin_transaction() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if exc_type is None: + # Commit all + for tx in self.transactions.values(): + await tx.commit() + else: + # Rollback all + for tx in self.transactions.values(): + await tx.rollback() + + def get_store(self, name): + return self.transactions[name] + +# Usage +async with DistributedTransaction({ + "events": event_store, + "projections": projection_store +}) as dt: + # Operations across stores + events_tx = dt.get_store("events") + projections_tx = dt.get_store("projections") + + # Coordinated updates + await events_tx.stream("users", "user-123").publish(...) + await projections_tx.save(projection) +``` + +### Conditional Transactions + +Execute based on conditions: + +```python +from logicblocks.event.store.conditions import ( + stream_exists, + position_is, + any_condition, + all_conditions +) + +class ConditionalTransaction: + async def transfer_if_sufficient_balance( + self, + from_account: str, + to_account: str, + amount: float + ): + async with event_store_transaction(self.store) as tx: + # Check balance projection + from_projection = await self.projection_store.get( + f"balance:{from_account}" + ) + + if from_projection.state["balance"] < amount: + raise InsufficientBalanceError() + + # Execute transfer with conditions + from_stream = tx.stream( + category="accounts", + stream=from_account + ) + to_stream = tx.stream( + category="accounts", + stream=to_account + ) + + # Ensure both accounts exist + await from_stream.publish( + events=[ + NewEvent( + name="money-withdrawn", + payload={"amount": amount} + ) + ], + write_conditions=[stream_exists()] + ) + + await to_stream.publish( + events=[ + NewEvent( + name="money-deposited", + payload={"amount": amount} + ) + ], + write_conditions=[stream_exists()] + ) +``` + +## Performance Optimization + +### Batch Operations + +```python +class BatchProcessor: + def __init__(self, store, batch_size=100): + self.store = store + self.batch_size = batch_size + + async def process_batch(self, items): + """Process items in batches for better performance.""" + for i in range(0, len(items), self.batch_size): + batch = items[i:i + self.batch_size] + + async with event_store_transaction(self.store) as tx: + # Process batch in single transaction + tasks = [] + for item in batch: + stream = tx.stream( + category=item["category"], + stream=item["stream"] + ) + task = stream.publish(events=item["events"]) + tasks.append(task) + + # Execute concurrently within transaction + await asyncio.gather(*tasks) +``` + +### Connection Pooling + +```python +class PooledTransactionManager: + def __init__(self, connection_pool): + self.pool = connection_pool + + async def execute_with_connection(self, operation): + """Execute operation with pooled connection.""" + async with self.pool.acquire() as connection: + adapter = PostgresEventStorageAdapter( + connection_source=lambda: connection + ) + store = EventStore(adapter) + + async with event_store_transaction(store) as tx: + return await operation(tx) +``` + +## Testing Transactions + +```python +import pytest +from unittest.mock import AsyncMock + +class TestTransactions: + @pytest.mark.asyncio + async def test_transaction_rollback(self, event_store): + """Test transaction rollback on error.""" + stream = event_store.stream( + category="test", + stream="rollback-test" + ) + + # Get initial state + initial_events = list(await stream.scan()) + + try: + 
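# The publish below happens inside the transaction context, so it
            # must disappear again when the forced error triggers a rollback.
            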
async with event_store_transaction(event_store) as tx: + test_stream = tx.stream( + category="test", + stream="rollback-test" + ) + + await test_stream.publish(events=[ + NewEvent(name="test-event", payload={}) + ]) + + # Force error + raise Exception("Test error") + except Exception: + pass + + # Verify rollback + final_events = list(await stream.scan()) + assert len(final_events) == len(initial_events) + + @pytest.mark.asyncio + async def test_retry_logic(self): + """Test retry on transient errors.""" + mock_operation = AsyncMock() + mock_operation.side_effect = [ + Exception("Transient error"), + Exception("Transient error"), + "Success" + ] + + @retry_on_error(max_attempts=3) + async def operation_with_retry(): + return await mock_operation() + + result = await operation_with_retry() + assert result == "Success" + assert mock_operation.call_count == 3 +``` + +## Best Practices + +### 1. Keep Transactions Small + +```python +# GOOD: Small, focused transactions +async with event_store_transaction(store) as tx: + stream = tx.stream(category="orders", stream=order_id) + await stream.publish(events=[ + NewEvent(name="order-confirmed", payload={}) + ]) + +# BAD: Large, complex transactions +async with event_store_transaction(store) as tx: + # Too many operations in one transaction + for order in large_order_list: + # Process hundreds of orders... +``` + +### 2. Use Appropriate Isolation + +```python +# Read committed for most cases +async with event_store_transaction( + store, + isolation_level="read_committed" +) as tx: + # Standard operations + pass + +# Serializable for critical operations +async with event_store_transaction( + store, + isolation_level="serializable" +) as tx: + # Critical financial operations + pass +``` + +### 3. Handle Errors Gracefully + +```python +async def safe_operation(store): + """Operation with comprehensive error handling.""" + max_retries = 3 + retry_count = 0 + + while retry_count < max_retries: + try: + async with event_store_transaction(store) as tx: + # Perform operations + return await do_work(tx) + + except UnmetWriteConditionError as e: + # Handle concurrency conflicts + retry_count += 1 + if retry_count >= max_retries: + raise + await asyncio.sleep(0.1 * retry_count) + + except ConnectionError as e: + # Handle connection issues + logger.error(f"Connection error: {e}") + raise + + except Exception as e: + # Handle unexpected errors + logger.error(f"Unexpected error: {e}") + raise +``` + +## Next Steps + +- Explore [Testing](./testing.md) for transaction testing strategies +- See [API Reference](./api-reference.md) for complete transaction API +- Learn about [Best Practices](./best-practices.md) for production systems \ No newline at end of file