Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions azure/durable_functions/entity.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .models import DurableEntityContext
from .models.entities import OperationResult, EntityState
from datetime import datetime
from datetime import datetime, timezone
from typing import Callable, Any, List, Dict


Expand Down Expand Up @@ -49,7 +49,7 @@ def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) ->
for operation_data in batch:
result: Any = None
is_error: bool = False
start_time: datetime = datetime.now()
start_time: datetime = datetime.now(timezone.utc)

try:
# populate context
Expand All @@ -74,6 +74,7 @@ def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) ->
operation_result = OperationResult(
is_error=is_error,
duration=duration,
start_time=int(start_time.timestamp() * 1000),
result=result
)
response.results.append(operation_result)
Expand Down Expand Up @@ -119,7 +120,7 @@ def _elapsed_milliseconds_since(self, start_time: datetime) -> int:
int
The time, in millseconds, from start_time to now
"""
end_time = datetime.now()
end_time = datetime.now(timezone.utc)
time_diff = end_time - start_time
elapsed_time = int(time_diff.total_seconds() * 1000)
return elapsed_time
39 changes: 25 additions & 14 deletions azure/durable_functions/models/DurableOrchestrationClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,7 @@ async def start_new(self,
request_url = self._get_start_new_url(
instance_id=instance_id, orchestration_function_name=orchestration_function_name)

# Get the current span
current_span = trace.get_current_span()
span_context = current_span.get_span_context()

# Get the traceparent and tracestate from the span context
# Follows the W3C Trace Context specification for traceparent
# https://www.w3.org/TR/trace-context/#traceparent-header
trace_id = format(span_context.trace_id, '032x')
span_id = format(span_context.span_id, '016x')
trace_flags = format(span_context.trace_flags, '02x')
trace_parent = f"00-{trace_id}-{span_id}-{trace_flags}"

trace_state = span_context.trace_state
trace_parent, trace_state = DurableOrchestrationClient._get_current_activity_context()

response: List[Any] = await self._post_async_request(
request_url,
Expand Down Expand Up @@ -563,9 +551,14 @@ async def signal_entity(self, entityId: EntityId, operation_name: str,
entity_Id=entityId)

request_url = options.to_url(self._orchestration_bindings.rpc_base_url)

trace_parent, trace_state = DurableOrchestrationClient._get_current_activity_context()

response = await self._post_async_request(
request_url,
json.dumps(operation_input) if operation_input else None)
json.dumps(operation_input) if operation_input else None,
trace_parent,
trace_state)

switch_statement = {
202: lambda: None # signal accepted
Expand Down Expand Up @@ -797,3 +790,21 @@ async def resume(self, instance_id: str, reason: str) -> None:
error_message = has_error_message()
if error_message:
raise Exception(error_message)

@staticmethod
def _get_current_activity_context() -> tuple[str, str]:
# Get the current span
current_span = trace.get_current_span()
span_context = current_span.get_span_context()

# Get the traceparent and tracestate from the span context
# Follows the W3C Trace Context specification for traceparent
# https://www.w3.org/TR/trace-context/#traceparent-header
trace_id = format(span_context.trace_id, '032x')
span_id = format(span_context.span_id, '016x')
trace_flags = format(span_context.trace_flags, '02x')
trace_parent = f"00-{trace_id}-{span_id}-{trace_flags}"

trace_state = span_context.trace_state

return trace_parent, trace_state
16 changes: 16 additions & 0 deletions azure/durable_functions/models/entities/OperationResult.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class OperationResult:
def __init__(self,
is_error: bool,
duration: int,
start_time: int,
result: Optional[str] = None):
"""Instantiate an OperationResult.

Expand All @@ -21,11 +22,14 @@ def __init__(self,
Whether or not the operation resulted in an exception.
duration: int
How long the operation took, in milliseconds.
start_time: int
The start time of this operation's execution, in milliseconds, since January 1st 1970 midnight in UTC.
result: Optional[str]
The operation result. Defaults to None.
"""
self._is_error: bool = is_error
self._duration: int = duration
self._start_time: int = start_time
self._result: Optional[str] = result

@property
Expand All @@ -49,6 +53,17 @@ def duration(self) -> int:
The duration of this operation, in milliseconds
"""
return self._duration

@property
def start_time(self) -> int:
"""Get the start time of this operation.

Returns
-------
int:
The start time of this operation's execution, in milliseconds, since January 1st 1970 midnight in UTC
"""
return self._start_time

@property
def result(self) -> Any:
Expand All @@ -72,5 +87,6 @@ def to_json(self) -> Dict[str, Any]:
to_json: Dict[str, Any] = {}
to_json["isError"] = self.is_error
to_json["duration"] = self.duration
to_json["startTime"] = self.start_time
to_json["result"] = json.dumps(self.result, default=_serialize_custom_object)
return to_json
2 changes: 2 additions & 0 deletions tests/orchestrator/test_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,11 @@ def apply_operation(entity_state: EntityState, result: Any, state: Any, is_error
# We cannot control duration, so default it to zero and avoid checking for it
# in later asserts
duration = 0
start_time = 0
operation_result = OperationResult(
is_error=is_error,
duration=duration,
start_time=start_time,
result=result
)
entity_state._results.append(operation_result)
Expand Down
Loading