10 changes: 5 additions & 5 deletions .pre-commit-config.yaml
@@ -1,11 +1,11 @@
default_stages: [pre-commit]

repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.11.6
hooks:
# - id: ruff # Disabled - too strict for optional library handling
- id: ruff-format
# - repo: https://github.com/astral-sh/ruff-pre-commit
# rev: v0.11.6
# hooks:
# # - id: ruff # Disabled - too strict for optional library handling
# - id: ruff-format
Comment on lines +4 to +8

Copilot AI Nov 30, 2025

The ruff formatter has been commented out. This removes automatic code formatting from the pre-commit hooks, which can lead to inconsistent code style. If there are specific issues with ruff formatting that need to be addressed, consider configuring ruff to skip problematic rules rather than disabling formatting entirely.

Suggested change
# - repo: https://github.com/astral-sh/ruff-pre-commit
# rev: v0.11.6
# hooks:
# # - id: ruff # Disabled - too strict for optional library handling
# - id: ruff-format
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.11.6
hooks:
# - id: ruff # Disabled - too strict for optional library handling
- id: ruff-format
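
If the underlying problem is the ruff lint hook rather than the formatter, a narrower fix is to keep both hooks enabled and skip only the offending rules. A minimal sketch, assuming the optional-import guards are what trip the linter (PLC0415, import-outside-top-level, is a guess; substitute whatever codes ruff actually reports):

  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.11.6
    hooks:
      - id: ruff
        args: ["--ignore", "PLC0415"]  # assumed rule code; adjust to the real offenders
      - id: ruff-format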

- repo: https://github.com/astral-sh/uv-pre-commit
rev: 0.6.14
hooks:
262 changes: 151 additions & 111 deletions client/joinly_client/agent.py
@@ -2,6 +2,7 @@
import contextlib
import json
import logging
import time
from dataclasses import replace
from typing import Any, Self

@@ -21,6 +22,13 @@
from pydantic_ai.settings import ModelSettings, merge_model_settings
from pydantic_ai.tools import ToolDefinition

from joinly_client.datadog import (
increment_metric,
track_agent,
track_llm,
track_tool,
track_workflow,
)
from joinly_client.types import ToolExecutor, TranscriptSegment, Usage
from joinly_client.utils import get_prompt

@@ -116,38 +124,95 @@ async def _run_loop(self, segments: list[TranscriptSegment]) -> None:
segments (list[TranscriptSegment]): The segments of the transcript to
process.
"""
self._messages.append(
ModelRequest(
parts=[
UserPromptPart(
f"{segment.speaker or 'Participant'}: {segment.text}"
)
for segment in segments
]
)
# Build user input from segments
user_input = " | ".join(
f"{s.speaker or 'Participant'}: {s.text}" for s in segments
)

iteration: int = 0
self._messages = self._truncate_tool_results(
self._messages, max_chars=self._max_tool_result_chars
)
self._messages = self._omit_binary_tool_results(self._messages)
while self._max_agent_iter is None or iteration < self._max_agent_iter:
self._messages = self._limit_messages(
self._messages, max_messages=self._max_messages
# Track the entire agent conversation as a workflow
session_id = f"utterance-{time.time_ns()}"
with track_workflow(
"agent.conversation",
session_id=session_id,
metadata={
"segment_count": len(segments),
"model": self._llm.model_name,
"provider": self._llm.system,
},
) as workflow_ctx:
workflow_ctx["input_data"] = user_input

self._messages.append(
ModelRequest(
parts=[
UserPromptPart(
f"{segment.speaker or 'Participant'}: {segment.text}"
)
for segment in segments
]
)
)

iteration: int = 0
total_input_tokens = 0
total_output_tokens = 0
tools_called: list[str] = []

self._messages = self._truncate_tool_results(
self._messages, max_chars=self._max_ephemeral_tool_result_chars
self._messages, max_chars=self._max_tool_result_chars
)
self._messages = self._omit_binary_tool_results(self._messages)

while self._max_agent_iter is None or iteration < self._max_agent_iter:
self._messages = self._limit_messages(
self._messages, max_messages=self._max_messages
)
self._messages = self._truncate_tool_results(
self._messages, max_chars=self._max_ephemeral_tool_result_chars
)

# Track each iteration as an agent span
with track_agent(
f"iteration.{iteration}",
session_id=session_id,
metadata={"iteration": iteration},
) as agent_ctx:
response = await self._call_llm(self._messages)
request = await self._call_tools(response)

# Collect metrics
total_input_tokens += response.usage.request_tokens or 0
total_output_tokens += response.usage.response_tokens or 0

tool_calls = [
p for p in response.parts if isinstance(p, ToolCallPart)
]
tools_called.extend(tc.tool_name for tc in tool_calls)

agent_ctx["metadata"]["tool_calls"] = len(tool_calls)
agent_ctx["metadata"]["input_tokens"] = (
response.usage.request_tokens or 0
)
agent_ctx["metadata"]["output_tokens"] = (
response.usage.response_tokens or 0
)

self._messages.append(response)
if request:
self._messages.append(request)
if self._check_end_turn(response, request):
break
iteration += 1

response = await self._call_llm(self._messages)
request = await self._call_tools(response)
self._messages.append(response)
if request:
self._messages.append(request)
if self._check_end_turn(response, request):
break
iteration += 1
# Update workflow metadata
workflow_ctx["metadata"]["iterations"] = iteration + 1
workflow_ctx["metadata"]["total_input_tokens"] = total_input_tokens
workflow_ctx["metadata"]["total_output_tokens"] = total_output_tokens
workflow_ctx["metadata"]["tools_called"] = tools_called
workflow_ctx["metadata"]["unique_tools"] = list(set(tools_called))

# Increment conversation metric
increment_metric("agent.conversations")
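
Note: client/joinly_client/datadog.py itself is outside this hunk, so the semantics of the track_* helpers are not visible here. Judging by their call sites, they are presumably thin context managers over ddtrace.llmobs.LLMObs that yield a mutable dict (input_data, output_data, metrics, metadata) and annotate the span on exit, degrading to a no-op when ddtrace is unavailable. A hypothetical sketch of track_workflow under those assumptions (track_agent, track_llm, and track_tool would follow the same pattern):

import contextlib
from collections.abc import Iterator
from typing import Any


@contextlib.contextmanager
def track_workflow(
    name: str,
    session_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> Iterator[dict[str, Any]]:
    """Yield a mutable context dict; annotate an LLMObs workflow span on exit."""
    ctx: dict[str, Any] = {"metadata": dict(metadata or {})}
    span = None
    try:
        from ddtrace.llmobs import LLMObs

        span = LLMObs.workflow(name, session_id=session_id)
        span.__enter__()
    except Exception:  # noqa: BLE001 - ddtrace is optional
        span = None  # degrade to a no-op so tracking never breaks the agent
    try:
        yield ctx
    finally:
        if span is not None:
            with contextlib.suppress(Exception):
                LLMObs.annotate(
                    input_data=ctx.get("input_data"),
                    output_data=ctx.get("output_data"),
                    metrics=ctx.get("metrics"),
                    metadata=ctx["metadata"],
                )
            with contextlib.suppress(Exception):
                span.__exit__(None, None, None)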

def _format_llmobs_input(
self, messages: list[ModelMessage]
@@ -166,58 +231,6 @@ def _format_llmobs_input(
result.append({"role": "user", "content": content})
return result

def _annotate_llmobs(
self,
llmobs_span: Any, # noqa: ANN401 - ddtrace context manager type
input_messages: list[dict[str, str]],
response: ModelResponse | None = None,
error: Exception | None = None,
) -> None:
"""Annotate LLMObs span with input/output data."""
try:
from ddtrace.llmobs import LLMObs

if error:
LLMObs.annotate(
input_data=input_messages,
metadata={
"error": True,
"error.type": type(error).__name__,
"error.message": str(error),
},
)
llmobs_span.__exit__(type(error), error, error.__traceback__)
elif response:
response_texts = [
content
for p in response.parts
if isinstance(content := getattr(p, "content", None), str)
]
input_tokens = response.usage.request_tokens or 0
output_tokens = response.usage.response_tokens or 0
LLMObs.annotate(
input_data=input_messages,
output_data=[
{"role": "assistant", "content": t} for t in response_texts
]
or None,
metrics={
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"total_tokens": input_tokens + output_tokens,
},
metadata={
"temperature": 0.2,
"model": self._llm.model_name,
"provider": self._llm.system,
},
)
llmobs_span.__exit__(None, None, None)
except Exception as e: # noqa: BLE001 - ddtrace can raise various errors
logger.debug("Failed to annotate LLMObs: %s", e)
with contextlib.suppress(Exception):
llmobs_span.__exit__(None, None, None)

async def _call_llm(self, messages: list[ModelMessage]) -> ModelResponse:
"""Call the LLM with the current messages.

@@ -232,24 +245,13 @@ async def _call_llm(self, messages: list[ModelMessage]) -> ModelResponse:
# Format input for LLM Observability
llmobs_input = self._format_llmobs_input(messages)

# Try to use LLMObs for proper LLM Observability
llmobs_span = None
llmobs_span_exited = False
try:
from ddtrace.llmobs import LLMObs

llmobs_span = LLMObs.llm(
model_name=self._llm.model_name,
model_provider=self._llm.system,
name="chat",
)
llmobs_span.__enter__()
except ImportError:
pass
except Exception as e: # noqa: BLE001 - ddtrace is optional
logger.debug("LLMObs not available: %s", e)
with track_llm(
model_name=self._llm.model_name,
model_provider=self._llm.system,
metadata={"temperature": 0.2, "message_count": len(messages)},
) as llm_ctx:
llm_ctx["input_data"] = llmobs_input

try:
response = await model_request(
self._llm,
[ModelRequest(parts=[SystemPromptPart(self._prompt)]), *messages],
@@ -270,11 +272,29 @@ async def _call_llm(self, messages: list[ModelMessage]) -> ModelResponse:
allow_text_output=self._llm.model_name.startswith("gpt-5"),
),
)
except Exception as e:
if llmobs_span:
self._annotate_llmobs(llmobs_span, llmobs_input, error=e)
llmobs_span_exited = True
raise

# Extract response content
response_texts = [
content
for p in response.parts
if isinstance(content := getattr(p, "content", None), str)
]

# Set LLMObs output and metrics
llm_ctx["output_data"] = [
{"role": "assistant", "content": t} for t in response_texts
] or None
llm_ctx["metrics"] = {
"input_tokens": response.usage.request_tokens or 0,
"output_tokens": response.usage.response_tokens or 0,
"total_tokens": (response.usage.request_tokens or 0)
+ (response.usage.response_tokens or 0),
}

# Log tool calls if any
tool_calls = [p for p in response.parts if isinstance(p, ToolCallPart)]
if tool_calls:
llm_ctx["metadata"]["tool_calls"] = [tc.tool_name for tc in tool_calls]

logger.debug(
"LLM response received with %d parts, %d input tokens and %d output tokens",
@@ -283,9 +303,6 @@ async def _call_llm(self, messages: list[ModelMessage]) -> ModelResponse:
response.usage.response_tokens or 0,
)

if llmobs_span and not llmobs_span_exited:
self._annotate_llmobs(llmobs_span, llmobs_input, response=response)

self._usage.add(
"llm",
usage={
@@ -341,22 +358,45 @@ async def _call_tool(
None,
)

tool_args = tool_call.args_as_dict()
logger.info(
"%s: %s",
tool_call.tool_name,
", ".join(
f'{k}="{v}"' if isinstance(v, str) else f"{k}={v}"
for k, v in tool_call.args_as_dict().items()
for k, v in tool_args.items()
),
)

try:
content = await self._tool_executor(
tool_call.tool_name, tool_call.args_as_dict()
)
except Exception:
logger.exception("Error calling tool %s", tool_call.tool_name)
content = f"Error calling tool {tool_call.tool_name}"
# Track the tool execution
with track_tool(
tool_call.tool_name,
arguments=tool_args,
metadata={"tool_call_id": tool_call.tool_call_id},
) as tool_ctx:
try:
content = await self._tool_executor(tool_call.tool_name, tool_args)
tool_ctx["metadata"]["success"] = True
except Exception:
logger.exception("Error calling tool %s", tool_call.tool_name)
content = f"Error calling tool {tool_call.tool_name}"
tool_ctx["metadata"]["success"] = False
tool_ctx["metadata"]["error"] = True

# Set output data for tracking
if isinstance(content, BinaryContent):
tool_ctx["output_data"] = (
f"BinaryContent({content.media_type}, {len(content.data)} bytes)"
)
tool_ctx["metadata"]["has_binary"] = True
elif isinstance(content, list):
tool_ctx["output_data"] = f"List with {len(content)} items"
else:
# Truncate long outputs for tracking
output_str = str(content)
tool_ctx["output_data"] = (
output_str[:500] if len(output_str) > 500 else output_str
)

logger.info(
"%s: %s",