Skip to content
Closed
93 changes: 93 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,99 @@ Orchestrations can start child orchestrations using the `call_sub_orchestrator`

Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing.

### Durable entities

Durable entities are stateful objects that can maintain state across multiple operations. Entities support operations that can read and modify the entity's state. Each entity has a unique entity ID and maintains its state independently.

The Python SDK supports both function-based and class-based entity implementations:

#### Function-based entities (simple)

```python
# Define an entity function
def counter_entity(ctx: task.EntityContext, input):
if ctx.operation_name == "increment":
current_count = ctx.get_state() or 0
new_count = current_count + (input or 1)
ctx.set_state(new_count)
return new_count
elif ctx.operation_name == "get":
return ctx.get_state() or 0

# Register the entity with the worker
worker._registry.add_named_entity("Counter", counter_entity)
```

#### Class-based entities (advanced)

```python
import durabletask as dt

class CounterEntity(dt.EntityBase):
def increment(self, value: int = 1) -> int:
current = self.get_state() or 0
new_value = current + value
self.set_state(new_value)
return new_value

def get(self) -> int:
return self.get_state() or 0

def reset(self) -> int:
self.set_state(0)
return 0

# Register class-based entity
worker._registry.add_named_entity("Counter", CounterEntity)
```

#### Client operations with structured IDs

```python
# Use structured entity IDs (recommended)
counter_id = dt.EntityInstanceId("Counter", "my-counter")

# Signal an entity from an orchestrator
yield ctx.signal_entity(counter_id, "increment", input=5)

# Or signal an entity directly from a client
client.signal_entity(counter_id, "increment", input=10)

# Query entity state
entity_state = client.get_entity(counter_id, include_state=True)
if entity_state and entity_state.exists:
print(f"Current count: {entity_state.serialized_state}")

# Query multiple entities
query = dt.EntityQuery(instance_id_starts_with="Counter@", include_state=True)
results = client.query_entities(query)
```

#### Entity-to-entity communication

Entities can signal other entities and start orchestrations:

```python
class NotificationEntity(dt.EntityBase):
def send_notification(self, data):
# Process notification
notifications = self.get_state() or {"count": 0}
notifications["count"] += 1
self.set_state(notifications)

# Signal another entity
counter_id = dt.EntityInstanceId("Counter", f"user-{data['user_id']}")
self.signal_entity(counter_id, "increment")

# Start an orchestration
return self.start_new_orchestration("process_notification", input=data)
```

You can find comprehensive examples in:
- [Basic entities](./examples/durable_entities.py)
- [Class-based entities](./examples/class_based_entities.py)
- [Complete guide](./docs/entities.md)

### Continue-as-new (TODO)

Orchestrations can be continued as new using the `continue_as_new` API. This API allows an orchestration to restart itself from scratch, optionally with a new input.
Expand Down
285 changes: 285 additions & 0 deletions docs/entities.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
# Durable Entities Guide

This guide covers the comprehensive durable entities support in the Python SDK, bringing feature parity with other Durable Task SDKs.

## What are Durable Entities?

Durable entities are stateful objects that can maintain state across multiple operations. Each entity has a unique entity ID and can handle various operations that read and modify its state. Entities are accessed using the format `EntityType@EntityKey` (e.g., `Counter@user1`).

## Key Features

### Entity Functions (Basic Implementation)

Register entity functions that handle operations and maintain state:

```python
import durabletask as dt

def counter_entity(ctx: dt.EntityContext, input):
if ctx.operation_name == "increment":
current_count = ctx.get_state() or 0
new_count = current_count + (input or 1)
ctx.set_state(new_count)
return new_count
elif ctx.operation_name == "get":
return ctx.get_state() or 0

# Register with worker
worker = TaskHubGrpcWorker()
worker._registry.add_named_entity("Counter", counter_entity)
```

### Class-Based Entities (Advanced Implementation)

For more complex entities, use the `EntityBase` class with method-based dispatch:

```python
import durabletask as dt

class CounterEntity(dt.EntityBase):
def __init__(self):
super().__init__()
self._state = 0

def increment(self, value: int = 1) -> int:
"""Increment the counter by the specified value."""
current = self.get_state() or 0
new_value = current + value
self.set_state(new_value)
return new_value

def get(self) -> int:
"""Get the current counter value."""
return self.get_state() or 0

def reset(self) -> int:
"""Reset the counter to zero."""
self.set_state(0)
return 0

# Register class-based entity
worker._registry.add_named_entity("Counter", CounterEntity)
```

### Client Operations

Signal entities, query state, and manage entity storage:

```python
# Create client
client = TaskHubGrpcClient()

# Signal an entity using string ID
client.signal_entity("Counter@my-counter", "increment", input=5)

# Signal an entity using structured ID (recommended)
counter_id = dt.EntityInstanceId("Counter", "my-counter")
client.signal_entity(counter_id, "increment", input=5)

# Query entity state
entity_state = client.get_entity(counter_id, include_state=True)
if entity_state and entity_state.exists:
print(f"Counter value: {entity_state.serialized_state}")

# Query multiple entities
query = dt.EntityQuery(instance_id_starts_with="Counter@", include_state=True)
results = client.query_entities(query)
print(f"Found {len(results.entities)} counter entities")

# Clean entity storage
removed, released, token = client.clean_entity_storage()
```

### Orchestration Integration

Signal entities from orchestrations:

```python
def my_orchestrator(ctx: dt.OrchestrationContext, input):
# Signal entities (fire-and-forget)
counter_id = dt.EntityInstanceId("Counter", "global")
yield ctx.signal_entity(counter_id, "increment", input=5)

cart_id = dt.EntityInstanceId("ShoppingCart", "user1")
yield ctx.signal_entity(cart_id, "add_item",
input={"name": "Apple", "price": 1.50})
return "Entity operations completed"
```

### Entity-to-Entity Communication

Entities can signal other entities and start orchestrations:

```python
class NotificationEntity(dt.EntityBase):
def send_notification(self, data):
user_id = data["user_id"]
message = data["message"]

# Store notification
notifications = self.get_state() or {"notifications": []}
notifications["notifications"].append({
"user_id": user_id,
"message": message,
"timestamp": datetime.utcnow().isoformat()
})
self.set_state(notifications)

# Signal user's notification counter
counter_id = dt.EntityInstanceId("Counter", f"notifications-{user_id}")
self.signal_entity(counter_id, "increment", input=1)

# Start a notification processing workflow
workflow_id = self.start_new_orchestration(
"process_notification",
input={"user_id": user_id, "message": message}
)

return workflow_id
```

## Entity ID Structure

Use `EntityInstanceId` for type-safe entity references:

```python
# Create structured entity ID
entity_id = dt.EntityInstanceId("Counter", "user123")
print(entity_id.name) # "Counter"
print(entity_id.key) # "user123"
print(str(entity_id)) # "Counter@user123"

# Parse from string
parsed_id = dt.EntityInstanceId.from_string("ShoppingCart@cart1")
```

## Error Handling

Handle entity operation failures with specialized exceptions:

```python
try:
client.signal_entity("NonExistent@entity", "operation")
except dt.EntityOperationFailedException as ex:
print(f"Entity operation failed: {ex.failure_details.message}")
print(f"Failed entity: {ex.entity_id}")
print(f"Failed operation: {ex.operation_name}")
```

## Entity Context Features

The `EntityContext` provides rich functionality:

```python
def advanced_entity(ctx: dt.EntityContext, input):
# Access entity information
print(f"Entity ID: {ctx.instance_id}")
print(f"Entity name: {ctx.entity_id.name}")
print(f"Entity key: {ctx.entity_id.key}")
print(f"Operation: {ctx.operation_name}")
print(f"Is new: {ctx.is_new_entity}")

# State management
current_state = ctx.get_state()
ctx.set_state({"updated": True, "input": input})

# Signal other entities
ctx.signal_entity("Logger@system", "log",
input=f"Operation {ctx.operation_name} executed")

# Start orchestrations
workflow_id = ctx.start_new_orchestration("cleanup_workflow")

return {"workflow_id": workflow_id}
```

## Best Practices

### 1. Use Structured Entity IDs

```python
# ✅ Good - Type-safe and clear
counter_id = dt.EntityInstanceId("Counter", "user123")
client.signal_entity(counter_id, "increment")

# ❌ Avoid - Error-prone string concatenation
client.signal_entity("Counter@user123", "increment")
```

### 2. Implement Rich Entity Classes

```python
# ✅ Good - Clear separation of concerns
class ShoppingCartEntity(dt.EntityBase):
def add_item(self, item: dict) -> int:
# Validation
if not item.get("name") or not item.get("price"):
raise ValueError("Item must have name and price")

# Business logic
cart = self.get_state() or {"items": []}
cart["items"].append(item)
self.set_state(cart)

return len(cart["items"])

def get_total(self) -> float:
cart = self.get_state() or {"items": []}
return sum(item["price"] for item in cart["items"])
```

### 3. Handle State Initialization

```python
class StatefulEntity(dt.EntityBase):
def __init__(self):
super().__init__()
# Set default state structure
self._state = {"initialized": True, "value": 0}

def ensure_initialized(self):
if not self.get_state():
self.set_state({"initialized": True, "value": 0})
```

### 4. Use Type Hints

```python
from typing import Dict, List, Optional

class TypedEntity(dt.EntityBase):
def process_order(self, order_data: Dict[str, any]) -> str:
"""Process an order and return order ID."""
order_id = f"order-{len(self.get_orders())}"
self.add_order(order_data)
return order_id

def get_orders(self) -> List[Dict]:
"""Get all orders."""
state = self.get_state() or {"orders": []}
return state["orders"]
```

## Examples

- **Basic entities**: See [`examples/durable_entities.py`](examples/durable_entities.py)
- **Class-based entities**: See [`examples/class_based_entities.py`](examples/class_based_entities.py)

## Comparison with .NET Implementation

This Python implementation provides feature parity with the .NET DurableTask SDK:

| Feature | .NET | Python | Status |
|---------|------|--------|--------|
| Function-based entities | ✅ | ✅ | Complete |
| Class-based entities | ✅ | ✅ | Complete |
| Method dispatch | ✅ | ✅ | Complete |
| Structured entity IDs | ✅ | ✅ | Complete |
| Entity-to-entity signals | ✅ | ✅ | Complete |
| Orchestration starting | ✅ | ✅ | Complete |
| State management | ✅ | ✅ | Complete |
| Error handling | ✅ | ✅ | Complete |
| Client operations | ✅ | ✅ | Complete |
| Entity locking | ✅ | ⏳ | Planned |

The Python implementation follows the same patterns and provides equivalent functionality to ensure consistency across Durable Task SDKs.
Loading
Loading