Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,14 @@ def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str],
))


def new_call_entity_action(id: int, parent_instance_id: str, entity_id: EntityInstanceId, operation: str, encoded_input: Optional[str]):
def new_call_entity_action(id: int,
parent_instance_id: str,
entity_id: EntityInstanceId,
operation: str,
encoded_input: Optional[str],
request_id: str) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationCalled=pb.EntityOperationCalledEvent(
requestId=f"{parent_instance_id}:{id}",
requestId=request_id,
operation=operation,
scheduledTime=None,
input=get_string_value(encoded_input),
Expand All @@ -208,9 +213,13 @@ def new_call_entity_action(id: int, parent_instance_id: str, entity_id: EntityIn
)))


def new_signal_entity_action(id: int, entity_id: EntityInstanceId, operation: str, encoded_input: Optional[str]):
def new_signal_entity_action(id: int,
entity_id: EntityInstanceId,
operation: str,
encoded_input: Optional[str],
request_id: str) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationSignaled=pb.EntityOperationSignaledEvent(
requestId=f"{entity_id}:{id}",
requestId=request_id,
operation=operation,
scheduledTime=None,
input=get_string_value(encoded_input),
Expand Down
16 changes: 16 additions & 0 deletions durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,22 @@ def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None:
"""
pass

@abstractmethod
def new_uuid(self) -> str:
"""Create a new UUID that is safe for replay within an orchestration or operation.

The default implementation of this method creates a name-based UUID
using the algorithm from RFC 4122 §4.3. The name input used to generate
this value is a combination of the orchestration instance ID, the current UTC datetime,
and an internally managed counter.

Returns
-------
str
New UUID that is safe for replay within an orchestration or operation.
"""
pass

@abstractmethod
def _exit_critical_section(self) -> None:
pass
Expand Down
20 changes: 17 additions & 3 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from types import GeneratorType
from enum import Enum
from typing import Any, Generator, Optional, Sequence, TypeVar, Union
import uuid
from packaging.version import InvalidVersion, parse

import grpc
Expand All @@ -33,6 +34,7 @@

TInput = TypeVar("TInput")
TOutput = TypeVar("TOutput")
DATETIME_STRING_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'


class ConcurrencyOptions:
Expand Down Expand Up @@ -831,6 +833,7 @@ def __init__(self, instance_id: str, registry: _Registry):
# Maps criticalSectionId to task ID
self._entity_lock_id_map: dict[str, int] = {}
self._sequence_number = 0
self._new_uuid_counter = 0
self._current_utc_datetime = datetime(1000, 1, 1)
self._instance_id = instance_id
self._registry = registry
Expand Down Expand Up @@ -1165,7 +1168,7 @@ def call_entity_function_helper(
raise RuntimeError(error_message)

encoded_input = shared.to_json(input) if input is not None else None
action = ph.new_call_entity_action(id, self.instance_id, entity_id, operation, encoded_input)
action = ph.new_call_entity_action(id, self.instance_id, entity_id, operation, encoded_input, self.new_uuid())
self._pending_actions[id] = action

fn_task = task.CompletableTask()
Expand All @@ -1188,7 +1191,7 @@ def signal_entity_function_helper(

encoded_input = shared.to_json(input) if input is not None else None

action = ph.new_signal_entity_action(id, entity_id, operation, encoded_input)
action = ph.new_signal_entity_action(id, entity_id, operation, encoded_input, self.new_uuid())
self._pending_actions[id] = action

def lock_entities_function_helper(self, id: int, entities: list[EntityInstanceId]) -> None:
Expand All @@ -1199,7 +1202,7 @@ def lock_entities_function_helper(self, id: int, entities: list[EntityInstanceId
if not transition_valid:
raise RuntimeError(error_message)

critical_section_id = f"{self.instance_id}:{id:04x}"
critical_section_id = self.new_uuid()

request, target = self._entity_context.emit_acquire_message(critical_section_id, entities)

Expand Down Expand Up @@ -1251,6 +1254,17 @@ def continue_as_new(self, new_input, *, save_events: bool = False) -> None:

self.set_continued_as_new(new_input, save_events)

def new_uuid(self) -> str:
NAMESPACE_UUID: str = "9e952958-5e33-4daf-827f-2fa12937b875"

uuid_name_value = \
f"{self._instance_id}" \
f"_{self.current_utc_datetime.strftime(DATETIME_STRING_FORMAT)}" \
f"_{self._new_uuid_counter}"
self._new_uuid_counter += 1
namespace_uuid = uuid.uuid5(uuid.NAMESPACE_OID, NAMESPACE_UUID)
return str(uuid.uuid5(namespace_uuid, uuid_name_value))


class ExecutionResults:
actions: list[pb.OrchestratorAction]
Expand Down
37 changes: 37 additions & 0 deletions tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import threading
from datetime import timedelta
import uuid

import pytest

Expand Down Expand Up @@ -532,3 +533,39 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
assert state.serialized_input is None
assert state.serialized_output is None
assert state.serialized_custom_status == "\"foobaz\""


def test_new_uuid():
def noop(_: task.ActivityContext, _1):
pass

def empty_orchestrator(ctx: task.OrchestrationContext, _):
# Assert that two new_uuid calls return different values
results = [ctx.new_uuid(), ctx.new_uuid()]
yield ctx.call_activity("noop")
# Assert that new_uuid still returns a unique value after replay
results.append(ctx.new_uuid())
return results

# Start a worker, which will connect to the sidecar in a background thread
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
taskhub=taskhub_name, token_credential=None) as w:
w.add_orchestrator(empty_orchestrator)
w.add_activity(noop)
w.start()

c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
taskhub=taskhub_name, token_credential=None)
id = c.schedule_new_orchestration(empty_orchestrator)
state = c.wait_for_orchestration_completion(id, timeout=30)

assert state is not None
assert state.name == task.get_name(empty_orchestrator)
assert state.instance_id == id
assert state.failure_details is None
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
results = json.loads(state.serialized_output or "\"\"")
assert isinstance(results, list) and len(results) == 3
assert uuid.UUID(results[0]) != uuid.UUID(results[1])
assert uuid.UUID(results[0]) != uuid.UUID(results[2])
assert uuid.UUID(results[1]) != uuid.UUID(results[2])
35 changes: 35 additions & 0 deletions tests/durabletask/test_orchestration_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import threading
import time
from datetime import timedelta
import uuid

import pytest

Expand Down Expand Up @@ -499,3 +500,37 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
assert state.serialized_input is None
assert state.serialized_output is None
assert state.serialized_custom_status == "\"foobaz\""


def test_new_uuid():
def noop(_: task.ActivityContext, _1):
pass

def empty_orchestrator(ctx: task.OrchestrationContext, _):
# Assert that two new_uuid calls return different values
results = [ctx.new_uuid(), ctx.new_uuid()]
yield ctx.call_activity("noop")
# Assert that new_uuid still returns a unique value after replay
results.append(ctx.new_uuid())
return results

# Start a worker, which will connect to the sidecar in a background thread
with worker.TaskHubGrpcWorker() as w:
w.add_orchestrator(empty_orchestrator)
w.add_activity(noop)
w.start()

c = client.TaskHubGrpcClient()
id = c.schedule_new_orchestration(empty_orchestrator)
state = c.wait_for_orchestration_completion(id, timeout=30)

assert state is not None
assert state.name == task.get_name(empty_orchestrator)
assert state.instance_id == id
assert state.failure_details is None
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
results = json.loads(state.serialized_output or "\"\"")
assert isinstance(results, list) and len(results) == 3
assert uuid.UUID(results[0]) != uuid.UUID(results[1])
assert uuid.UUID(results[0]) != uuid.UUID(results[2])
assert uuid.UUID(results[1]) != uuid.UUID(results[2])
Loading