Commit ab39602
fix: aggregate streaming messages into one event.
1 parent: 05bed52

1 file changed: 52 additions, 47 deletions

instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/generate_content.py
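The change: in GEN_AI_LATEST_EXPERIMENTAL mode, the streaming paths previously ran helper.process_completion(...) once per streamed chunk, so a response delivered in N chunks produced N completion-details events. After this commit each chunk only updates the span bookkeeping (_update_response) and appends its candidates to a shared list; one event covering the whole stream is emitted from a finally block when iteration ends. A minimal sketch of the pattern, where Chunk and emit_completion_event are hypothetical stand-ins for GenerateContentResponse and the helper's event logging:

# Sketch only: Chunk and emit_completion_event are hypothetical
# stand-ins, not the instrumentation's real API.
from dataclasses import dataclass, field
from typing import Iterator, List


@dataclass
class Chunk:
    candidates: List[str] = field(default_factory=list)


def emit_completion_event(candidates: List[str]) -> None:
    print(f"one event, {len(candidates)} candidates: {candidates}")


def stream_with_aggregation(chunks: Iterator[Chunk]) -> Iterator[Chunk]:
    candidates: List[str] = []
    try:
        for chunk in chunks:
            if chunk.candidates:  # a chunk may carry no candidates
                candidates += chunk.candidates
            yield chunk
    finally:
        # Exactly one event, whether the stream ends normally,
        # raises, or is abandoned by the consumer.
        emit_completion_event(candidates)


for _ in stream_with_aggregation(iter([Chunk(["Hel"]), Chunk(["lo"])])):
    pass  # on exhaustion prints: one event, 2 candidates: ['Hel', 'lo']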
@@ -382,8 +382,7 @@ def process_completion(
         config: Optional[GenerateContentConfigOrDict] = None,
     ):
         self._update_response(response)
-        self._maybe_log_completion_details(request, response, config)
-        self._response_index += 1
+        self._maybe_log_completion_details(request, response.candidates or [], config)
 
     def process_error(self, e: Exception):
         self._error_type = str(e.__class__.__name__)
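Two details of the non-streaming path above: the new call normalizes with response.candidates or [], since the candidates field of a GenerateContentResponse may be None, and the self._response_index += 1 bookkeeping is dropped, presumably superfluous once only a single event is emitted. A tiny illustration of the or [] guard, with FakeResponse standing in for the real response type:

# Illustration of the `or []` normalization; FakeResponse is a stand-in
# for GenerateContentResponse, whose candidates field may be None.
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeResponse:
    candidates: Optional[List[str]] = None


def log_candidates(candidates: List[str]) -> None:
    print(f"{len(candidates)} candidate(s)")


log_candidates(FakeResponse().candidates or [])             # 0 candidate(s)
log_candidates(FakeResponse(["a", "b"]).candidates or [])   # 2 candidate(s)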
@@ -466,7 +465,7 @@ def _maybe_update_error_type(self, response: GenerateContentResponse):
     def _maybe_log_completion_details(
         self,
         request: Union[ContentListUnion, ContentListUnionDict],
-        response: GenerateContentResponse,
+        candidates: list[Candidate],
         config: Optional[GenerateContentConfigOrDict] = None,
     ):
         attributes = {
@@ -481,7 +480,7 @@ def _maybe_log_completion_details(
             contents=transformers.t_contents(request)
         )
         output_messages = to_output_messages(
-            candidates=response.candidates or []
+            candidates=candidates
         )
 
         span = trace.get_current_span()
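Widening the parameter from a full GenerateContentResponse to a plain list[Candidate] (previous hunk) is what lets every caller converge on one emission point: the unary path passes a single response's candidates, the streaming paths pass the list gathered across chunks. The resulting call shapes, copied from the hunks in this diff:

# Call shapes after the refactor (taken from the hunks in this commit):
#
#   unary:     self._maybe_log_completion_details(request, response.candidates or [], config)
#   streaming: helper._maybe_log_completion_details(contents, candidates, config)
#
# where `candidates` in the streaming case is the list accumulated
# across all chunks of the stream.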
@@ -791,6 +790,7 @@ def instrumented_generate_content_stream(
         config: Optional[GenerateContentConfigOrDict] = None,
         **kwargs: Any,
     ) -> Iterator[GenerateContentResponse]:
+        candidates: list[Candidate] = []
         helper = _GenerateContentInstrumentationHelper(
             self,
             otel_wrapper,
@@ -818,7 +818,9 @@ def instrumented_generate_content_stream(
                         helper.sem_conv_opt_in_mode
                         == _StabilityMode.GEN_AI_LATEST_EXPERIMENTAL
                     ):
-                        helper.process_completion(contents, response, config)
+                        helper._update_response(response)
+                        if response.candidates:
+                            candidates += response.candidates
                     else:
                         raise ValueError(
                             f"Sem Conv opt in mode {helper.sem_conv_opt_in_mode} not supported."
@@ -828,6 +830,7 @@ def instrumented_generate_content_stream(
                 helper.process_error(error)
                 raise
             finally:
+                helper._maybe_log_completion_details(contents, candidates, config)
                 helper.finalize_processing()
 
     return instrumented_generate_content_stream
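Emitting from the finally block, rather than after the loop, matters inside a generator: the block also runs when the consumer raises mid-iteration or abandons the iterator early, so the partial aggregate is still logged exactly once. A self-contained sketch of that behavior, with print standing in for the helper's event logging:

# Sketch of generator finally-semantics; print stands in for the
# helper's single aggregated log event.
from typing import Iterator, List


def instrumented(chunks: List[str]) -> Iterator[str]:
    collected: List[str] = []
    try:
        for chunk in chunks:
            collected.append(chunk)
            yield chunk
    finally:
        print(f"emitted once with {collected}")


gen = instrumented(["a", "b", "c"])
next(gen)    # consumer takes one chunk...
gen.close()  # ...then stops; prints: emitted once with ['a']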
@@ -923,50 +926,52 @@ async def instrumented_generate_content_stream(
                 end_on_exit=False,
             ) as span:
                 helper.add_request_options_to_span(config)
-            if helper.sem_conv_opt_in_mode == _StabilityMode.DEFAULT:
-                helper.process_request(contents, config)
-            try:
-                response_async_generator = await wrapped_func(
-                    self,
-                    model=model,
-                    contents=contents,
-                    config=helper.wrapped_config(config),
-                    **kwargs,
-                )
-            except Exception as error:  # pylint: disable=broad-exception-caught
-                helper.process_error(error)
-                helper.finalize_processing()
-                with trace.use_span(span, end_on_exit=True):
-                    raise
-
-        async def _response_async_generator_wrapper():
-            with trace.use_span(span, end_on_exit=True):
-                try:
-                    async for response in response_async_generator:
-                        if (
-                            helper.sem_conv_opt_in_mode
-                            == _StabilityMode.DEFAULT
-                        ):
-                            helper.process_response(response)
-                        elif (
-                            helper.sem_conv_opt_in_mode
-                            == _StabilityMode.GEN_AI_LATEST_EXPERIMENTAL
-                        ):
-                            helper.process_completion(
-                                contents, response, config
-                            )
-                        else:
-                            raise ValueError(
-                                f"Sem Conv opt in mode {helper.sem_conv_opt_in_mode} not supported."
-                            )
-                        yield response
-                except Exception as error:
-                    helper.process_error(error)
+            if helper.sem_conv_opt_in_mode == _StabilityMode.DEFAULT:
+                helper.process_request(contents, config)
+            try:
+                response_async_generator = await wrapped_func(
+                    self,
+                    model=model,
+                    contents=contents,
+                    config=helper.wrapped_config(config),
+                    **kwargs,
+                )
+            except Exception as error:  # pylint: disable=broad-exception-caught
+                helper.process_error(error)
+                helper.finalize_processing()
+                with trace.use_span(span, end_on_exit=True):
                     raise
-                finally:
-                    helper.finalize_processing()
 
-        return _response_async_generator_wrapper()
+        async def _response_async_generator_wrapper():
+            candidates: list[Candidate] = []
+            with trace.use_span(span, end_on_exit=True):
+                try:
+                    async for response in response_async_generator:
+                        if (
+                            helper.sem_conv_opt_in_mode
+                            == _StabilityMode.DEFAULT
+                        ):
+                            helper.process_response(response)
+                        elif (
+                            helper.sem_conv_opt_in_mode
+                            == _StabilityMode.GEN_AI_LATEST_EXPERIMENTAL
+                        ):
+                            helper._update_response(response)
+                            if response.candidates:
+                                candidates += response.candidates
+                        else:
+                            raise ValueError(
+                                f"Sem Conv opt in mode {helper.sem_conv_opt_in_mode} not supported."
+                            )
+                        yield response
+                except Exception as error:
+                    helper.process_error(error)
+                    raise
+                finally:
+                    helper._maybe_log_completion_details(contents, candidates, config)
+                    helper.finalize_processing()
+
+        return _response_async_generator_wrapper()
 
     return instrumented_generate_content_stream
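The async variant applies the same accumulation inside _response_async_generator_wrapper. The span is created with end_on_exit=False and re-entered in the wrapper via trace.use_span(span, end_on_exit=True), so it stays open while the caller iterates and closes only when the wrapper finishes; the aggregated event is emitted in the wrapper's finally, just before finalize_processing(). A minimal async analogue, with print standing in for both span management and event logging:

# Minimal async analogue of the wrapper pattern; produce() stands in
# for the wrapped google-genai streaming call.
import asyncio
from typing import AsyncIterator, List


async def produce() -> AsyncIterator[str]:
    for chunk in ["Hel", "lo"]:
        yield chunk


async def wrapper(source: AsyncIterator[str]) -> AsyncIterator[str]:
    candidates: List[str] = []
    try:
        async for chunk in source:
            candidates.append(chunk)
            yield chunk
    finally:
        # One aggregated event once the consumer is done.
        print(f"emitted once with {candidates}")


async def main() -> None:
    async for chunk in wrapper(produce()):
        pass


asyncio.run(main())  # prints: emitted once with ['Hel', 'lo']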