Skip to content

Commit 745237f

Browse files
Copilotberndverst
andcommitted
Complete durable entities implementation with examples and documentation
Co-authored-by: berndverst <[email protected]>
1 parent fcb040d commit 745237f

File tree

5 files changed

+222
-40
lines changed

5 files changed

+222
-40
lines changed

README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,38 @@ Orchestrations can start child orchestrations using the `call_sub_orchestrator`
117117

118118
Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing.
119119

120+
### Durable entities
121+
122+
Durable entities are stateful objects that can maintain state across multiple operations. Entities support operations that can read and modify the entity's state. Each entity has a unique entity ID and maintains its state independently.
123+
124+
```python
125+
# Define an entity function
126+
def counter_entity(ctx: task.EntityContext, input):
127+
if ctx.operation_name == "increment":
128+
current_count = ctx.get_state() or 0
129+
new_count = current_count + (input or 1)
130+
ctx.set_state(new_count)
131+
return new_count
132+
elif ctx.operation_name == "get":
133+
return ctx.get_state() or 0
134+
135+
# Register the entity with the worker
136+
worker.add_named_entity("Counter", counter_entity)
137+
138+
# Signal an entity from an orchestrator
139+
yield ctx.signal_entity("Counter@my-counter", "increment", input=5)
140+
141+
# Or signal an entity directly from a client
142+
client.signal_entity("Counter@my-counter", "increment", input=10)
143+
144+
# Query entity state
145+
entity_state = client.get_entity("Counter@my-counter", include_state=True)
146+
if entity_state and entity_state.exists:
147+
print(f"Current count: {entity_state.serialized_state}")
148+
```
149+
150+
You can find the full sample [here](./examples/durable_entities.py).
151+
120152
### Continue-as-new (TODO)
121153

122154
Orchestrations can be continued as new using the `continue_as_new` API. This API allows an orchestration to restart itself from scratch, optionally with a new input.

durabletask/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ def get_entity(self, entity_id: str, *, include_state: bool = True) -> Optional[
269269
"""
270270
req = pb.GetEntityRequest(instanceId=entity_id, includeState=include_state)
271271
res: pb.GetEntityResponse = self._stub.GetEntity(req)
272-
272+
273273
if not res.exists:
274274
return None
275275

@@ -357,6 +357,6 @@ def clean_entity_storage(self, *,
357357
self._logger.info("Cleaning entity storage.")
358358
res: pb.CleanEntityStorageResponse = self._stub.CleanEntityStorage(req)
359359

360-
return (res.emptyEntitiesRemoved,
360+
return (res.emptyEntitiesRemoved,
361361
res.orphanedLocksReleased,
362362
res.continuationToken.value if not helpers.is_empty(res.continuationToken) else None)

durabletask/worker.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -940,15 +940,15 @@ def signal_entity(self, entity_id: str, operation_name: str, *,
940940

941941
# Entity signals don't return values, so we create a completed task
942942
signal_task = task.CompletableTask()
943-
943+
944944
# Store the action to be executed
945945
task_id = self._next_task_id()
946946
self._pending_actions[task_id] = action
947947
self._pending_tasks[task_id] = signal_task
948-
948+
949949
# Mark as complete since signals don't have return values
950950
signal_task.complete(None)
951-
951+
952952
return signal_task
953953

954954
def call_entity(self, entity_id: str, operation_name: str, *,
@@ -1372,15 +1372,15 @@ def execute(self, req: pb.EntityBatchRequest) -> pb.EntityBatchResult:
13721372

13731373
# Parse current entity state
13741374
current_state = shared.from_json(req.entityState.value) if not ph.is_empty(req.entityState) else None
1375-
1375+
13761376
# Extract entity type from instance ID (format: entitytype@key)
13771377
entity_type = "Unknown"
13781378
if "@" in instance_id:
13791379
entity_type = instance_id.split("@")[0]
1380-
1380+
13811381
results = []
13821382
actions = []
1383-
1383+
13841384
for operation in req.operations:
13851385
try:
13861386
# Get the entity function using the entity type from instanceId
@@ -1413,12 +1413,12 @@ def execute(self, req: pb.EntityBatchRequest) -> pb.EntityBatchResult:
14131413
))
14141414
else:
14151415
result.success.CopyFrom(pb.OperationResultSuccess())
1416-
1416+
14171417
results.append(result)
14181418

14191419
except Exception as ex:
14201420
self._logger.exception(f"Error executing entity operation '{operation.operation}' on entity type '{entity_type}': {ex}")
1421-
1421+
14221422
# Create failure result
14231423
failure_details = ph.new_failure_details(ex)
14241424
result = pb.OperationResult()

examples/durable_entities.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT License.
3+
4+
"""
5+
Example demonstrating durable entities usage.
6+
7+
This example shows how to create and use durable entities with the Python SDK.
8+
Entities are stateful objects that can maintain state across multiple operations.
9+
"""
10+
11+
import durabletask.task as dt
12+
from durabletask.worker import TaskHubGrpcWorker
13+
import logging
14+
15+
16+
def counter_entity(ctx: dt.EntityContext, input) -> int:
17+
"""A simple counter entity that can increment, decrement, get, and reset."""
18+
19+
if ctx.operation_name == "increment":
20+
current_count = ctx.get_state() or 0
21+
increment_by = input or 1
22+
new_count = current_count + increment_by
23+
ctx.set_state(new_count)
24+
return new_count
25+
26+
elif ctx.operation_name == "decrement":
27+
current_count = ctx.get_state() or 0
28+
decrement_by = input or 1
29+
new_count = current_count - decrement_by
30+
ctx.set_state(new_count)
31+
return new_count
32+
33+
elif ctx.operation_name == "get":
34+
return ctx.get_state() or 0
35+
36+
elif ctx.operation_name == "reset":
37+
ctx.set_state(0)
38+
return 0
39+
40+
else:
41+
raise ValueError(f"Unknown operation: {ctx.operation_name}")
42+
43+
44+
def shopping_cart_entity(ctx: dt.EntityContext, input):
45+
"""A shopping cart entity that can add/remove items and calculate totals."""
46+
47+
if ctx.operation_name == "add_item":
48+
cart = ctx.get_state() or {"items": []}
49+
cart["items"].append(input)
50+
ctx.set_state(cart)
51+
return len(cart["items"])
52+
53+
elif ctx.operation_name == "remove_item":
54+
cart = ctx.get_state() or {"items": []}
55+
if input in cart["items"]:
56+
cart["items"].remove(input)
57+
ctx.set_state(cart)
58+
return len(cart["items"])
59+
60+
elif ctx.operation_name == "get_items":
61+
cart = ctx.get_state() or {"items": []}
62+
return cart["items"]
63+
64+
elif ctx.operation_name == "get_total":
65+
cart = ctx.get_state() or {"items": []}
66+
# Simple total calculation assuming each item has a 'price' field
67+
total = sum(item.get("price", 0) for item in cart["items"] if isinstance(item, dict))
68+
return total
69+
70+
elif ctx.operation_name == "clear":
71+
ctx.set_state({"items": []})
72+
return 0
73+
74+
else:
75+
raise ValueError(f"Unknown operation: {ctx.operation_name}")
76+
77+
78+
def entity_orchestrator(ctx: dt.OrchestrationContext, input):
79+
"""Orchestrator that demonstrates entity interactions."""
80+
81+
# Signal entities (fire-and-forget)
82+
yield ctx.signal_entity("Counter@global", "increment", input=5)
83+
yield ctx.signal_entity("Counter@user1", "increment", input=1)
84+
yield ctx.signal_entity("Counter@user2", "increment", input=2)
85+
86+
# Add items to shopping cart
87+
yield ctx.signal_entity("ShoppingCart@user1", "add_item",
88+
input={"name": "Apple", "price": 1.50})
89+
yield ctx.signal_entity("ShoppingCart@user1", "add_item",
90+
input={"name": "Banana", "price": 0.75})
91+
92+
return "Entity operations completed"
93+
94+
95+
def main():
96+
# Set up logging
97+
logging.basicConfig(level=logging.INFO)
98+
99+
# Create and configure the worker
100+
worker = TaskHubGrpcWorker()
101+
102+
# Register entities - entities should be registered by their intended name
103+
# Since entity execution extracts the entity type from the instance ID (e.g., "Counter@key1")
104+
# we need to register them with the exact name that will be used in instance IDs
105+
worker._registry.add_named_entity("Counter", counter_entity)
106+
worker._registry.add_named_entity("ShoppingCart", shopping_cart_entity)
107+
108+
# Register orchestrator
109+
worker.add_orchestrator(entity_orchestrator)
110+
111+
print("Entity worker example setup complete.")
112+
print("\nRegistered entities:")
113+
print("- Counter: supports increment, decrement, get, reset operations")
114+
print("- ShoppingCart: supports add_item, remove_item, get_items, get_total, clear operations")
115+
print("\nTo use entities, you would:")
116+
print("1. Start the worker: worker.start()")
117+
print("2. Use a client to signal entities or start orchestrations")
118+
print("3. Query entity state using client.get_entity()")
119+
120+
# Example client usage (commented out since it requires a running sidecar)
121+
"""
122+
# Create client
123+
client = TaskHubGrpcClient()
124+
125+
# Start an orchestration that uses entities
126+
instance_id = client.schedule_new_orchestration(entity_orchestrator)
127+
print(f"Started orchestration: {instance_id}")
128+
129+
# Signal entities directly
130+
client.signal_entity("Counter@test", "increment", input=10)
131+
client.signal_entity("Counter@test", "increment", input=5)
132+
133+
# Query entity state
134+
counter_state = client.get_entity("Counter@test", include_state=True)
135+
if counter_state:
136+
print(f"Counter state: {counter_state.serialized_state}")
137+
138+
# Query entities
139+
query = dt.EntityQuery(instance_id_starts_with="Counter@", include_state=True)
140+
results = client.query_entities(query)
141+
print(f"Found {len(results.entities)} counter entities")
142+
"""
143+
144+
145+
if __name__ == "__main__":
146+
main()

0 commit comments

Comments
 (0)