@@ -215,6 +215,25 @@ def _warn(msg: str):
215215 _warn ._LOGGER .warning (msg ) # pyright: ignore[reportFunctionMemberAccess]
216216
217217
218+ def _force_flush_traces ():
219+ try :
220+ import opentelemetry .trace
221+ except (ImportError , AttributeError ):
222+ _warn (F"Could not force flush traces. opentelemetry-api is not installed. Please call 'pip install google-cloud-aiplatform[agent_engines]'." )
223+ return None
224+
225+ try :
226+ import opentelemetry .sdk .trace
227+ except (ImportError , AttributeError ):
228+ _warn (F"Could not force flush traces. opentelemetry-sdk is not installed. Please call 'pip install google-cloud-aiplatform[agent_engines]'." )
229+ return None
230+
231+ provider = opentelemetry .trace .get_tracer_provider ()
232+ if isinstance (provider , opentelemetry .sdk .trace .TracerProvider ):
233+ _ = provider .force_flush ()
234+
235+
236+
218237def _default_instrumentor_builder (
219238 project_id : str ,
220239 * ,
@@ -288,28 +307,19 @@ def _detect_cloud_resource_id(project_id: str) -> Optional[str]:
288307
289308 if enable_tracing :
290309 try :
291- import opentelemetry .exporter .cloud_trace
292- except (ImportError , AttributeError ):
293- return _warn_missing_dependency (
294- "opentelemetry-exporter-gcp-trace" , needed_for_tracing = True
295- )
296-
297- try :
298- import google .cloud .trace_v2
310+ import opentelemetry .exporter .otlp .proto .http .trace_exporter
311+ import google .auth .transport .requests
299312 except (ImportError , AttributeError ):
300313 return _warn_missing_dependency (
301- "google-cloud-trace " , needed_for_tracing = True
314+ "opentelemetry-exporter-otlp-proto-http " , needed_for_tracing = True
302315 )
303316
304317 import google .auth
305318
306319 credentials , _ = google .auth .default ()
307- span_exporter = opentelemetry .exporter .cloud_trace .CloudTraceSpanExporter (
308- project_id = project_id ,
309- client = google .cloud .trace_v2 .TraceServiceClient (
310- credentials = credentials .with_quota_project (project_id ),
311- ),
312- resource_regex = "|" .join (resource .attributes .keys ()),
320+ span_exporter = opentelemetry .exporter .otlp .proto .http .trace_exporter .OTLPSpanExporter (
321+ session = google .auth .transport .requests .AuthorizedSession (credentials = credentials ),
322+ endpoint = "https://telemetry.googleapis.com/v1/traces"
313323 )
314324 span_processor = opentelemetry .sdk .trace .export .BatchSpanProcessor (
315325 span_exporter = span_exporter ,
@@ -646,54 +656,17 @@ def set_up(self):
646656 else :
647657 os .environ ["ADK_CAPTURE_MESSAGE_CONTENT_IN_SPANS" ] = "false"
648658
649- GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY = (
650- "GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY"
651- )
652-
653- def telemetry_enabled () -> Optional [bool ]:
654- return (
655- os .getenv (GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY , "0" ).lower ()
656- in ("true" , "1" )
657- if GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY in os .environ
658- else None
659- )
660-
661- # Tracing enablement follows truth table:
662- def tracing_enabled () -> bool :
663- """Tracing enablement follows true table:
664-
665- | enable_tracing | enable_telemetry(env) | tracing_actually_enabled |
666- |----------------|-----------------------|--------------------------|
667- | false | false | false |
668- | false | true | false |
669- | false | None | false |
670- | true | false | false |
671- | true | true | true |
672- | true | None | true |
673- | None(default) | false | false |
674- | None(default) | true | adk_version >= 1.17 |
675- | None(default) | None | false |
676- """
677- enable_tracing : Optional [bool ] = self ._tmpl_attrs .get ("enable_tracing" )
678- enable_telemetry : Optional [bool ] = telemetry_enabled ()
679-
680- return (enable_tracing is True and enable_telemetry is not False ) or (
681- enable_tracing is None
682- and enable_telemetry is True
683- and is_version_sufficient ("1.17.0" )
684- )
685-
686- enable_logging = bool (telemetry_enabled ())
659+ enable_logging = bool (self ._telemetry_enabled ())
687660
688661 custom_instrumentor = self ._tmpl_attrs .get ("instrumentor_builder" )
689662
690- if custom_instrumentor and tracing_enabled ():
663+ if custom_instrumentor and self . _tracing_enabled ():
691664 self ._tmpl_attrs ["instrumentor" ] = custom_instrumentor (project )
692665
693666 if not custom_instrumentor :
694667 self ._tmpl_attrs ["instrumentor" ] = _default_instrumentor_builder (
695668 project ,
696- enable_tracing = tracing_enabled (),
669+ enable_tracing = self . _tracing_enabled (),
697670 enable_logging = enable_logging ,
698671 )
699672
@@ -847,9 +820,13 @@ async def async_stream_query(
847820 ** kwargs ,
848821 )
849822
850- async for event in events_async :
851- # Yield the event data as a dictionary
852- yield _utils .dump_event_for_json (event )
823+ try :
824+ async for event in events_async :
825+ # Yield the event data as a dictionary
826+ yield _utils .dump_event_for_json (event )
827+ finally :
828+ if self ._tracing_enabled ():
829+ _force_flush_traces ()
853830
854831 async def streaming_agent_run_with_events (self , request_json : str ):
855832 """Streams responses asynchronously from the ADK application.
@@ -920,6 +897,8 @@ async def streaming_agent_run_with_events(self, request_json: str):
920897 user_id = request .user_id ,
921898 session_id = session .id ,
922899 )
900+ if self ._tracing_enabled ():
901+ _force_flush_traces ()
923902
924903 async def async_get_session (
925904 self ,
@@ -1105,3 +1084,43 @@ def register_operations(self) -> Dict[str, List[str]]:
11051084 "streaming_agent_run_with_events" ,
11061085 ],
11071086 }
1087+
1088+ def _telemetry_enabled (self ) -> Optional [bool ]:
1089+ import os
1090+
1091+ GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY = (
1092+ "GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY"
1093+ )
1094+
1095+ return (
1096+ os .getenv (GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY , "0" ).lower ()
1097+ in ("true" , "1" )
1098+ if GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY in os .environ
1099+ else None
1100+ )
1101+
1102+ # Tracing enablement follows truth table:
1103+ def _tracing_enabled (self ) -> bool :
1104+ """Tracing enablement follows true table:
1105+
1106+ | enable_tracing | enable_telemetry(env) | tracing_actually_enabled |
1107+ |----------------|-----------------------|--------------------------|
1108+ | false | false | false |
1109+ | false | true | false |
1110+ | false | None | false |
1111+ | true | false | false |
1112+ | true | true | true |
1113+ | true | None | true |
1114+ | None(default) | false | false |
1115+ | None(default) | true | adk_version >= 1.17 |
1116+ | None(default) | None | false |
1117+ """
1118+ enable_tracing : Optional [bool ] = self ._tmpl_attrs .get ("enable_tracing" )
1119+ enable_telemetry : Optional [bool ] = self ._telemetry_enabled ()
1120+
1121+ return (enable_tracing is True and enable_telemetry is not False ) or (
1122+ enable_tracing is None
1123+ and enable_telemetry is True
1124+ and is_version_sufficient ("1.17.0" )
1125+ )
1126+
0 commit comments