diff --git a/src/langtrace_python_sdk/instrumentation/agno/patch.py b/src/langtrace_python_sdk/instrumentation/agno/patch.py index b5c93357..66af5d44 100644 --- a/src/langtrace_python_sdk/instrumentation/agno/patch.py +++ b/src/langtrace_python_sdk/instrumentation/agno/patch.py @@ -15,26 +15,46 @@ from langtrace_python_sdk.utils.llm import get_span_name, set_span_attributes from langtrace_python_sdk.utils.misc import serialize_args, serialize_kwargs +def _safe_serialize(obj): + """Safely serialize objects that might not be JSON serializable""" + if hasattr(obj, 'to_dict'): + return obj.to_dict() + elif hasattr(obj, '__dict__'): + return {k: _safe_serialize(v) for k, v in obj.__dict__.items() if not k.startswith('_')} + elif isinstance(obj, dict): + return {k: _safe_serialize(v) for k, v in obj.items()} + elif isinstance(obj, (list, tuple)): + return [_safe_serialize(i) for i in obj] + return str(obj) + +def _safe_json_dumps(obj): + """Safely dump an object to JSON, handling non-serializable types""" + try: + return json.dumps(obj) + except (TypeError, ValueError): + return json.dumps(_safe_serialize(obj)) + def _extract_metrics(metrics: Dict[str, Any]) -> Dict[str, Any]: """Helper function to extract and format metrics""" - formatted_metrics = {} + if not metrics: + return {} + + if hasattr(metrics, 'to_dict'): + metrics = metrics.to_dict() + elif hasattr(metrics, '__dict__'): + metrics = {k: v for k, v in metrics.__dict__.items() if not k.startswith('_')} - # Extract basic metrics + formatted_metrics = {} + for key in ['time', 'time_to_first_token', 'input_tokens', 'output_tokens', - 'prompt_tokens', 'completion_tokens', 'total_tokens']: + 'prompt_tokens', 'completion_tokens', 'total_tokens', + 'prompt_tokens_details', 'completion_tokens_details', 'tool_call_times']: if key in metrics: formatted_metrics[key] = metrics[key] - - # Extract nested metric details if present - if 'prompt_tokens_details' in metrics: - formatted_metrics['prompt_tokens_details'] = metrics['prompt_tokens_details'] - if 'completion_tokens_details' in metrics: - formatted_metrics['completion_tokens_details'] = metrics['completion_tokens_details'] - if 'tool_call_times' in metrics: - formatted_metrics['tool_call_times'] = metrics['tool_call_times'] - + return formatted_metrics + def patch_memory(operation_name, version, tracer: Tracer): def traced_method(wrapped, instance, args, kwargs): service_provider = SERVICE_PROVIDERS["AGNO"] @@ -110,86 +130,120 @@ def traced_method(wrapped, instance, args, kwargs): try: set_span_attributes(span, attributes) AgnoSpanAttributes(span=span, instance=instance) - result_generator = wrapped(*args, **kwargs) - - accumulated_content = "" - current_tool_call = None - response_metadata = None - seen_tool_calls = set() - - try: - for response in result_generator: - if not hasattr(response, 'to_dict'): - yield response - continue - - if not response_metadata: - response_metadata = { - "run_id": response.run_id, - "agent_id": response.agent_id, - "session_id": response.session_id, - "model": response.model, - "content_type": response.content_type, - } - for key, value in response_metadata.items(): - if value is not None: - set_span_attribute(span, f"agno.agent.{key}", str(value)) - - if response.content: - accumulated_content += response.content - set_span_attribute(span, "agno.agent.response", accumulated_content) - - if response.messages: - for msg in response.messages: - if msg.tool_calls: - for tool_call in msg.tool_calls: - tool_id = tool_call.get('id') - if tool_id and tool_id not in seen_tool_calls: - seen_tool_calls.add(tool_id) - tool_info = { - 'id': tool_id, - 'name': tool_call.get('function', {}).get('name'), - 'arguments': tool_call.get('function', {}).get('arguments'), - 'start_time': msg.created_at, - } - current_tool_call = tool_info - set_span_attribute(span, f"agno.agent.tool_call.{tool_id}", json.dumps(tool_info)) - - if msg.metrics: - metrics = _extract_metrics(msg.metrics) - role_prefix = f"agno.agent.metrics.{msg.role}" - for key, value in metrics.items(): - set_span_attribute(span, f"{role_prefix}.{key}", str(value)) - - if response.tools: - for tool in response.tools: - tool_id = tool.get('tool_call_id') - if tool_id and current_tool_call and current_tool_call['id'] == tool_id: - tool_result = { - **current_tool_call, - 'result': tool.get('content'), - 'error': tool.get('tool_call_error'), - 'end_time': tool.get('created_at'), - 'metrics': tool.get('metrics'), - } - set_span_attribute(span, f"agno.agent.tool_call.{tool_id}", json.dumps(tool_result)) - current_tool_call = None - - yield response - - except Exception as err: - span.record_exception(err) - span.set_status(Status(StatusCode.ERROR, str(err))) - raise - finally: - span.set_status(Status(StatusCode.OK)) - if len(seen_tool_calls) > 0: - span.set_attribute("agno.agent.total_tool_calls", len(seen_tool_calls)) + is_streaming = kwargs.get('stream', False) + result = wrapped(*args, **kwargs) + + if not is_streaming and not operation_name.startswith('Agent._'): + if hasattr(result, 'to_dict'): + _process_response(span, result) + return result + + # Handle streaming (generator) case + return _process_generator(span, result) + except Exception as err: span.record_exception(err) span.set_status(Status(StatusCode.ERROR, str(err))) raise + # Helper function to process a generator + def _process_generator(span, result_generator): + accumulated_content = "" + current_tool_call = None + response_metadata = None + seen_tool_calls = set() + + try: + for response in result_generator: + if not hasattr(response, 'to_dict'): + yield response + continue + + _process_response(span, response, + accumulated_content=accumulated_content, + current_tool_call=current_tool_call, + response_metadata=response_metadata, + seen_tool_calls=seen_tool_calls) + + if response.content: + accumulated_content += response.content + + yield response + + except Exception as err: + span.record_exception(err) + span.set_status(Status(StatusCode.ERROR, str(err))) + raise + finally: + span.set_status(Status(StatusCode.OK)) + if len(seen_tool_calls) > 0: + span.set_attribute("agno.agent.total_tool_calls", len(seen_tool_calls)) + + def _process_response(span, response, accumulated_content="", current_tool_call=None, + response_metadata=None, seen_tool_calls=set()): + if not response_metadata: + response_metadata = { + "run_id": response.run_id, + "agent_id": response.agent_id, + "session_id": response.session_id, + "model": response.model, + "content_type": response.content_type, + } + for key, value in response_metadata.items(): + if value is not None: + set_span_attribute(span, f"agno.agent.{key}", str(value)) + + if response.content: + if accumulated_content: + accumulated_content += response.content + else: + accumulated_content = response.content + set_span_attribute(span, "agno.agent.response", accumulated_content) + + if response.messages: + for msg in response.messages: + if msg.tool_calls: + for tool_call in msg.tool_calls: + tool_id = tool_call.get('id') + if tool_id and tool_id not in seen_tool_calls: + seen_tool_calls.add(tool_id) + tool_info = { + 'id': tool_id, + 'name': tool_call.get('function', {}).get('name'), + 'arguments': tool_call.get('function', {}).get('arguments'), + 'start_time': msg.created_at, + } + current_tool_call = tool_info + set_span_attribute(span, f"agno.agent.tool_call.{tool_id}", _safe_json_dumps(tool_info)) + + if msg.metrics: + metrics = _extract_metrics(msg.metrics) + role_prefix = f"agno.agent.metrics.{msg.role}" + for key, value in metrics.items(): + set_span_attribute(span, f"{role_prefix}.{key}", str(value)) + + if response.tools: + for tool in response.tools: + tool_id = tool.get('tool_call_id') + if tool_id and current_tool_call and current_tool_call['id'] == tool_id: + tool_result = { + **current_tool_call, + 'result': tool.get('content'), + 'error': tool.get('tool_call_error'), + 'end_time': tool.get('created_at'), + 'metrics': tool.get('metrics'), + } + set_span_attribute(span, f"agno.agent.tool_call.{tool_id}", _safe_json_dumps(tool_result)) + current_tool_call = None + + if response.metrics: + metrics = _extract_metrics(response.metrics) + for key, value in metrics.items(): + set_span_attribute(span, f"agno.agent.metrics.{key}", str(value)) + + if len(seen_tool_calls) > 0: + span.set_attribute("agno.agent.total_tool_calls", len(seen_tool_calls)) + return traced_method class AgnoSpanAttributes: @@ -238,7 +292,7 @@ def run(self): if hasattr(self.instance.model, 'metrics') and self.instance.model.metrics: metrics = _extract_metrics(self.instance.model.metrics) - set_span_attribute(self.span, "agno.agent.model.metrics", json.dumps(metrics)) + set_span_attribute(self.span, "agno.agent.model.metrics", _safe_json_dumps(metrics)) if self.instance.tools: tool_list = [] diff --git a/src/langtrace_python_sdk/version.py b/src/langtrace_python_sdk/version.py index e4e78c0b..2ae7a963 100644 --- a/src/langtrace_python_sdk/version.py +++ b/src/langtrace_python_sdk/version.py @@ -1 +1 @@ -__version__ = "3.8.1" +__version__ = "3.8.2"