Skip to content

Commit 773cfba

Browse files
committed
realize invocation and agent run attributes
1 parent 2d2aa3c commit 773cfba

File tree

4 files changed

+108
-33
lines changed

4 files changed

+108
-33
lines changed

veadk/tracing/telemetry/attributes/extractors/common_attributes_extractors.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ def common_gen_ai_app_name(**kwargs) -> str:
1414
return app_name or "<unknown_app_name>"
1515

1616

17+
def common_gen_ai_agent_name(**kwargs) -> str:
18+
agent_name = kwargs.get("agent_name")
19+
return agent_name or "<unknown_agent_name>"
20+
21+
1722
def common_gen_ai_user_id(**kwargs) -> str:
1823
user_id = kwargs.get("user_id")
1924
return user_id or "<unknown_user_id>"
@@ -28,6 +33,7 @@ def common_gen_ai_session_id(**kwargs) -> str:
2833
"gen_ai.system": common_gen_ai_system,
2934
"gen_ai.system.version": common_gen_ai_system_version,
3035
"gen_ai.app.name": common_gen_ai_app_name,
36+
"gen_ai.agent.name": common_gen_ai_agent_name,
3137
"gen_ai.user.id": common_gen_ai_user_id,
3238
"gen_ai.session.id": common_gen_ai_session_id,
3339
}

veadk/tracing/telemetry/exporters/inmemory_exporter.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414

1515
from typing import Sequence
1616

17+
from opentelemetry.context import (
18+
_SUPPRESS_INSTRUMENTATION_KEY,
19+
attach,
20+
detach,
21+
set_value,
22+
)
1723
from opentelemetry.sdk.trace import ReadableSpan, export
1824
from typing_extensions import override
1925

@@ -23,7 +29,7 @@
2329
logger = get_logger(__name__)
2430

2531

26-
in_memory_exporter_instance = None
32+
inmemory_span_processor = None
2733

2834

2935
# ======== Adapted from Google ADK ========
@@ -64,6 +70,30 @@ def clear(self):
6470
self._spans.clear()
6571

6672

73+
class _InMemorySpanProcessor(export.SimpleSpanProcessor):
74+
def __init__(self, exporter: _InMemoryExporter) -> None:
75+
super().__init__(exporter)
76+
self.spans = []
77+
78+
def on_start(self, span, parent_context) -> None:
79+
if span.context:
80+
self.spans.append(span)
81+
82+
def on_end(self, span: ReadableSpan) -> None:
83+
if span.context:
84+
if not span.context.trace_flags.sampled:
85+
return
86+
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
87+
try:
88+
self.span_exporter.export((span,))
89+
# pylint: disable=broad-exception-caught
90+
except Exception:
91+
logger.exception("Exception while exporting Span.")
92+
detach(token)
93+
if span in self.spans:
94+
self.spans.remove(span)
95+
96+
6797
class InMemoryExporter(BaseExporter):
6898
"""InMemory Exporter mainly for store spans in memory for debugging / observability purposes."""
6999

@@ -73,4 +103,7 @@ def __init__(self, name: str = "inmemory_exporter") -> None:
73103
self.name = name
74104

75105
self._exporter = _InMemoryExporter()
76-
self.processor = export.SimpleSpanProcessor(self._exporter)
106+
self.processor = _InMemorySpanProcessor(self._exporter)
107+
108+
global inmemory_span_processor
109+
inmemory_span_processor = self.processor

veadk/tracing/telemetry/opentelemetry_tracer.py

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from opentelemetry.sdk.resources import Resource
2626
from opentelemetry.sdk.trace import TracerProvider
2727
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
28-
from pydantic import BaseModel, ConfigDict, Field, field_validator
28+
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator
2929
from typing_extensions import override
3030

3131
from veadk.tracing.base_tracer import BaseTracer
@@ -37,8 +37,6 @@
3737

3838
logger = get_logger(__name__)
3939

40-
DEFAULT_VEADK_TRACER_NAME = "veadk_global_tracer"
41-
4240

4341
def update_resource_attributions(provider: TracerProvider, resource_attributes: dict):
4442
provider._resource = provider._resource.merge(Resource.create(resource_attributes))
@@ -47,20 +45,37 @@ def update_resource_attributions(provider: TracerProvider, resource_attributes:
4745
class OpentelemetryTracer(BaseModel, BaseTracer):
4846
model_config = ConfigDict(arbitrary_types_allowed=True)
4947

50-
name: str = Field(
51-
default=DEFAULT_VEADK_TRACER_NAME, description="The identifier of tracer."
52-
)
53-
54-
app_name: str = Field(
55-
default="veadk_app",
56-
description="The identifier of app.",
57-
)
48+
name: str = Field(default="veadk_tracer", description="The identifier of tracer.")
5849

5950
exporters: list[BaseExporter] = Field(
6051
default_factory=list,
6152
description="The exporters to export spans.",
6253
)
6354

55+
_app_name: str = PrivateAttr(default="<unknown_app_name>")
56+
57+
_agent_name: str = PrivateAttr(default="<unknown_agent_name>")
58+
59+
@property
60+
def app_name(self) -> str:
61+
return self._app_name
62+
63+
@app_name.setter
64+
def app_name(self, value: str) -> None:
65+
self._app_name = value
66+
# update_common_attributes(self._tracer_provider, {"app_name": self._app_name})
67+
68+
@property
69+
def agent_name(self) -> str:
70+
return self._agent_name
71+
72+
@agent_name.setter
73+
def agent_name(self, value: str) -> None:
74+
self._agent_name = value
75+
# update_common_attributes(
76+
# self._tracer_provider, {"agent_name": self._agent_name}
77+
# )
78+
6479
@field_validator("exporters")
6580
@classmethod
6681
def forbid_inmemory_exporter(cls, v: list[BaseExporter]) -> list[BaseExporter]:
@@ -73,8 +88,6 @@ def model_post_init(self, context: Any) -> None:
7388
patch_google_adk_telemetry()
7489

7590
self._processors = []
76-
self._inmemory_exporter = InMemoryExporter()
77-
self.exporters.append(self._inmemory_exporter)
7891

7992
# VeADK operates on global OpenTelemetry provider, return nothing
8093
self._init_global_tracer_provider()
@@ -121,9 +134,22 @@ def _init_global_tracer_provider(self) -> None:
121134
f"Add span processor for exporter `{exporter.__class__.__name__}` to OpentelemetryTracer failed."
122135
)
123136

124-
logger.debug(
125-
f"Init OpentelemetryTracer with {len(self._processors)} exporters."
126-
)
137+
self._inmemory_exporter = InMemoryExporter()
138+
self._inmemory_exporter_processor = self._inmemory_exporter.processor
139+
140+
# make sure the in memory exporter processor is added at index 0
141+
if self._inmemory_exporter_processor:
142+
global_tracer_provider._active_span_processor._span_processors = (
143+
self._inmemory_exporter_processor,
144+
) + global_tracer_provider._active_span_processor._span_processors
145+
146+
self._processors.append(self._inmemory_exporter_processor)
147+
else:
148+
logger.warning(
149+
"InMemoryExporter processor is not initialized, cannot add to OpentelemetryTracer."
150+
)
151+
152+
logger.info(f"Init OpentelemetryTracer with {len(self._processors)} exporters.")
127153

128154
@property
129155
def trace_file_path(self) -> str:

veadk/tracing/telemetry/telemetry.py

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from google.adk.models.llm_response import LlmResponse
77
from google.adk.tools import BaseTool
88
from opentelemetry import trace
9-
from opentelemetry.trace.span import Span
9+
from opentelemetry.sdk.trace import _Span
1010

1111
from veadk.tracing.telemetry.attributes.attributes import ATTRIBUTES
1212
from veadk.tracing.telemetry.attributes.extractors.llm_attributes_extractors import (
@@ -15,23 +15,32 @@
1515
from veadk.tracing.telemetry.attributes.extractors.tool_attributes_extractors import (
1616
ToolAttributesParams,
1717
)
18+
from veadk.tracing.telemetry.exporters import inmemory_exporter
1819
from veadk.utils.logger import get_logger
1920

2021
logger = get_logger(__name__)
2122

2223

23-
def _set_common_attributes(
24-
span: Span, app_name: str, user_id: str, session_id: str
25-
) -> None:
26-
common_attributes = ATTRIBUTES.get("common", {})
27-
for attr_name, attr_extractor in common_attributes.items():
28-
value = attr_extractor(
29-
app_name=app_name, user_id=user_id, session_id=session_id
30-
)
31-
span.set_attribute(attr_name, value)
24+
def trace_send_data(): ...
3225

3326

34-
def trace_send_data(): ...
27+
def set_common_attributes(current_span: _Span, **kwargs) -> None:
28+
if current_span.context:
29+
current_span_id = current_span.context.trace_id
30+
31+
spans = inmemory_exporter.inmemory_span_processor.spans
32+
33+
spans_in_current_trace = [
34+
span
35+
for span in spans
36+
if span.context and span.context.trace_id == current_span_id
37+
]
38+
39+
common_attributes = ATTRIBUTES.get("common", {})
40+
for span in spans_in_current_trace:
41+
for attr_name, attr_extractor in common_attributes.items():
42+
value = attr_extractor(**kwargs)
43+
span.set_attribute(attr_name, value)
3544

3645

3746
def trace_tool_call(
@@ -63,10 +72,11 @@ def trace_call_llm(
6372
) -> None:
6473
span = trace.get_current_span()
6574

66-
_set_common_attributes(
67-
span=span,
68-
app_name=invocation_context.session.app_name,
69-
user_id=invocation_context.session.user_id,
75+
set_common_attributes(
76+
current_span=span, # type: ignore
77+
agent_name=invocation_context.agent.name,
78+
app_name=invocation_context.app_name,
79+
user_id=invocation_context.user_id,
7080
session_id=invocation_context.session.id,
7181
)
7282

0 commit comments

Comments
 (0)