Skip to content

Commit 6d240c0

Browse files
Copilotberndverst
andcommitted
Add entity types, context, and client methods for durable entities
Co-authored-by: berndverst <[email protected]>
1 parent e4167b2 commit 6d240c0

File tree

4 files changed

+401
-1
lines changed

4 files changed

+401
-1
lines changed

durabletask/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
"""Durable Task SDK for Python"""
55

66
from durabletask.worker import ConcurrencyOptions
7+
from durabletask.task import EntityContext, EntityState, EntityQuery, EntityQueryResult
78

8-
__all__ = ["ConcurrencyOptions"]
9+
__all__ = ["ConcurrencyOptions", "EntityContext", "EntityState", "EntityQuery", "EntityQueryResult"]
910

1011
PACKAGE_NAME = "durabletask"

durabletask/client.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,3 +222,141 @@ def purge_orchestration(self, instance_id: str, recursive: bool = True):
222222
req = pb.PurgeInstancesRequest(instanceId=instance_id, recursive=recursive)
223223
self._logger.info(f"Purging instance '{instance_id}'.")
224224
self._stub.PurgeInstances(req)
225+
226+
def signal_entity(self, entity_id: str, operation_name: str, *,
227+
input: Optional[Any] = None,
228+
request_id: Optional[str] = None,
229+
scheduled_time: Optional[datetime] = None):
230+
"""Signal an entity with an operation.
231+
232+
Parameters
233+
----------
234+
entity_id : str
235+
The ID of the entity to signal.
236+
operation_name : str
237+
The name of the operation to perform.
238+
input : Optional[Any]
239+
The JSON-serializable input to pass to the entity operation.
240+
request_id : Optional[str]
241+
A unique request ID for the operation. If not provided, a random UUID will be used.
242+
scheduled_time : Optional[datetime]
243+
The time to schedule the operation. If not provided, the operation is scheduled immediately.
244+
"""
245+
req = pb.SignalEntityRequest(
246+
instanceId=entity_id,
247+
name=operation_name,
248+
input=wrappers_pb2.StringValue(value=shared.to_json(input)) if input is not None else None,
249+
requestId=request_id if request_id else uuid.uuid4().hex,
250+
scheduledTime=helpers.new_timestamp(scheduled_time) if scheduled_time else None)
251+
252+
self._logger.info(f"Signaling entity '{entity_id}' with operation '{operation_name}'.")
253+
self._stub.SignalEntity(req)
254+
255+
def get_entity(self, entity_id: str, *, include_state: bool = True) -> Optional[task.EntityState]:
256+
"""Get the state of an entity.
257+
258+
Parameters
259+
----------
260+
entity_id : str
261+
The ID of the entity to query.
262+
include_state : bool
263+
Whether to include the entity's state in the response.
264+
265+
Returns
266+
-------
267+
Optional[EntityState]
268+
The entity state if it exists, None otherwise.
269+
"""
270+
req = pb.GetEntityRequest(instanceId=entity_id, includeState=include_state)
271+
res: pb.GetEntityResponse = self._stub.GetEntity(req)
272+
273+
if not res.exists:
274+
return None
275+
276+
entity_metadata = res.entity
277+
return task.EntityState(
278+
instance_id=entity_metadata.instanceId,
279+
last_modified_time=entity_metadata.lastModifiedTime.ToDatetime(),
280+
backlog_queue_size=entity_metadata.backlogQueueSize,
281+
locked_by=entity_metadata.lockedBy.value if not helpers.is_empty(entity_metadata.lockedBy) else None,
282+
serialized_state=entity_metadata.serializedState.value if not helpers.is_empty(entity_metadata.serializedState) else None)
283+
284+
def query_entities(self, query: task.EntityQuery) -> task.EntityQueryResult:
285+
"""Query entities based on the provided criteria.
286+
287+
Parameters
288+
----------
289+
query : EntityQuery
290+
The query criteria for entities.
291+
292+
Returns
293+
-------
294+
EntityQueryResult
295+
The query result containing matching entities and continuation token.
296+
"""
297+
# Build the protobuf query
298+
pb_query = pb.EntityQuery(
299+
includeState=query.include_state,
300+
includeTransient=query.include_transient)
301+
302+
if query.instance_id_starts_with is not None:
303+
pb_query.instanceIdStartsWith = wrappers_pb2.StringValue(value=query.instance_id_starts_with)
304+
if query.last_modified_from is not None:
305+
pb_query.lastModifiedFrom = helpers.new_timestamp(query.last_modified_from)
306+
if query.last_modified_to is not None:
307+
pb_query.lastModifiedTo = helpers.new_timestamp(query.last_modified_to)
308+
if query.page_size is not None:
309+
pb_query.pageSize = wrappers_pb2.Int32Value(value=query.page_size)
310+
if query.continuation_token is not None:
311+
pb_query.continuationToken = wrappers_pb2.StringValue(value=query.continuation_token)
312+
313+
req = pb.QueryEntitiesRequest(query=pb_query)
314+
res: pb.QueryEntitiesResponse = self._stub.QueryEntities(req)
315+
316+
# Convert response to Python objects
317+
entities = []
318+
for entity_metadata in res.entities:
319+
entities.append(task.EntityState(
320+
instance_id=entity_metadata.instanceId,
321+
last_modified_time=entity_metadata.lastModifiedTime.ToDatetime(),
322+
backlog_queue_size=entity_metadata.backlogQueueSize,
323+
locked_by=entity_metadata.lockedBy.value if not helpers.is_empty(entity_metadata.lockedBy) else None,
324+
serialized_state=entity_metadata.serializedState.value if not helpers.is_empty(entity_metadata.serializedState) else None))
325+
326+
return task.EntityQueryResult(
327+
entities=entities,
328+
continuation_token=res.continuationToken.value if not helpers.is_empty(res.continuationToken) else None)
329+
330+
def clean_entity_storage(self, *,
331+
remove_empty_entities: bool = True,
332+
release_orphaned_locks: bool = True,
333+
continuation_token: Optional[str] = None) -> tuple[int, int, Optional[str]]:
334+
"""Clean up entity storage by removing empty entities and releasing orphaned locks.
335+
336+
Parameters
337+
----------
338+
remove_empty_entities : bool
339+
Whether to remove entities that have no state.
340+
release_orphaned_locks : bool
341+
Whether to release locks that are no longer held by active orchestrations.
342+
continuation_token : Optional[str]
343+
A continuation token from a previous cleanup operation.
344+
345+
Returns
346+
-------
347+
tuple[int, int, Optional[str]]
348+
A tuple containing (empty_entities_removed, orphaned_locks_released, continuation_token).
349+
"""
350+
req = pb.CleanEntityStorageRequest(
351+
removeEmptyEntities=remove_empty_entities,
352+
releaseOrphanedLocks=release_orphaned_locks)
353+
354+
if continuation_token is not None:
355+
req.continuationToken = wrappers_pb2.StringValue(value=continuation_token)
356+
357+
self._logger.info("Cleaning entity storage.")
358+
res: pb.CleanEntityStorageResponse = self._stub.CleanEntityStorage(req)
359+
360+
return (res.emptyEntitiesRemoved,
361+
res.orphanedLocksReleased,
362+
res.continuationToken.value if not helpers.is_empty(res.continuationToken) else None)

durabletask/task.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from abc import ABC, abstractmethod
99
from datetime import datetime, timedelta
1010
from typing import Any, Callable, Generator, Generic, Optional, TypeVar, Union
11+
from dataclasses import dataclass
1112

1213
import durabletask.internal.helpers as pbh
1314
import durabletask.internal.orchestrator_service_pb2 as pb
@@ -176,6 +177,51 @@ def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None:
176177
"""
177178
pass
178179

180+
@abstractmethod
181+
def signal_entity(self, entity_id: str, operation_name: str, *,
182+
input: Optional[Any] = None) -> Task:
183+
"""Signal an entity with an operation.
184+
185+
Parameters
186+
----------
187+
entity_id : str
188+
The ID of the entity to signal.
189+
operation_name : str
190+
The name of the operation to perform.
191+
input : Optional[Any]
192+
The JSON-serializable input to pass to the entity operation.
193+
194+
Returns
195+
-------
196+
Task
197+
A Durable Task that completes when the entity operation is scheduled.
198+
"""
199+
pass
200+
201+
@abstractmethod
202+
def call_entity(self, entity_id: str, operation_name: str, *,
203+
input: Optional[TInput] = None,
204+
retry_policy: Optional[RetryPolicy] = None) -> Task[TOutput]:
205+
"""Call an entity operation and wait for the result.
206+
207+
Parameters
208+
----------
209+
entity_id : str
210+
The ID of the entity to call.
211+
operation_name : str
212+
The name of the operation to perform.
213+
input : Optional[TInput]
214+
The JSON-serializable input to pass to the entity operation.
215+
retry_policy : Optional[RetryPolicy]
216+
The retry policy to use for this entity call.
217+
218+
Returns
219+
-------
220+
Task[TOutput]
221+
A Durable Task that completes when the entity operation completes or fails.
222+
"""
223+
pass
224+
179225

180226
class FailureDetails:
181227
def __init__(self, message: str, error_type: str, stack_trace: Optional[str]):
@@ -219,6 +265,40 @@ class OrchestrationStateError(Exception):
219265
pass
220266

221267

268+
@dataclass
269+
class EntityState:
270+
"""Represents the state of a durable entity."""
271+
instance_id: str
272+
last_modified_time: datetime
273+
backlog_queue_size: int
274+
locked_by: Optional[str]
275+
serialized_state: Optional[str]
276+
277+
@property
278+
def exists(self) -> bool:
279+
"""Returns True if the entity exists (has been created), False otherwise."""
280+
return self.serialized_state is not None
281+
282+
283+
@dataclass
284+
class EntityQuery:
285+
"""Represents a query for durable entities."""
286+
instance_id_starts_with: Optional[str] = None
287+
last_modified_from: Optional[datetime] = None
288+
last_modified_to: Optional[datetime] = None
289+
include_state: bool = False
290+
include_transient: bool = False
291+
page_size: Optional[int] = None
292+
continuation_token: Optional[str] = None
293+
294+
295+
@dataclass
296+
class EntityQueryResult:
297+
"""Represents the result of an entity query."""
298+
entities: list[EntityState]
299+
continuation_token: Optional[str] = None
300+
301+
222302
class Task(ABC, Generic[T]):
223303
"""Abstract base class for asynchronous tasks in a durable orchestration."""
224304
_result: T
@@ -433,12 +513,81 @@ def task_id(self) -> int:
433513
return self._task_id
434514

435515

516+
class EntityContext:
517+
def __init__(self, instance_id: str, operation_name: str, is_new_entity: bool = False):
518+
self._instance_id = instance_id
519+
self._operation_name = operation_name
520+
self._is_new_entity = is_new_entity
521+
self._state: Optional[Any] = None
522+
523+
@property
524+
def instance_id(self) -> str:
525+
"""Get the ID of the entity instance.
526+
527+
Returns
528+
-------
529+
str
530+
The ID of the current entity instance.
531+
"""
532+
return self._instance_id
533+
534+
@property
535+
def operation_name(self) -> str:
536+
"""Get the name of the operation being performed on the entity.
537+
538+
Returns
539+
-------
540+
str
541+
The name of the operation.
542+
"""
543+
return self._operation_name
544+
545+
@property
546+
def is_new_entity(self) -> bool:
547+
"""Get a value indicating whether this is a newly created entity.
548+
549+
Returns
550+
-------
551+
bool
552+
True if this is the first operation on this entity, False otherwise.
553+
"""
554+
return self._is_new_entity
555+
556+
def get_state(self, state_type: type[T] = None) -> Optional[T]:
557+
"""Get the current state of the entity.
558+
559+
Parameters
560+
----------
561+
state_type : type[T], optional
562+
The type to deserialize the state to. If not provided, returns the raw state.
563+
564+
Returns
565+
-------
566+
Optional[T]
567+
The current state of the entity, or None if the entity has no state.
568+
"""
569+
return self._state
570+
571+
def set_state(self, state: Any) -> None:
572+
"""Set the current state of the entity.
573+
574+
Parameters
575+
----------
576+
state : Any
577+
The new state for the entity. Must be JSON-serializable.
578+
"""
579+
self._state = state
580+
581+
436582
# Orchestrators are generators that yield tasks and receive/return any type
437583
Orchestrator = Callable[[OrchestrationContext, TInput], Union[Generator[Task, Any, Any], TOutput]]
438584

439585
# Activities are simple functions that can be scheduled by orchestrators
440586
Activity = Callable[[ActivityContext, TInput], TOutput]
441587

588+
# Entities are stateful objects that can receive operations and maintain state
589+
Entity = Callable[['EntityContext', TInput], TOutput]
590+
442591

443592
class RetryPolicy:
444593
"""Represents the retry policy for an orchestration or activity function."""

0 commit comments

Comments
 (0)