Skip to content

Commit db3f460

Browse files
refine(tracing): remove global instance and add context to root span (#173)
1 parent 0ca65f3 commit db3f460

File tree

7 files changed

+122
-84
lines changed

7 files changed

+122
-84
lines changed

veadk/tracing/telemetry/attributes/extractors/llm_attributes_extractors.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,12 +252,14 @@ def llm_gen_ai_messages(params: LLMAttributesParams) -> ExtractorResponse:
252252
part.inline_data.display_name.split(
253253
"/"
254254
)[-1]
255-
if part.inline_data.display_name
255+
if part.inline_data
256+
and part.inline_data.display_name
256257
else "<unknown_image_name>"
257258
),
258259
"parts.0.image_url.url": (
259260
part.inline_data.display_name
260-
if part.inline_data.display_name
261+
if part.inline_data
262+
and part.inline_data.display_name
261263
else "<unknown_image_url>"
262264
),
263265
}

veadk/tracing/telemetry/exporters/apmplus_exporter.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import time
1516
from dataclasses import dataclass
1617
from typing import Any
1718

19+
from google.adk.agents.invocation_context import InvocationContext
1820
from google.adk.models.llm_request import LlmRequest
1921
from google.adk.models.llm_response import LlmResponse
20-
from opentelemetry import metrics
22+
from opentelemetry import metrics, trace
2123
from opentelemetry import metrics as metrics_api
2224
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
2325
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
@@ -193,7 +195,13 @@ def __init__(
193195
explicit_bucket_boundaries_advisory=_GEN_AI_SERVER_TIME_PER_OUTPUT_TOKEN_BUCKETS,
194196
)
195197

196-
def record(self, llm_request: LlmRequest, llm_response: LlmResponse) -> None:
198+
def record(
199+
self,
200+
invocation_context: InvocationContext,
201+
event_id: str,
202+
llm_request: LlmRequest,
203+
llm_response: LlmResponse,
204+
) -> None:
197205
attributes = {
198206
"gen_ai_system": "volcengine",
199207
"gen_ai_response_model": llm_request.model,
@@ -217,10 +225,18 @@ def record(self, llm_request: LlmRequest, llm_response: LlmResponse) -> None:
217225
token_attributes = {**attributes, "gen_ai_token_type": "output"}
218226
self.token_usage.record(output_token, attributes=token_attributes)
219227

220-
# TODO: Get llm duration
221-
# duration = 5.0
222-
# if self.duration_histogram:
223-
# self.duration_histogram.record(duration, attributes=attributes)
228+
# Get llm duration
229+
span = trace.get_current_span()
230+
if span and hasattr(span, "start_time") and self.duration_histogram:
231+
# We use span start time as the llm request start time
232+
tik = span.start_time # type: ignore
233+
# We use current time as the llm request end time
234+
tok = time.time_ns()
235+
# Calculate duration in seconds
236+
duration = (tok - tik) / 1e9
237+
self.duration_histogram.record(
238+
duration, attributes=attributes
239+
) # unit in seconds
224240

225241
# Get model request error
226242
if llm_response.error_code and self.chat_exception_counter:
@@ -269,6 +285,8 @@ class APMPlusExporter(BaseExporter):
269285
config: APMPlusExporterConfig = Field(default_factory=APMPlusExporterConfig)
270286

271287
def model_post_init(self, context: Any) -> None:
288+
logger.info(f"APMPlusExporter sevice name: {self.config.service_name}")
289+
272290
headers = {
273291
"x-byteapm-appkey": self.config.app_key,
274292
}

veadk/tracing/telemetry/exporters/cozeloop_exporter.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ class CozeloopExporter(BaseExporter):
4242
config: CozeloopExporterConfig = Field(default_factory=CozeloopExporterConfig)
4343

4444
def model_post_init(self, context: Any) -> None:
45+
logger.info(f"CozeloopExporter space ID: {self.config.space_id}")
46+
4547
headers = {
4648
"cozeloop-workspace-id": self.config.space_id,
4749
"authorization": f"Bearer {self.config.token}",

veadk/tracing/telemetry/exporters/inmemory_exporter.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,22 @@ def clear(self):
7575
class _InMemorySpanProcessor(export.SimpleSpanProcessor):
7676
def __init__(self, exporter: _InMemoryExporter) -> None:
7777
super().__init__(exporter)
78-
self.spans = []
7978

8079
def on_start(self, span, parent_context) -> None:
81-
if span.context:
82-
self.spans.append(span)
80+
if span.name.startswith("invocation"):
81+
span.set_attribute("gen_ai.operation.name", "chain")
82+
span.set_attribute("gen_ai.usage.total_tokens", 0)
83+
84+
ctx = set_value("invocation_span_instance", span, context=parent_context)
85+
token = attach(ctx) # mount context on `invocation` root span in Google ADK
86+
setattr(span, "_invocation_token", token) # for later detach
87+
88+
if span.name.startswith("agent_run"):
89+
span.set_attribute("gen_ai.operation.name", "agent")
90+
91+
ctx = set_value("agent_run_span_instance", span, context=parent_context)
92+
token = attach(ctx)
93+
setattr(span, "_agent_run_token", token) # for later detach
8394

8495
def on_end(self, span: ReadableSpan) -> None:
8596
if span.context:
@@ -92,8 +103,14 @@ def on_end(self, span: ReadableSpan) -> None:
92103
except Exception:
93104
logger.exception("Exception while exporting Span.")
94105
detach(token)
95-
if span in self.spans:
96-
self.spans.remove(span)
106+
107+
token = getattr(span, "_invocation_token", None)
108+
if token:
109+
detach(token)
110+
111+
token = getattr(span, "_agent_run_token", None)
112+
if token:
113+
detach(token)
97114

98115

99116
class InMemoryExporter(BaseExporter):
@@ -106,6 +123,3 @@ def __init__(self, name: str = "inmemory_exporter") -> None:
106123

107124
self._exporter = _InMemoryExporter()
108125
self.processor = _InMemorySpanProcessor(self._exporter)
109-
110-
111-
_INMEMORY_EXPORTER_INSTANCE = InMemoryExporter()

veadk/tracing/telemetry/exporters/tls_exporter.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ class TLSExporter(BaseExporter):
4444
config: TLSExporterConfig = Field(default_factory=TLSExporterConfig)
4545

4646
def model_post_init(self, context: Any) -> None:
47+
logger.info(f"TLSExporter topic ID: {self.config.topic_id}")
48+
4749
headers = {
4850
"x-tls-otel-tracetopic": self.config.topic_id,
4951
"x-tls-otel-ak": self.config.access_key,

veadk/tracing/telemetry/opentelemetry_tracer.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,7 @@
3030
from veadk.tracing.base_tracer import BaseTracer
3131
from veadk.tracing.telemetry.exporters.apmplus_exporter import APMPlusExporter
3232
from veadk.tracing.telemetry.exporters.base_exporter import BaseExporter
33-
from veadk.tracing.telemetry.exporters.inmemory_exporter import (
34-
_INMEMORY_EXPORTER_INSTANCE,
35-
InMemoryExporter,
36-
)
33+
from veadk.tracing.telemetry.exporters.inmemory_exporter import InMemoryExporter
3734
from veadk.utils.logger import get_logger
3835
from veadk.utils.patches import patch_google_adk_telemetry
3936

@@ -119,7 +116,7 @@ def _init_global_tracer_provider(self) -> None:
119116
f"Add span processor for exporter `{exporter.__class__.__name__}` to OpentelemetryTracer failed."
120117
)
121118

122-
self._inmemory_exporter = _INMEMORY_EXPORTER_INSTANCE
119+
self._inmemory_exporter = InMemoryExporter()
123120
if self._inmemory_exporter.processor:
124121
# make sure the in memory exporter processor is added at index 0
125122
# because we use this to record all spans

veadk/tracing/telemetry/telemetry.py

Lines changed: 66 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,23 @@
2020
from google.adk.models.llm_response import LlmResponse
2121
from google.adk.tools import BaseTool
2222
from opentelemetry import trace
23-
from opentelemetry.sdk.trace import _Span
23+
from opentelemetry.context import get_value
24+
from opentelemetry.sdk.trace import Span, _Span
2425

2526
from veadk.tracing.telemetry.attributes.attributes import ATTRIBUTES
2627
from veadk.tracing.telemetry.attributes.extractors.types import (
2728
ExtractorResponse,
2829
LLMAttributesParams,
2930
ToolAttributesParams,
3031
)
31-
from veadk.tracing.telemetry.exporters.inmemory_exporter import (
32-
_INMEMORY_EXPORTER_INSTANCE,
33-
)
3432
from veadk.utils.logger import get_logger
3533

3634
logger = get_logger(__name__)
3735

3836

39-
def upload_metrics(
37+
def _upload_metrics(
4038
invocation_context: InvocationContext,
39+
event_id: str,
4140
llm_request: LlmRequest,
4241
llm_response: LlmResponse,
4342
) -> None:
@@ -48,11 +47,13 @@ def upload_metrics(
4847
for tracer in tracers:
4948
for exporter in getattr(tracer, "exporters", []):
5049
if getattr(exporter, "meter_uploader", None):
51-
exporter.meter_uploader.record(llm_request, llm_response)
50+
exporter.meter_uploader.record(
51+
invocation_context, event_id, llm_request, llm_response
52+
)
5253

5354

5455
def _set_agent_input_attribute(
55-
span: _Span, invocation_context: InvocationContext
56+
span: Span, invocation_context: InvocationContext
5657
) -> None:
5758
# We only save the original user input as the agent input
5859
# hence once the `agent.input` has been set, we don't overwrite it
@@ -106,7 +107,7 @@ def _set_agent_input_attribute(
106107
)
107108

108109

109-
def _set_agent_output_attribute(span: _Span, llm_response: LlmResponse) -> None:
110+
def _set_agent_output_attribute(span: Span, llm_response: LlmResponse) -> None:
110111
content = llm_response.content
111112
if content and content.parts:
112113
for idx, part in enumerate(content.parts):
@@ -126,67 +127,64 @@ def set_common_attributes_on_model_span(
126127
current_span: _Span,
127128
**kwargs,
128129
) -> None:
129-
if current_span.context:
130-
current_span_id = current_span.context.trace_id
131-
else:
132-
logger.warning(
133-
"Current span context is missing, failed to get `trace_id` to set common attributes."
134-
)
135-
return
136-
130+
common_attributes = ATTRIBUTES.get("common", {})
137131
try:
138-
spans = _INMEMORY_EXPORTER_INSTANCE.processor.spans # type: ignore
139-
140-
spans_in_current_trace = [
141-
span
142-
for span in spans
143-
if span.context and span.context.trace_id == current_span_id
144-
]
145-
146-
common_attributes = ATTRIBUTES.get("common", {})
147-
for span in spans_in_current_trace:
148-
if span.is_recording():
149-
if span.name.startswith("invocation"):
150-
span.set_attribute("gen_ai.operation.name", "chain")
151-
_set_agent_input_attribute(span, invocation_context)
152-
_set_agent_output_attribute(span, llm_response)
153-
elif span.name.startswith("agent_run"):
154-
span.set_attribute("gen_ai.operation.name", "agent")
155-
_set_agent_input_attribute(span, invocation_context)
156-
_set_agent_output_attribute(span, llm_response)
157-
for attr_name, attr_extractor in common_attributes.items():
158-
value = attr_extractor(**kwargs)
159-
span.set_attribute(attr_name, value)
132+
invocation_span: Span = get_value("invocation_span_instance") # type: ignore
133+
agent_run_span: Span = get_value("agent_run_span_instance") # type: ignore
134+
135+
if invocation_span and invocation_span.name.startswith("invocation"):
136+
_set_agent_input_attribute(invocation_span, invocation_context)
137+
_set_agent_output_attribute(invocation_span, llm_response)
138+
for attr_name, attr_extractor in common_attributes.items():
139+
value = attr_extractor(**kwargs)
140+
invocation_span.set_attribute(attr_name, value)
141+
142+
# Calculate the token usage for the whole invocation span
143+
current_step_token_usage = (
144+
llm_response.usage_metadata.total_token_count
145+
if llm_response.usage_metadata
146+
and llm_response.usage_metadata.total_token_count
147+
else 0
148+
)
149+
prev_total_token_usage = (
150+
invocation_span.attributes["gen_ai.usage.total_tokens"]
151+
if invocation_span.attributes
152+
else 0
153+
)
154+
accumulated_total_token_usage = (
155+
current_step_token_usage + int(prev_total_token_usage) # type: ignore
156+
) # we can ignore this warning, cause we manually set the attribute to int before
157+
invocation_span.set_attribute(
158+
"gen_ai.usage.total_tokens", accumulated_total_token_usage
159+
)
160+
161+
if agent_run_span and agent_run_span.name.startswith("agent_run"):
162+
_set_agent_input_attribute(agent_run_span, invocation_context)
163+
_set_agent_output_attribute(agent_run_span, llm_response)
164+
for attr_name, attr_extractor in common_attributes.items():
165+
value = attr_extractor(**kwargs)
166+
agent_run_span.set_attribute(attr_name, value)
167+
168+
for attr_name, attr_extractor in common_attributes.items():
169+
value = attr_extractor(**kwargs)
170+
current_span.set_attribute(attr_name, value)
160171
except Exception as e:
161172
logger.error(f"Failed to set common attributes for spans: {e}")
162173

163174

164175
def set_common_attributes_on_tool_span(current_span: _Span) -> None:
165-
# find parent span (generally a llm span)
166-
if not current_span.context:
167-
logger.warning(
168-
f"Get tool span's context failed. Skip setting common attributes for span {current_span.name}"
169-
)
170-
return
171-
172-
if not current_span.parent:
173-
logger.warning(
174-
f"Get tool span's parent failed. Skip setting common attributes for span {current_span.name}"
175-
)
176-
return
177-
178-
parent_span_id = current_span.parent.span_id
179-
for span in _INMEMORY_EXPORTER_INSTANCE.processor.spans: # type: ignore
180-
if span.context.span_id == parent_span_id:
181-
common_attributes = ATTRIBUTES.get("common", {})
182-
for attr_name in common_attributes.keys():
183-
if hasattr(span.attributes, attr_name):
184-
current_span.set_attribute(attr_name, span.attributes[attr_name])
185-
else:
186-
logger.error(f"Parent span does not have attribute {attr_name}")
176+
common_attributes = ATTRIBUTES.get("common", {})
187177

178+
invocation_span: Span = get_value("invocation_span_instance") # type: ignore
188179

189-
def trace_send_data(): ...
180+
for attr_name in common_attributes.keys():
181+
if (
182+
invocation_span
183+
and invocation_span.name.startswith("invocation")
184+
and invocation_span.attributes
185+
and attr_name in invocation_span.attributes
186+
):
187+
current_span.set_attribute(attr_name, invocation_span.attributes[attr_name])
190188

191189

192190
def trace_tool_call(
@@ -212,7 +210,7 @@ def trace_call_llm(
212210
llm_request: LlmRequest,
213211
llm_response: LlmResponse,
214212
) -> None:
215-
span = trace.get_current_span()
213+
span: Span = trace.get_current_span() # type: ignore
216214

217215
from veadk.agent import Agent
218216

@@ -234,6 +232,7 @@ def trace_call_llm(
234232
span.context.trace_state.get("call_type", "")
235233
if (
236234
hasattr(span, "context")
235+
and span.context
237236
and hasattr(span.context, "trace_state")
238237
and hasattr(span.context.trace_state, "get")
239238
)
@@ -253,4 +252,8 @@ def trace_call_llm(
253252
response: ExtractorResponse = attr_extractor(params)
254253
ExtractorResponse.update_span(span, attr_name, response)
255254

256-
upload_metrics(invocation_context, llm_request, llm_response)
255+
_upload_metrics(invocation_context, event_id, llm_request, llm_response)
256+
257+
258+
# Do not modify this function
259+
def trace_send_data(): ...

0 commit comments

Comments
 (0)