diff --git a/agentops/instrumentation/crewai/instrumentation.py b/agentops/instrumentation/crewai/instrumentation.py
index 3a2964733..0ecfb8d06 100644
--- a/agentops/instrumentation/crewai/instrumentation.py
+++ b/agentops/instrumentation/crewai/instrumentation.py
@@ -15,6 +15,7 @@
 from agentops.semconv import SpanAttributes, AgentOpsSpanKindValues, Meters, ToolAttributes, MessageAttributes
 from .crewai_span_attributes import CrewAISpanAttributes, set_span_attribute
 
+
 # Initialize logger
 logger = logging.getLogger(__name__)
 
@@ -410,58 +411,91 @@ def wrap_task_execute(
 def wrap_llm_call(
     tracer, duration_histogram, token_histogram, environment, application_name, wrapped, instance, args, kwargs
 ):
-    llm = instance.model if hasattr(instance, "model") else "llm"
-    with tracer.start_as_current_span(f"{llm}.llm", kind=SpanKind.CLIENT, attributes={}) as span:
-        start_time = time.time()
-        try:
-            span.set_attribute(TELEMETRY_SDK_NAME, "agentops")
-            span.set_attribute(SERVICE_NAME, application_name)
-            span.set_attribute(DEPLOYMENT_ENVIRONMENT, environment)
+    try:
+        llm = instance.model if hasattr(instance, "model") else "llm"
+        # To get the model provider (e.g. "openai") or the model name (e.g. "gpt-4o-mini")
+        provider = llm.split("/")[0] if "/" in llm else llm.split("-")[0]
+
+        provider_instrumentor = {
+            "gpt": "OpenAIInstrumentor",
+            "openai": "OpenAIInstrumentor",
+            "claude": "AnthropicInstrumentor",
+            "anthropic": "AnthropicInstrumentor",
+            "google": "GoogleGenerativeAIInstrumentor",
+            "gemini": "GoogleGenerativeAIInstrumentor",
+            "ibm": "IBMWatsonXInstrumentor",
+            "watsonx": "IBMWatsonXInstrumentor",
+            "agents": "OpenAIAgentsInstrumentor",
+        }
+
+        instrumentor = provider_instrumentor.get(provider.lower())
+
+        if instrumentor:
+            logger.debug(f"Skipping instrumentation for CrewAI LLM call for {provider} and using {instrumentor}")
+            result = wrapped(*args, **kwargs)
+            return result
+        else:
+            logger.debug(f"Instrumenting CrewAI LLM call for provider: {provider}")
+            with tracer.start_as_current_span(f"{llm}.llm", kind=SpanKind.CLIENT, attributes={}) as span:
+                start_time = time.time()
+                try:
+                    span.set_attribute(TELEMETRY_SDK_NAME, "agentops")
+                    span.set_attribute(SERVICE_NAME, application_name)
+                    span.set_attribute(DEPLOYMENT_ENVIRONMENT, environment)
 
-            CrewAISpanAttributes(span=span, instance=instance)
+                    CrewAISpanAttributes(span=span, instance=instance)
 
-            result = wrapped(*args, **kwargs)
+                    result = wrapped(*args, **kwargs)
 
-            # Set prompt attributes from args
-            if args and isinstance(args[0], list):
-                for i, message in enumerate(args[0]):
-                    if isinstance(message, dict):
-                        if "role" in message:
-                            span.set_attribute(MessageAttributes.PROMPT_ROLE.format(i=i), message["role"])
-                        if "content" in message:
-                            span.set_attribute(MessageAttributes.PROMPT_CONTENT.format(i=i), message["content"])
+                    # Set prompt attributes from args
+                    if args and isinstance(args[0], list):
+                        for i, message in enumerate(args[0]):
+                            if isinstance(message, dict):
+                                if "role" in message:
+                                    span.set_attribute(MessageAttributes.PROMPT_ROLE.format(i=i), message["role"])
+                                if "content" in message:
+                                    span.set_attribute(MessageAttributes.PROMPT_CONTENT.format(i=i), message["content"])
+
+                    # Set completion attributes from result
+                    if result:
+                        span.set_attribute(MessageAttributes.COMPLETION_CONTENT.format(i=0), str(result))
+                        span.set_attribute(MessageAttributes.COMPLETION_ROLE.format(i=0), "assistant")
+
+                    # Set token usage attributes from callbacks
+                    if (
+                        "callbacks" in kwargs
+                        and kwargs["callbacks"]
+                        and hasattr(kwargs["callbacks"][0], "token_cost_process")
+                    ):
+                        token_process = kwargs["callbacks"][0].token_cost_process
+                        if hasattr(token_process, "completion_tokens"):
+                            span.set_attribute(
+                                SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, token_process.completion_tokens
+                            )
+                        if hasattr(token_process, "prompt_tokens"):
+                            span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, token_process.prompt_tokens)
+                        if hasattr(token_process, "total_tokens"):
+                            span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, token_process.total_tokens)
 
-            # Set completion attributes from result
-            if result:
-                span.set_attribute(MessageAttributes.COMPLETION_CONTENT.format(i=0), str(result))
-                span.set_attribute(MessageAttributes.COMPLETION_ROLE.format(i=0), "assistant")
-
-            # Set token usage attributes from callbacks
-            if "callbacks" in kwargs and kwargs["callbacks"] and hasattr(kwargs["callbacks"][0], "token_cost_process"):
-                token_process = kwargs["callbacks"][0].token_cost_process
-                if hasattr(token_process, "completion_tokens"):
-                    span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, token_process.completion_tokens)
-                if hasattr(token_process, "prompt_tokens"):
-                    span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, token_process.prompt_tokens)
-                if hasattr(token_process, "total_tokens"):
-                    span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, token_process.total_tokens)
-
-            if duration_histogram:
-                duration = time.time() - start_time
-                duration_histogram.record(
-                    duration,
-                    attributes={
-                        SpanAttributes.LLM_SYSTEM: "crewai",
-                        SpanAttributes.LLM_RESPONSE_MODEL: str(instance.model),
-                    },
-                )
+                    if duration_histogram:
+                        duration = time.time() - start_time
+                        duration_histogram.record(
+                            duration,
+                            attributes={
+                                SpanAttributes.LLM_SYSTEM: "crewai",
+                                SpanAttributes.LLM_RESPONSE_MODEL: str(instance.model),
+                            },
+                        )
 
-            span.set_status(Status(StatusCode.OK))
-            return result
-        except Exception as ex:
-            span.set_status(Status(StatusCode.ERROR, str(ex)))
-            logger.error("Error in trace creation: %s", ex)
-            raise
+                    span.set_status(Status(StatusCode.OK))
+                    return result
+                except Exception as e:
+                    span.set_status(Status(StatusCode.ERROR, str(e)))
+                    logger.error("Error in trace creation: %s", e)
+                    raise e
+    except Exception as e:
+        logger.error(f"Error in provider detection: {e}")
+        raise e
 
 
 def wrap_tool_execution(tracer, duration_histogram, environment, application_name):