Skip to content

Commit f5801e8

Browse files
vertex-sdk-botcopybara-github
authored andcommitted
chore!: Switch tracing APIs in preview AdkApp.
Currently AdkApp uses `cloudtrace.googleapis.com` for GCP tracing. This change switches it to `telemetry.googleapis.com`. It's a breaking change as users might need to enable the new API. PiperOrigin-RevId: 825486202
1 parent 85cbb75 commit f5801e8

File tree

3 files changed

+54
-54
lines changed

3 files changed

+54
-54
lines changed

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@
151151
"google-cloud-trace < 2",
152152
"opentelemetry-sdk < 2",
153153
"opentelemetry-exporter-gcp-trace < 2",
154+
"opentelemetry-exporter-otlp-proto-http < 2",
154155
"pydantic >= 2.11.1, < 3",
155156
"typing_extensions",
156157
]

tests/unit/vertex_adk/test_reasoning_engine_templates_adk.py

Lines changed: 10 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import base64
1717
import importlib
1818
import json
19-
import dataclasses
2019
import os
2120
from unittest import mock
2221
from typing import Optional
@@ -112,27 +111,11 @@ def simple_span_processor_mock():
112111

113112

114113
@pytest.fixture
115-
def cloud_trace_exporter_mock():
116-
import sys
117-
import opentelemetry
118-
119-
mock_cloud_trace_exporter = mock.Mock()
120-
121-
opentelemetry.exporter = type(sys)("exporter")
122-
opentelemetry.exporter.cloud_trace = type(sys)("cloud_trace")
123-
opentelemetry.exporter.cloud_trace.CloudTraceSpanExporter = (
124-
mock_cloud_trace_exporter
125-
)
126-
127-
sys.modules["opentelemetry.exporter"] = opentelemetry.exporter
128-
sys.modules["opentelemetry.exporter.cloud_trace"] = (
129-
opentelemetry.exporter.cloud_trace
130-
)
131-
132-
yield mock_cloud_trace_exporter
133-
134-
del sys.modules["opentelemetry.exporter.cloud_trace"]
135-
del sys.modules["opentelemetry.exporter"]
114+
def otlp_span_exporter_mock():
115+
with mock.patch(
116+
"opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter"
117+
) as otlp_span_exporter_mock:
118+
yield otlp_span_exporter_mock
136119

137120

138121
@pytest.fixture
@@ -619,9 +602,9 @@ def test_default_instrumentor_enablement(
619602
)
620603
def test_tracing_setup(
621604
self,
622-
trace_provider_mock: mock.Mock,
623-
cloud_trace_exporter_mock: mock.Mock,
624605
monkeypatch: pytest.MonkeyPatch,
606+
trace_provider_mock: mock.Mock,
607+
otlp_span_exporter_mock: mock.Mock,
625608
):
626609
monkeypatch.setattr(
627610
"uuid.uuid4", lambda: uuid.UUID("12345678123456781234567812345678")
@@ -643,17 +626,9 @@ def test_tracing_setup(
643626
"some-attribute": "some-value",
644627
}
645628

646-
@dataclasses.dataclass
647-
class RegexMatchingAll:
648-
keys: set[str]
649-
650-
def __eq__(self, regex: object) -> bool:
651-
return isinstance(regex, str) and set(regex.split("|")) == self.keys
652-
653-
cloud_trace_exporter_mock.assert_called_once_with(
654-
project_id=_TEST_PROJECT,
655-
client=mock.ANY,
656-
resource_regex=RegexMatchingAll(keys=set(expected_attributes.keys())),
629+
otlp_span_exporter_mock.assert_called_once_with(
630+
session=mock.ANY,
631+
endpoint="https://telemetry.googleapis.com/v1/traces",
657632
)
658633

659634
assert (
@@ -685,7 +660,6 @@ def test_span_content_capture_enabled_with_tracing(self):
685660
def test_enable_tracing(
686661
self,
687662
caplog,
688-
cloud_trace_exporter_mock,
689663
tracer_provider_mock,
690664
simple_span_processor_mock,
691665
):

vertexai/preview/reasoning_engines/templates/adk.py

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,28 @@ def _warn(msg: str):
233233
_warn._LOGGER.warning(msg) # pyright: ignore[reportFunctionMemberAccess]
234234

235235

236+
def _force_flush_traces():
237+
try:
238+
import opentelemetry.trace
239+
except (ImportError, AttributeError):
240+
_warn(
241+
"Could not force flush traces. opentelemetry-api is not installed. Please call 'pip install google-cloud-aiplatform[agent_engines]'."
242+
)
243+
return None
244+
245+
try:
246+
import opentelemetry.sdk.trace
247+
except (ImportError, AttributeError):
248+
_warn(
249+
"Could not force flush traces. opentelemetry-sdk is not installed. Please call 'pip install google-cloud-aiplatform[agent_engines]'."
250+
)
251+
return None
252+
253+
provider = opentelemetry.trace.get_tracer_provider()
254+
if isinstance(provider, opentelemetry.sdk.trace.TracerProvider):
255+
_ = provider.force_flush()
256+
257+
236258
def _default_instrumentor_builder(
237259
project_id: str,
238260
*,
@@ -313,28 +335,23 @@ def _detect_cloud_resource_id(project_id: str) -> Optional[str]:
313335

314336
if enable_tracing:
315337
try:
316-
import opentelemetry.exporter.cloud_trace
317-
except (ImportError, AttributeError):
318-
return _warn_missing_dependency(
319-
"opentelemetry-exporter-gcp-trace", needed_for_tracing=True
320-
)
321-
322-
try:
323-
import google.cloud.trace_v2
338+
import opentelemetry.exporter.otlp.proto.http.trace_exporter
339+
import google.auth.transport.requests
324340
except (ImportError, AttributeError):
325341
return _warn_missing_dependency(
326-
"google-cloud-trace", needed_for_tracing=True
342+
"opentelemetry-exporter-otlp-proto-http", needed_for_tracing=True
327343
)
328344

329345
import google.auth
330346

331347
credentials, _ = google.auth.default()
332-
span_exporter = opentelemetry.exporter.cloud_trace.CloudTraceSpanExporter(
333-
project_id=project_id,
334-
client=google.cloud.trace_v2.TraceServiceClient(
335-
credentials=credentials.with_quota_project(project_id),
336-
),
337-
resource_regex="|".join(resource.attributes.keys()),
348+
span_exporter = (
349+
opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter(
350+
session=google.auth.transport.requests.AuthorizedSession(
351+
credentials=credentials
352+
),
353+
endpoint="https://telemetry.googleapis.com/v1/traces",
354+
)
338355
)
339356
span_processor = opentelemetry.sdk.trace.export.BatchSpanProcessor(
340357
span_exporter=span_exporter,
@@ -874,9 +891,14 @@ async def async_stream_query(
874891
**kwargs,
875892
)
876893

877-
async for event in events_async:
878-
# Yield the event data as a dictionary
879-
yield _utils.dump_event_for_json(event)
894+
try:
895+
async for event in events_async:
896+
# Yield the event data as a dictionary
897+
yield _utils.dump_event_for_json(event)
898+
finally:
899+
# Avoid trace data loss having to do with CPU throttling on instance turndown
900+
if self._tracing_enabled():
901+
_force_flush_traces()
880902

881903
def streaming_agent_run_with_events(self, request_json: str):
882904
import json
@@ -937,6 +959,9 @@ async def _invoke_agent_async():
937959
user_id=request.user_id,
938960
session_id=session.id,
939961
)
962+
# Avoid trace data loss having to do with CPU throttling on instance turndown
963+
if self._tracing_enabled():
964+
_force_flush_traces()
940965

941966
def _asyncio_thread_main():
942967
try:

0 commit comments

Comments
 (0)