Skip to content

Commit 56d4486

Browse files
authored
Distributed Tracing for Entities (#547)
* Adds support for distributed tracing for Durable entities in Python --------- Authored-by: Sophia Tevosyan <[email protected]>
1 parent 85f306c commit 56d4486

File tree

4 files changed

+52
-18
lines changed

4 files changed

+52
-18
lines changed

azure/durable_functions/entity.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from .models import DurableEntityContext
22
from .models.entities import OperationResult, EntityState
3-
from datetime import datetime
3+
from datetime import datetime, timezone
44
from typing import Callable, Any, List, Dict
55

66

@@ -49,7 +49,7 @@ def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) ->
4949
for operation_data in batch:
5050
result: Any = None
5151
is_error: bool = False
52-
start_time: datetime = datetime.now()
52+
start_time: datetime = datetime.now(timezone.utc)
5353

5454
try:
5555
# populate context
@@ -74,6 +74,7 @@ def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) ->
7474
operation_result = OperationResult(
7575
is_error=is_error,
7676
duration=duration,
77+
execution_start_time_ms=int(start_time.timestamp() * 1000),
7778
result=result
7879
)
7980
response.results.append(operation_result)
@@ -122,7 +123,7 @@ def _elapsed_milliseconds_since(self, start_time: datetime) -> int:
122123
int
123124
The time, in millseconds, from start_time to now
124125
"""
125-
end_time = datetime.now()
126+
end_time = datetime.now(timezone.utc)
126127
time_diff = end_time - start_time
127128
elapsed_time = int(time_diff.total_seconds() * 1000)
128129
return elapsed_time

azure/durable_functions/models/DurableOrchestrationClient.py

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from time import time
55
from asyncio import sleep
66
from urllib.parse import urlparse, quote
7-
from opentelemetry import trace
7+
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
88

99
import azure.functions as func
1010

@@ -72,19 +72,7 @@ async def start_new(self,
7272
request_url = self._get_start_new_url(
7373
instance_id=instance_id, orchestration_function_name=orchestration_function_name)
7474

75-
# Get the current span
76-
current_span = trace.get_current_span()
77-
span_context = current_span.get_span_context()
78-
79-
# Get the traceparent and tracestate from the span context
80-
# Follows the W3C Trace Context specification for traceparent
81-
# https://www.w3.org/TR/trace-context/#traceparent-header
82-
trace_id = format(span_context.trace_id, '032x')
83-
span_id = format(span_context.span_id, '016x')
84-
trace_flags = format(span_context.trace_flags, '02x')
85-
trace_parent = f"00-{trace_id}-{span_id}-{trace_flags}"
86-
87-
trace_state = span_context.trace_state
75+
trace_parent, trace_state = DurableOrchestrationClient._get_current_activity_context()
8876

8977
response: List[Any] = await self._post_async_request(
9078
request_url,
@@ -563,9 +551,14 @@ async def signal_entity(self, entityId: EntityId, operation_name: str,
563551
entity_Id=entityId)
564552

565553
request_url = options.to_url(self._orchestration_bindings.rpc_base_url)
554+
555+
trace_parent, trace_state = DurableOrchestrationClient._get_current_activity_context()
556+
566557
response = await self._post_async_request(
567558
request_url,
568-
json.dumps(operation_input) if operation_input else None)
559+
json.dumps(operation_input) if operation_input else None,
560+
trace_parent,
561+
trace_state)
569562

570563
switch_statement = {
571564
202: lambda: None # signal accepted
@@ -797,3 +790,23 @@ async def resume(self, instance_id: str, reason: str) -> None:
797790
error_message = has_error_message()
798791
if error_message:
799792
raise Exception(error_message)
793+
794+
"""Gets the current trace activity traceparent and tracestate
795+
796+
Returns
797+
-------
798+
tuple[str, str]
799+
A tuple containing the (traceparent, tracestate)
800+
"""
801+
@staticmethod
802+
def _get_current_activity_context() -> tuple[str, str]:
803+
carrier = {}
804+
805+
# Inject the current trace context into the carrier
806+
TraceContextTextMapPropagator().inject(carrier)
807+
808+
# Extract the traceparent and optionally the tracestate
809+
trace_parent = carrier.get("traceparent")
810+
trace_state = carrier.get("tracestate")
811+
812+
return trace_parent, trace_state

azure/durable_functions/models/entities/OperationResult.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class OperationResult:
1212
def __init__(self,
1313
is_error: bool,
1414
duration: int,
15+
execution_start_time_ms: int,
1516
result: Optional[str] = None):
1617
"""Instantiate an OperationResult.
1718
@@ -21,11 +22,15 @@ def __init__(self,
2122
Whether or not the operation resulted in an exception.
2223
duration: int
2324
How long the operation took, in milliseconds.
25+
start_time: int
26+
The start time of this operation's execution, in milliseconds,
27+
since January 1st 1970 midnight in UTC.
2428
result: Optional[str]
2529
The operation result. Defaults to None.
2630
"""
2731
self._is_error: bool = is_error
2832
self._duration: int = duration
33+
self._execution_start_time_ms: int = execution_start_time_ms
2934
self._result: Optional[str] = result
3035

3136
@property
@@ -50,6 +55,18 @@ def duration(self) -> int:
5055
"""
5156
return self._duration
5257

58+
@property
59+
def execution_start_time_ms(self) -> int:
60+
"""Get the start time of this operation.
61+
62+
Returns
63+
-------
64+
int:
65+
The start time of this operation's execution, in milliseconds,
66+
since January 1st 1970 midnight in UTC.
67+
"""
68+
return self._execution_start_time_ms
69+
5370
@property
5471
def result(self) -> Any:
5572
"""Get the operation's result.
@@ -72,5 +89,6 @@ def to_json(self) -> Dict[str, Any]:
7289
to_json: Dict[str, Any] = {}
7390
to_json["isError"] = self.is_error
7491
to_json["duration"] = self.duration
92+
to_json["startTime"] = self.execution_start_time_ms
7593
to_json["result"] = json.dumps(self.result, default=_serialize_custom_object)
7694
return to_json

tests/orchestrator/test_entity.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,9 +209,11 @@ def apply_operation(entity_state: EntityState, result: Any, state: Any, is_error
209209
# We cannot control duration, so default it to zero and avoid checking for it
210210
# in later asserts
211211
duration = 0
212+
start_time = 0
212213
operation_result = OperationResult(
213214
is_error=is_error,
214215
duration=duration,
216+
execution_start_time_ms=start_time,
215217
result=result
216218
)
217219
entity_state._results.append(operation_result)

0 commit comments

Comments
 (0)