Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions util/opentelemetry-util-genai/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3798](#3798))
- Record content-type and use canonical paths in fsspec genai uploader
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3795](#3795))
- Make inputs / outputs / system instructions optional params to `on_completion`,
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3802](#3802)).

## Version 0.1b0 (2025-09-25)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@

@dataclass
class Completion:
inputs: list[types.InputMessage]
outputs: list[types.OutputMessage]
system_instruction: list[types.MessagePart]
inputs: list[types.InputMessage] | None
outputs: list[types.OutputMessage] | None
system_instruction: list[types.MessagePart] | None


@dataclass
Expand Down Expand Up @@ -210,10 +210,13 @@ def on_completion(
log_record: LogRecord | None = None,
**kwargs: Any,
) -> None:
if not any([inputs, outputs, system_instruction]):
return
# An empty list will not be uploaded.
completion = Completion(
inputs=inputs,
outputs=outputs,
system_instruction=system_instruction,
inputs=inputs or None,
outputs=outputs or None,
system_instruction=system_instruction or None,
)
# generate the paths to upload to
ref_names = self._calculate_ref_path()
Expand All @@ -225,23 +228,36 @@ def to_dict(
) -> JsonEncodeable:
return [asdict(dc) for dc in dataclass_list]

references = [
(ref_name, ref, ref_attr)
for ref_name, ref, ref_attr in [
(
ref_names.inputs_ref,
completion.inputs,
GEN_AI_INPUT_MESSAGES_REF,
),
(
ref_names.outputs_ref,
completion.outputs,
GEN_AI_OUTPUT_MESSAGES_REF,
),
(
ref_names.system_instruction_ref,
completion.system_instruction,
GEN_AI_SYSTEM_INSTRUCTIONS_REF,
),
]
if ref
]
self._submit_all(
{
# Use partial to defer as much as possible to the background threads
ref_names.inputs_ref: partial(to_dict, completion.inputs),
ref_names.outputs_ref: partial(to_dict, completion.outputs),
ref_names.system_instruction_ref: partial(
to_dict, completion.system_instruction
),
},
ref_name: partial(to_dict, ref)
for ref_name, ref, _ in references
}
)

# stamp the refs on telemetry
references = {
GEN_AI_INPUT_MESSAGES_REF: ref_names.inputs_ref,
GEN_AI_OUTPUT_MESSAGES_REF: ref_names.outputs_ref,
GEN_AI_SYSTEM_INSTRUCTIONS_REF: ref_names.system_instruction_ref,
}
references = {ref_attr: name for name, _, ref_attr in references}
if span:
span.set_attributes(references)
if log_record:
Expand Down
27 changes: 27 additions & 0 deletions util/opentelemetry-util-genai/tests/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,33 @@ def test_upload_then_shutdown(self):
"should have uploaded 3 files",
)

def test_upload_when_inputs_outputs_empty(self):
record = LogRecord()
self.hook.on_completion(
inputs=[],
outputs=[],
system_instruction=FAKE_SYSTEM_INSTRUCTION,
log_record=record,
)
# all items should be consumed
self.hook.shutdown()

self.assertEqual(
self.mock_fs.open.call_count,
1,
"should have uploaded 1 file",
)
assert record.attributes is not None
for ref_key in [
"gen_ai.input.messages_ref",
"gen_ai.output.messages_ref",
"gen_ai.system_instructions_ref",
]:
if ref_key == "gen_ai.system_instructions_ref":
self.assertIn(ref_key, record.attributes)
else:
self.assertNotIn(ref_key, record.attributes)

def test_upload_blocked(self):
with self.block_upload():
# fill the queue
Expand Down