Commit a056a00

Fix agent span message aggregation
1 parent 4acdc2b commit a056a00

File tree: 2 files changed (+254, -6 lines)

instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/genai_semantic_processor.py

Lines changed: 94 additions & 6 deletions
@@ -687,9 +687,12 @@ def _normalize_messages_to_role_parts(
                 )
                 parts.append(p)

-            normalized.append(
-                {"role": role, "parts": parts or self._redacted_text_parts()}
-            )
+            if parts:
+                normalized.append({"role": role, "parts": parts})
+            elif not self.include_sensitive_data:
+                normalized.append(
+                    {"role": role, "parts": self._redacted_text_parts()}
+                )

         return normalized

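In effect, a message whose content normalizes to an empty parts list is now dropped outright when sensitive-data capture is enabled, and only falls back to the redacted placeholder when capture is disabled. A minimal sketch of the two modes, mirroring the unit tests added further down in this commit (the standalone constructor call and its keyword arguments are copied from those tests):

# Sketch only: constructor arguments mirror the tests in test_tracer.py below.
from opentelemetry.instrumentation.openai_agents.genai_semantic_processor import (
    GenAISemanticProcessor,
)

capturing = GenAISemanticProcessor(metrics_enabled=False)
redacting = GenAISemanticProcessor(include_sensitive_data=False, metrics_enabled=False)

empty_msg = [{"role": "user", "content": None}]

# Sensitive capture on: the empty message is skipped instead of being redacted.
assert capturing._normalize_messages_to_role_parts(empty_msg) == []

# Sensitive capture off: a single redacted placeholder is emitted for the role.
assert redacting._normalize_messages_to_role_parts(empty_msg) == [
    {"role": "user", "parts": [{"type": "text", "content": "readacted"}]}
]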
@@ -946,11 +949,17 @@ def _update_agent_aggregate(
             },
         )
         if payload.input_messages:
-            entry["input_messages"].extend(payload.input_messages)
+            entry["input_messages"] = self._merge_content_sequence(
+                entry["input_messages"], payload.input_messages
+            )
         if payload.output_messages:
-            entry["output_messages"].extend(payload.output_messages)
+            entry["output_messages"] = self._merge_content_sequence(
+                entry["output_messages"], payload.output_messages
+            )
         if payload.system_instructions:
-            entry["system_instructions"].extend(payload.system_instructions)
+            entry["system_instructions"] = self._merge_content_sequence(
+                entry["system_instructions"], payload.system_instructions
+            )

     def _infer_output_type(self, span_data: Any) -> str:
         """Infer gen_ai.output.type for multiple span kinds."""
@@ -1512,6 +1521,85 @@ def _get_attributes_from_generation_span_data(
             normalize_output_type(self._infer_output_type(span_data)),
         )

+    def _merge_content_sequence(
+        self,
+        existing: list[Any],
+        incoming: Sequence[Any],
+    ) -> list[Any]:
+        """Merge normalized message/content lists without duplicating snapshots."""
+        if not incoming:
+            return existing
+
+        incoming_list = [self._clone_message(item) for item in incoming]
+
+        if self.include_sensitive_data:
+            filtered = [
+                msg
+                for msg in incoming_list
+                if not self._is_placeholder_message(msg)
+            ]
+            if filtered:
+                incoming_list = filtered
+
+        if not existing:
+            return incoming_list
+
+        result = [self._clone_message(item) for item in existing]
+
+        for idx, new_msg in enumerate(incoming_list):
+            if idx < len(result):
+                if (
+                    self.include_sensitive_data
+                    and self._is_placeholder_message(new_msg)
+                    and not self._is_placeholder_message(result[idx])
+                ):
+                    continue
+                if result[idx] != new_msg:
+                    result[idx] = self._clone_message(new_msg)
+            else:
+                if (
+                    self.include_sensitive_data
+                    and self._is_placeholder_message(new_msg)
+                ):
+                    if (
+                        any(
+                            not self._is_placeholder_message(existing_msg)
+                            for existing_msg in result
+                        )
+                        or new_msg in result
+                    ):
+                        continue
+                result.append(self._clone_message(new_msg))
+
+        return result
+
+    def _clone_message(self, message: Any) -> Any:
+        if isinstance(message, dict):
+            return {
+                key: self._clone_message(value)
+                if isinstance(value, (dict, list))
+                else value
+                for key, value in message.items()
+            }
+        if isinstance(message, list):
+            return [self._clone_message(item) for item in message]
+        return message
+
+    def _is_placeholder_message(self, message: Any) -> bool:
+        if not isinstance(message, dict):
+            return False
+        parts = message.get("parts")
+        if not isinstance(parts, list) or not parts:
+            return False
+        for part in parts:
+            if (
+                not isinstance(part, dict)
+                or part.get("type") != "text"
+                or part.get("content") != "readacted"
+            ):
+                return False
+        return True
+
     def _get_attributes_from_agent_span_data(
         self,
         span_data: AgentSpanData,
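A minimal sketch of what the new helper guarantees, calling `_merge_content_sequence` directly on a standalone processor (constructed with the same keyword arguments the tests below use); the message dictionaries are illustrative:

# Sketch only: exercises the private helper directly, outside the tracing stack.
from opentelemetry.instrumentation.openai_agents.genai_semantic_processor import (
    GenAISemanticProcessor,
)

processor = GenAISemanticProcessor(metrics_enabled=False)

hello = {"role": "user", "parts": [{"type": "text", "content": "hello"}]}
placeholder = {"role": "user", "parts": [{"type": "text", "content": "readacted"}]}

# Merging the same snapshot twice is idempotent: nothing is appended a second
# time, and the redacted placeholder is filtered while sensitive data is kept.
merged = processor._merge_content_sequence([], [hello, placeholder])
merged = processor._merge_content_sequence(merged, [hello, placeholder])
assert merged == [hello]

# Entries are deep-cloned, so later mutation of the caller's payload does not
# leak into the aggregated copy.
hello["parts"][0]["content"] = "changed"
assert merged[0]["parts"][0]["content"] == "hello"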

instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_tracer.py

Lines changed: 160 additions & 0 deletions
@@ -3,6 +3,8 @@
 import json
 import sys
 from pathlib import Path
+from types import SimpleNamespace
+from typing import Any

 TESTS_ROOT = Path(__file__).resolve().parent
 stub_path = TESTS_ROOT / "stubs"
@@ -25,6 +27,10 @@
 from opentelemetry.instrumentation.openai_agents import (  # noqa: E402
     OpenAIAgentsInstrumentor,
 )
+from opentelemetry.instrumentation.openai_agents.genai_semantic_processor import (  # noqa: E402
+    ContentPayload,
+    GenAISemanticProcessor,
+)
 from opentelemetry.sdk.trace import TracerProvider  # noqa: E402

 try:
@@ -202,6 +208,160 @@ def test_agent_create_span_records_attributes():
     exporter.clear()


+def _placeholder_message() -> dict[str, Any]:
+    return {
+        "role": "user",
+        "parts": [{"type": "text", "content": "readacted"}],
+    }
+
+
+def test_normalize_messages_skips_empty_when_sensitive_enabled():
+    processor = GenAISemanticProcessor(metrics_enabled=False)
+    normalized = processor._normalize_messages_to_role_parts(
+        [{"role": "user", "content": None}]
+    )
+    assert normalized == []
+
+
+def test_normalize_messages_emits_placeholder_when_sensitive_disabled():
+    processor = GenAISemanticProcessor(
+        include_sensitive_data=False, metrics_enabled=False
+    )
+    normalized = processor._normalize_messages_to_role_parts(
+        [{"role": "user", "content": None}]
+    )
+    assert normalized == [_placeholder_message()]
+
+
+def test_agent_content_aggregation_skips_duplicate_snapshots():
+    processor = GenAISemanticProcessor(metrics_enabled=False)
+    agent_id = "agent-span"
+    processor._agent_content[agent_id] = {
+        "input_messages": [],
+        "output_messages": [],
+        "system_instructions": [],
+    }
+
+    payload = ContentPayload(
+        input_messages=[
+            {"role": "user", "parts": [{"type": "text", "content": "hello"}]},
+            {
+                "role": "user",
+                "parts": [{"type": "text", "content": "readacted"}],
+            },
+        ]
+    )
+
+    processor._update_agent_aggregate(
+        SimpleNamespace(span_id="child-1", parent_id=agent_id, span_data=None),
+        payload,
+    )
+    processor._update_agent_aggregate(
+        SimpleNamespace(span_id="child-2", parent_id=agent_id, span_data=None),
+        payload,
+    )
+
+    aggregated = processor._agent_content[agent_id]["input_messages"]
+    assert aggregated == [
+        {"role": "user", "parts": [{"type": "text", "content": "hello"}]}
+    ]
+    # ensure data copied rather than reused to prevent accidental mutation
+    assert aggregated is not payload.input_messages
+
+
+def test_agent_content_aggregation_filters_placeholder_append_when_sensitive():
+    processor = GenAISemanticProcessor(metrics_enabled=False)
+    agent_id = "agent-span"
+    processor._agent_content[agent_id] = {
+        "input_messages": [],
+        "output_messages": [],
+        "system_instructions": [],
+    }
+
+    initial_payload = ContentPayload(
+        input_messages=[
+            {"role": "user", "parts": [{"type": "text", "content": "hello"}]}
+        ]
+    )
+    processor._update_agent_aggregate(
+        SimpleNamespace(span_id="child-1", parent_id=agent_id, span_data=None),
+        initial_payload,
+    )
+
+    placeholder_payload = ContentPayload(
+        input_messages=[_placeholder_message()]
+    )
+    processor._update_agent_aggregate(
+        SimpleNamespace(span_id="child-2", parent_id=agent_id, span_data=None),
+        placeholder_payload,
+    )
+
+    aggregated = processor._agent_content[agent_id]["input_messages"]
+    assert aggregated == [
+        {"role": "user", "parts": [{"type": "text", "content": "hello"}]}
+    ]
+
+
+def test_agent_content_aggregation_retains_placeholder_when_sensitive_disabled():
+    processor = GenAISemanticProcessor(
+        include_sensitive_data=False, metrics_enabled=False
+    )
+    agent_id = "agent-span"
+    processor._agent_content[agent_id] = {
+        "input_messages": [],
+        "output_messages": [],
+        "system_instructions": [],
+    }
+
+    placeholder_payload = ContentPayload(
+        input_messages=[_placeholder_message()]
+    )
+    processor._update_agent_aggregate(
+        SimpleNamespace(span_id="child-1", parent_id=agent_id, span_data=None),
+        placeholder_payload,
+    )
+
+    aggregated = processor._agent_content[agent_id]["input_messages"]
+    assert aggregated == [_placeholder_message()]
+
+
+def test_agent_content_aggregation_appends_new_messages_once():
+    processor = GenAISemanticProcessor(metrics_enabled=False)
+    agent_id = "agent-span"
+    processor._agent_content[agent_id] = {
+        "input_messages": [],
+        "output_messages": [],
+        "system_instructions": [],
+    }
+
+    initial_payload = ContentPayload(
+        input_messages=[
+            {"role": "user", "parts": [{"type": "text", "content": "hello"}]}
+        ]
+    )
+    processor._update_agent_aggregate(
+        SimpleNamespace(span_id="child-1", parent_id=agent_id, span_data=None),
+        initial_payload,
+    )
+
+    extended_messages = [
+        {"role": "user", "parts": [{"type": "text", "content": "hello"}]},
+        {
+            "role": "assistant",
+            "parts": [{"type": "text", "content": "hi there"}],
+        },
+    ]
+    extended_payload = ContentPayload(input_messages=extended_messages)
+    processor._update_agent_aggregate(
+        SimpleNamespace(span_id="child-2", parent_id=agent_id, span_data=None),
+        extended_payload,
+    )
+
+    aggregated = processor._agent_content[agent_id]["input_messages"]
+    assert aggregated == extended_messages
+    assert extended_payload.input_messages == extended_messages
+
+
 def test_agent_span_collects_child_messages():
     instrumentor, exporter = _instrument_with_provider()
