Skip to content

Commit 5fd2a0e

Browse files
committed
Add new_uuid method to client
- Update entity messages to use UUIDs as requestIds
1 parent b4086fd commit 5fd2a0e

File tree

5 files changed

+116
-7
lines changed

5 files changed

+116
-7
lines changed

durabletask/internal/helpers.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,14 @@ def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str],
196196
))
197197

198198

199-
def new_call_entity_action(id: int, parent_instance_id: str, entity_id: EntityInstanceId, operation: str, encoded_input: Optional[str]):
199+
def new_call_entity_action(id: int,
200+
parent_instance_id: str,
201+
entity_id: EntityInstanceId,
202+
operation: str,
203+
encoded_input: Optional[str],
204+
request_id: str) -> pb.OrchestratorAction:
200205
return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationCalled=pb.EntityOperationCalledEvent(
201-
requestId=f"{parent_instance_id}:{id}",
206+
requestId=request_id,
202207
operation=operation,
203208
scheduledTime=None,
204209
input=get_string_value(encoded_input),
@@ -208,9 +213,13 @@ def new_call_entity_action(id: int, parent_instance_id: str, entity_id: EntityIn
208213
)))
209214

210215

211-
def new_signal_entity_action(id: int, entity_id: EntityInstanceId, operation: str, encoded_input: Optional[str]):
216+
def new_signal_entity_action(id: int,
217+
entity_id: EntityInstanceId,
218+
operation: str,
219+
encoded_input: Optional[str],
220+
request_id: str) -> pb.OrchestratorAction:
212221
return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationSignaled=pb.EntityOperationSignaledEvent(
213-
requestId=f"{entity_id}:{id}",
222+
requestId=request_id,
214223
operation=operation,
215224
scheduledTime=None,
216225
input=get_string_value(encoded_input),

durabletask/task.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,22 @@ def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None:
258258
"""
259259
pass
260260

261+
@abstractmethod
262+
def new_uuid(self) -> str:
263+
"""Create a new UUID that is safe for replay within an orchestration or operation.
264+
265+
The default implementation of this method creates a name-based UUID
266+
using the algorithm from RFC 4122 §4.3. The name input used to generate
267+
this value is a combination of the orchestration instance ID and an
268+
internally managed sequence number.
269+
270+
Returns
271+
-------
272+
str
273+
New UUID that is safe for replay within an orchestration or operation.
274+
"""
275+
pass
276+
261277
@abstractmethod
262278
def _exit_critical_section(self) -> None:
263279
pass

durabletask/worker.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from types import GeneratorType
1414
from enum import Enum
1515
from typing import Any, Generator, Optional, Sequence, TypeVar, Union
16+
import uuid
1617
from packaging.version import InvalidVersion, parse
1718

1819
import grpc
@@ -33,6 +34,7 @@
3334

3435
TInput = TypeVar("TInput")
3536
TOutput = TypeVar("TOutput")
37+
DATETIME_STRING_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
3638

3739

3840
class ConcurrencyOptions:
@@ -831,6 +833,7 @@ def __init__(self, instance_id: str, registry: _Registry):
831833
# Maps criticalSectionId to task ID
832834
self._entity_lock_id_map: dict[str, int] = {}
833835
self._sequence_number = 0
836+
self._new_uuid_counter = 0
834837
self._current_utc_datetime = datetime(1000, 1, 1)
835838
self._instance_id = instance_id
836839
self._registry = registry
@@ -1165,7 +1168,7 @@ def call_entity_function_helper(
11651168
raise RuntimeError(error_message)
11661169

11671170
encoded_input = shared.to_json(input) if input is not None else None
1168-
action = ph.new_call_entity_action(id, self.instance_id, entity_id, operation, encoded_input)
1171+
action = ph.new_call_entity_action(id, self.instance_id, entity_id, operation, encoded_input, self.new_uuid())
11691172
self._pending_actions[id] = action
11701173

11711174
fn_task = task.CompletableTask()
@@ -1188,7 +1191,7 @@ def signal_entity_function_helper(
11881191

11891192
encoded_input = shared.to_json(input) if input is not None else None
11901193

1191-
action = ph.new_signal_entity_action(id, entity_id, operation, encoded_input)
1194+
action = ph.new_signal_entity_action(id, entity_id, operation, encoded_input, self.new_uuid())
11921195
self._pending_actions[id] = action
11931196

11941197
def lock_entities_function_helper(self, id: int, entities: list[EntityInstanceId]) -> None:
@@ -1199,7 +1202,7 @@ def lock_entities_function_helper(self, id: int, entities: list[EntityInstanceId
11991202
if not transition_valid:
12001203
raise RuntimeError(error_message)
12011204

1202-
critical_section_id = f"{self.instance_id}:{id:04x}"
1205+
critical_section_id = self.new_uuid()
12031206

12041207
request, target = self._entity_context.emit_acquire_message(critical_section_id, entities)
12051208

@@ -1251,6 +1254,17 @@ def continue_as_new(self, new_input, *, save_events: bool = False) -> None:
12511254

12521255
self.set_continued_as_new(new_input, save_events)
12531256

1257+
def new_uuid(self) -> str:
1258+
URL_NAMESPACE: str = "9e952958-5e33-4daf-827f-2fa12937b875"
1259+
1260+
uuid_name_value = \
1261+
f"{self._instance_id}" \
1262+
f"_{self.current_utc_datetime.strftime(DATETIME_STRING_FORMAT)}" \
1263+
f"_{self._new_uuid_counter}"
1264+
self._new_uuid_counter += 1
1265+
namespace_uuid = uuid.uuid5(uuid.NAMESPACE_OID, URL_NAMESPACE)
1266+
return str(uuid.uuid5(namespace_uuid, uuid_name_value))
1267+
12541268

12551269
class ExecutionResults:
12561270
actions: list[pb.OrchestratorAction]

tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,3 +532,39 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
532532
assert state.serialized_input is None
533533
assert state.serialized_output is None
534534
assert state.serialized_custom_status == "\"foobaz\""
535+
536+
537+
def test_new_uuid():
538+
def noop(_: task.ActivityContext, _):
539+
pass
540+
541+
def empty_orchestrator(ctx: task.OrchestrationContext, _):
542+
# Assert that two new_uuid calls return different values
543+
results = [ctx.new_uuid(), ctx.new_uuid()]
544+
yield ctx.call_activity("noop")
545+
# Assert that new_uuid still returns a unique value after replay
546+
results.append(ctx.new_uuid())
547+
return results
548+
549+
# Start a worker, which will connect to the sidecar in a background thread
550+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
551+
taskhub=taskhub_name, token_credential=None) as w:
552+
w.add_orchestrator(empty_orchestrator)
553+
w.add_activity(noop)
554+
w.start()
555+
556+
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
557+
taskhub=taskhub_name, token_credential=None)
558+
id = c.schedule_new_orchestration(empty_orchestrator)
559+
state = c.wait_for_orchestration_completion(id, timeout=30)
560+
561+
assert state is not None
562+
assert state.name == task.get_name(empty_orchestrator)
563+
assert state.instance_id == id
564+
assert state.failure_details is None
565+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
566+
results = json.loads(state.serialized_output or "\"\"")
567+
assert isinstance(results, list) and len(results) == 3
568+
assert results[0] != results[1]
569+
assert results[0] != results[2]
570+
assert results[1] != results[2]

tests/durabletask/test_orchestration_e2e.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,3 +499,37 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
499499
assert state.serialized_input is None
500500
assert state.serialized_output is None
501501
assert state.serialized_custom_status == "\"foobaz\""
502+
503+
504+
def test_new_uuid():
505+
def noop(_: task.ActivityContext, _):
506+
pass
507+
508+
def empty_orchestrator(ctx: task.OrchestrationContext, _):
509+
# Assert that two new_uuid calls return different values
510+
results = [ctx.new_uuid(), ctx.new_uuid()]
511+
yield ctx.call_activity("noop")
512+
# Assert that new_uuid still returns a unique value after replay
513+
results.append(ctx.new_uuid())
514+
return results
515+
516+
# Start a worker, which will connect to the sidecar in a background thread
517+
with worker.TaskHubGrpcWorker() as w:
518+
w.add_orchestrator(empty_orchestrator)
519+
w.add_activity(noop)
520+
w.start()
521+
522+
c = client.TaskHubGrpcClient()
523+
id = c.schedule_new_orchestration(empty_orchestrator)
524+
state = c.wait_for_orchestration_completion(id, timeout=30)
525+
526+
assert state is not None
527+
assert state.name == task.get_name(empty_orchestrator)
528+
assert state.instance_id == id
529+
assert state.failure_details is None
530+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
531+
results = json.loads(state.serialized_output or "\"\"")
532+
assert isinstance(results, list) and len(results) == 3
533+
assert results[0] != results[1]
534+
assert results[0] != results[2]
535+
assert results[1] != results[2]

0 commit comments

Comments
 (0)