diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index b9cdcb226..6f75cc65c 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -63,7 +63,7 @@ def get_instance(self) -> BaseInstrumentor: provider_import_name="anthropic", ), InstrumentorLoader( - module_name="opentelemetry.instrumentation.crewai", + module_name="agentops.instrumentation.crewai", class_name="CrewAIInstrumentor", provider_import_name="crewai", ), diff --git a/third_party/opentelemetry/instrumentation/crewai/LICENSE b/agentops/instrumentation/crewai/LICENSE similarity index 99% rename from third_party/opentelemetry/instrumentation/crewai/LICENSE rename to agentops/instrumentation/crewai/LICENSE index 0f2a333f0..6bfeb3c2c 100644 --- a/third_party/opentelemetry/instrumentation/crewai/LICENSE +++ b/agentops/instrumentation/crewai/LICENSE @@ -198,4 +198,4 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and - limitations under the License. + limitations under the License. \ No newline at end of file diff --git a/third_party/opentelemetry/instrumentation/crewai/NOTICE.md b/agentops/instrumentation/crewai/NOTICE.md similarity index 82% rename from third_party/opentelemetry/instrumentation/crewai/NOTICE.md rename to agentops/instrumentation/crewai/NOTICE.md index ca711b794..ff234ba5c 100644 --- a/third_party/opentelemetry/instrumentation/crewai/NOTICE.md +++ b/agentops/instrumentation/crewai/NOTICE.md @@ -6,3 +6,5 @@ Copyright notice from the original project: Copyright (c) Traceloop (https://traceloop.com) The Apache 2.0 license can be found in the LICENSE file in this directory. + +This code has been modified and adapted for use in the AgentOps project. 
\ No newline at end of file diff --git a/agentops/instrumentation/crewai/__init__.py b/agentops/instrumentation/crewai/__init__.py new file mode 100644 index 000000000..a5f1a5a99 --- /dev/null +++ b/agentops/instrumentation/crewai/__init__.py @@ -0,0 +1,6 @@ +"""OpenTelemetry CrewAI instrumentation""" + +from agentops.instrumentation.crewai.version import __version__ +from agentops.instrumentation.crewai.instrumentation import CrewAIInstrumentor + +__all__ = ["CrewAIInstrumentor", "__version__"] diff --git a/agentops/instrumentation/crewai/crewai_span_attributes.py b/agentops/instrumentation/crewai/crewai_span_attributes.py new file mode 100644 index 000000000..d1fb83264 --- /dev/null +++ b/agentops/instrumentation/crewai/crewai_span_attributes.py @@ -0,0 +1,330 @@ +"""OpenTelemetry instrumentation for CrewAI.""" + +import json +import logging +from typing import Any +from opentelemetry.trace import Span + +from agentops.semconv.span_attributes import SpanAttributes +from agentops.semconv.agent import AgentAttributes +from agentops.semconv.tool import ToolAttributes +from agentops.semconv.message import MessageAttributes + +# Initialize logger for logging potential issues and operations +logger = logging.getLogger(__name__) + +def _parse_tools(tools): + """Parse tools into a JSON string with name and description.""" + result = [] + for tool in tools: + res = {} + if hasattr(tool, "name") and tool.name is not None: + res["name"] = tool.name + if hasattr(tool, "description") and tool.description is not None: + res["description"] = tool.description + if res: + result.append(res) + return result + +def set_span_attribute(span: Span, key: str, value: Any) -> None: + """Set a single attribute on a span.""" + if value is not None and value != "": + if hasattr(value, "__str__"): + value = str(value) + span.set_attribute(key, value) + + +class CrewAISpanAttributes: + """Manages span attributes for CrewAI instrumentation.""" + + def __init__(self, span: Span, instance, skip_agent_processing=False) -> None: + self.span = span + self.instance = instance + self.skip_agent_processing = skip_agent_processing + self.process_instance() + + def process_instance(self): + """Process the instance based on its type.""" + instance_type = self.instance.__class__.__name__ + self._set_attribute(SpanAttributes.LLM_SYSTEM, "crewai") + self._set_attribute(SpanAttributes.AGENTOPS_ENTITY_NAME, instance_type) + + method_mapping = { + "Crew": self._process_crew, + "Agent": self._process_agent, + "Task": self._process_task, + "LLM": self._process_llm, + } + method = method_mapping.get(instance_type) + if method: + method() + + def _process_crew(self): + """Process a Crew instance.""" + crew_id = getattr(self.instance, "id", "") + self._set_attribute("crewai.crew.id", str(crew_id)) + self._set_attribute("crewai.crew.type", "crewai.crew") + self._set_attribute(SpanAttributes.AGENTOPS_SPAN_KIND, "workflow") + + logger.debug(f"CrewAI: Processing crew with id {crew_id}") + + for key, value in self.instance.__dict__.items(): + if value is None: + continue + + if key == "tasks": + if isinstance(value, list): + self._set_attribute("crewai.crew.max_turns", str(len(value))) + logger.debug(f"CrewAI: Found {len(value)} tasks") + elif key == "agents": + if isinstance(value, list): + logger.debug(f"CrewAI: Found {len(value)} agents in crew") + + if not self.skip_agent_processing: + self._parse_agents(value) + elif key == "llms": + self._parse_llms(value) + elif key == "result": + self._set_attribute("crewai.crew.final_output", 
str(value)) + self._set_attribute("crewai.crew.output", str(value)) + self._set_attribute(SpanAttributes.AGENTOPS_ENTITY_OUTPUT, str(value)) + else: + self._set_attribute(f"crewai.crew.{key}", str(value)) + + def _process_agent(self): + """Process an Agent instance.""" + agent = {} + self._set_attribute(SpanAttributes.AGENTOPS_SPAN_KIND, "agent") + + for key, value in self.instance.__dict__.items(): + if key == "tools": + parsed_tools = _parse_tools(value) + for i, tool in enumerate(parsed_tools): + tool_prefix = f"crewai.agent.tool.{i}." + for tool_key, tool_value in tool.items(): + self._set_attribute(f"{tool_prefix}{tool_key}", str(tool_value)) + + agent[key] = json.dumps(parsed_tools) + + if value is None: + continue + + if key != "tools": + agent[key] = str(value) + + self._set_attribute(AgentAttributes.AGENT_ID, agent.get('id', '')) + self._set_attribute(AgentAttributes.AGENT_ROLE, agent.get('role', '')) + self._set_attribute(AgentAttributes.AGENT_NAME, agent.get('name', '')) + self._set_attribute(AgentAttributes.AGENT_TOOLS, agent.get('tools', '')) + + if 'reasoning' in agent: + self._set_attribute(AgentAttributes.AGENT_REASONING, agent.get('reasoning', '')) + + if 'goal' in agent: + self._set_attribute(SpanAttributes.AGENTOPS_ENTITY_INPUT, agent.get('goal', '')) + + self._set_attribute("crewai.agent.goal", agent.get('goal', '')) + self._set_attribute("crewai.agent.backstory", agent.get('backstory', '')) + self._set_attribute("crewai.agent.cache", agent.get('cache', '')) + self._set_attribute("crewai.agent.allow_delegation", agent.get('allow_delegation', '')) + self._set_attribute("crewai.agent.allow_code_execution", agent.get('allow_code_execution', '')) + self._set_attribute("crewai.agent.max_retry_limit", agent.get('max_retry_limit', '')) + + if hasattr(self.instance, "llm") and self.instance.llm is not None: + model_name = getattr(self.instance.llm, "model", None) or getattr(self.instance.llm, "model_name", None) or "" + temp = getattr(self.instance.llm, "temperature", None) + max_tokens = getattr(self.instance.llm, "max_tokens", None) + top_p = getattr(self.instance.llm, "top_p", None) + + self._set_attribute(SpanAttributes.LLM_REQUEST_MODEL, model_name) + if temp is not None: + self._set_attribute(SpanAttributes.LLM_REQUEST_TEMPERATURE, str(temp)) + if max_tokens is not None: + self._set_attribute(SpanAttributes.LLM_REQUEST_MAX_TOKENS, str(max_tokens)) + if top_p is not None: + self._set_attribute(SpanAttributes.LLM_REQUEST_TOP_P, str(top_p)) + + self._set_attribute("crewai.agent.llm", str(model_name)) + self._set_attribute(AgentAttributes.AGENT_MODELS, str(model_name)) + + def _process_task(self): + """Process a Task instance.""" + task = {} + self._set_attribute(SpanAttributes.AGENTOPS_SPAN_KIND, "workflow.step") + + for key, value in self.instance.__dict__.items(): + if value is None: + continue + if key == "tools": + parsed_tools = _parse_tools(value) + for i, tool in enumerate(parsed_tools): + tool_prefix = f"crewai.task.tool.{i}." 
+ for tool_key, tool_value in tool.items(): + self._set_attribute(f"{tool_prefix}{tool_key}", str(tool_value)) + + task[key] = json.dumps(parsed_tools) + + elif key == "agent": + task[key] = value.role if value else None + if value: + agent_id = getattr(value, "id", "") + self._set_attribute(AgentAttributes.FROM_AGENT, str(agent_id)) + else: + task[key] = str(value) + + self._set_attribute("crewai.task.name", task.get('description', '')) + self._set_attribute("crewai.task.type", "task") + self._set_attribute("crewai.task.input", task.get('context', '')) + self._set_attribute("crewai.task.expected_output", task.get('expected_output', '')) + + if 'description' in task: + self._set_attribute(SpanAttributes.AGENTOPS_ENTITY_INPUT, task.get('description', '')) + if 'output' in task: + self._set_attribute(SpanAttributes.AGENTOPS_ENTITY_OUTPUT, task.get('output', '')) + self._set_attribute("crewai.task.output", task.get('output', '')) + + if 'id' in task: + self._set_attribute("crewai.task.id", str(task.get('id', ''))) + + if 'status' in task: + self._set_attribute("crewai.task.status", task.get('status', '')) + + self._set_attribute("crewai.task.agent", task.get('agent', '')) + self._set_attribute("crewai.task.human_input", task.get('human_input', '')) + self._set_attribute("crewai.task.processed_by_agents", str(task.get('processed_by_agents', ''))) + + if 'tools' in task and task['tools']: + try: + tools = json.loads(task['tools']) + for i, tool in enumerate(tools): + self._set_attribute(MessageAttributes.TOOL_CALL_NAME.format(i=i), tool.get("name", "")) + self._set_attribute(MessageAttributes.TOOL_CALL_DESCRIPTION.format(i=i), tool.get("description", "")) + except (json.JSONDecodeError, TypeError): + logger.warning(f"Failed to parse tools for task: {task.get('id', 'unknown')}") + + def _process_llm(self): + """Process an LLM instance.""" + llm = {} + self._set_attribute(SpanAttributes.AGENTOPS_SPAN_KIND, "llm") + + for key, value in self.instance.__dict__.items(): + if value is None: + continue + llm[key] = str(value) + + model_name = llm.get('model_name', '') or llm.get('model', '') + self._set_attribute(SpanAttributes.LLM_REQUEST_MODEL, model_name) + self._set_attribute(SpanAttributes.LLM_REQUEST_TEMPERATURE, llm.get('temperature', '')) + self._set_attribute(SpanAttributes.LLM_REQUEST_MAX_TOKENS, llm.get('max_tokens', '')) + self._set_attribute(SpanAttributes.LLM_REQUEST_TOP_P, llm.get('top_p', '')) + + if 'frequency_penalty' in llm: + self._set_attribute(SpanAttributes.LLM_REQUEST_FREQUENCY_PENALTY, llm.get('frequency_penalty', '')) + if 'presence_penalty' in llm: + self._set_attribute(SpanAttributes.LLM_REQUEST_PRESENCE_PENALTY, llm.get('presence_penalty', '')) + if 'streaming' in llm: + self._set_attribute(SpanAttributes.LLM_REQUEST_STREAMING, llm.get('streaming', '')) + + if 'api_key' in llm: + self._set_attribute("gen_ai.request.api_key_present", "true") + + if 'base_url' in llm: + self._set_attribute(SpanAttributes.LLM_OPENAI_API_BASE, llm.get('base_url', '')) + + if 'api_version' in llm: + self._set_attribute(SpanAttributes.LLM_OPENAI_API_VERSION, llm.get('api_version', '')) + + def _parse_agents(self, agents): + """Parse agents into a list of dictionaries.""" + if not agents: + logger.debug("CrewAI: No agents to parse") + return + + agent_count = len(agents) + logger.debug(f"CrewAI: Parsing {agent_count} agents") + + # Pre-process all agents to collect their data first + agent_data_list = [] + + for idx, agent in enumerate(agents): + if agent is None: + logger.debug(f"CrewAI: Agent at 
index {idx} is None, skipping") + agent_data_list.append(None) + continue + + logger.debug(f"CrewAI: Processing agent at index {idx}") + try: + agent_data = self._extract_agent_data(agent) + agent_data_list.append(agent_data) + except Exception as e: + logger.error(f"CrewAI: Error extracting data for agent at index {idx}: {str(e)}") + agent_data_list.append(None) + + # Now set all attributes at once for each agent + for idx, agent_data in enumerate(agent_data_list): + if agent_data is None: + continue + + for key, value in agent_data.items(): + if key == "tools" and isinstance(value, list): + for tool_idx, tool in enumerate(value): + for tool_key, tool_value in tool.items(): + self._set_attribute(f"crewai.agents.{idx}.tools.{tool_idx}.{tool_key}", str(tool_value)) + else: + self._set_attribute(f"crewai.agents.{idx}.{key}", value) + + def _parse_llms(self, llms): + """Parse LLMs into a list of dictionaries.""" + for idx, llm in enumerate(llms): + if llm is not None: + model_name = getattr(llm, "model", None) or getattr(llm, "model_name", None) or "" + llm_data = { + "model": model_name, + "temperature": llm.temperature, + "max_tokens": llm.max_tokens, + "max_completion_tokens": llm.max_completion_tokens, + "top_p": llm.top_p, + "n": llm.n, + "seed": llm.seed, + "base_url": llm.base_url, + "api_version": llm.api_version, + } + + self._set_attribute(f"{SpanAttributes.LLM_REQUEST_MODEL}.{idx}", model_name) + if hasattr(llm, "temperature"): + self._set_attribute(f"{SpanAttributes.LLM_REQUEST_TEMPERATURE}.{idx}", str(llm.temperature)) + if hasattr(llm, "max_tokens"): + self._set_attribute(f"{SpanAttributes.LLM_REQUEST_MAX_TOKENS}.{idx}", str(llm.max_tokens)) + if hasattr(llm, "top_p"): + self._set_attribute(f"{SpanAttributes.LLM_REQUEST_TOP_P}.{idx}", str(llm.top_p)) + + for key, value in llm_data.items(): + if value is not None: + self._set_attribute(f"crewai.llms.{idx}.{key}", str(value)) + + def _extract_agent_data(self, agent): + """Extract data from an agent.""" + model = getattr(agent.llm, "model", None) or getattr(agent.llm, "model_name", None) or "" + + tools_list = [] + if hasattr(agent, "tools") and agent.tools: + tools_list = _parse_tools(agent.tools) + + return { + "id": str(agent.id), + "role": agent.role, + "goal": agent.goal, + "backstory": agent.backstory, + "cache": agent.cache, + "config": agent.config, + "verbose": agent.verbose, + "allow_delegation": agent.allow_delegation, + "tools": tools_list, + "max_iter": agent.max_iter, + "llm": str(model), + } + + def _set_attribute(self, key, value): + """Set an attribute on the span.""" + if value is not None and value != "": + set_span_attribute(self.span, key, value) diff --git a/agentops/instrumentation/crewai/instrumentation.py b/agentops/instrumentation/crewai/instrumentation.py new file mode 100644 index 000000000..b1e2bdcde --- /dev/null +++ b/agentops/instrumentation/crewai/instrumentation.py @@ -0,0 +1,543 @@ +import os +import time +import logging +from typing import Collection, Dict, List, Any +from contextlib import contextmanager + +from wrapt import wrap_function_wrapper +from opentelemetry.trace import SpanKind, get_tracer, Tracer, get_current_span +from opentelemetry.trace.status import Status, StatusCode +from opentelemetry.metrics import Histogram, Meter, get_meter +from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.sdk.resources import SERVICE_NAME, TELEMETRY_SDK_NAME, DEPLOYMENT_ENVIRONMENT +from 
agentops.instrumentation.crewai.version import __version__ +from agentops.semconv import SpanAttributes, AgentOpsSpanKindValues, Meters, ToolAttributes +from .crewai_span_attributes import CrewAISpanAttributes, set_span_attribute + +# Initialize logger +logger = logging.getLogger(__name__) + +_instruments = ("crewai >= 0.70.0",) + +# Global context to store tool executions by parent span ID +_tool_executions_by_agent = {} + +@contextmanager +def store_tool_execution(): + """Context manager to store tool execution details for later attachment to agent spans.""" + parent_span = get_current_span() + parent_span_id = getattr(parent_span.get_span_context(), "span_id", None) + + if parent_span_id: + if parent_span_id not in _tool_executions_by_agent: + _tool_executions_by_agent[parent_span_id] = [] + + tool_details = {} + + try: + yield tool_details + + if tool_details: + _tool_executions_by_agent[parent_span_id].append(tool_details) + finally: + pass + + +def attach_tool_executions_to_agent_span(span): + """Attach stored tool executions to the agent span.""" + span_id = getattr(span.get_span_context(), "span_id", None) + + if span_id and span_id in _tool_executions_by_agent: + for idx, tool_execution in enumerate(_tool_executions_by_agent[span_id]): + for key, value in tool_execution.items(): + if value is not None: + span.set_attribute(f"crewai.agent.tool_execution.{idx}.{key}", str(value)) + + del _tool_executions_by_agent[span_id] + + +class CrewAIInstrumentor(BaseInstrumentor): + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments + + def _instrument(self, **kwargs): + application_name = kwargs.get("application_name", "default_application") + environment = kwargs.get("environment", "default_environment") + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(__name__, __version__, tracer_provider) + + meter_provider = kwargs.get("meter_provider") + meter = get_meter(__name__, __version__, meter_provider) + + if is_metrics_enabled(): + ( + token_histogram, + duration_histogram, + ) = _create_metrics(meter) + else: + ( + token_histogram, + duration_histogram, + ) = (None, None) + + wrap_function_wrapper("crewai.crew", "Crew.kickoff", wrap_kickoff(tracer, duration_histogram, token_histogram, environment, application_name)) + wrap_function_wrapper( + "crewai.agent", "Agent.execute_task", wrap_agent_execute_task(tracer, duration_histogram, token_histogram, environment, application_name) + ) + wrap_function_wrapper( + "crewai.task", "Task.execute_sync", wrap_task_execute(tracer, duration_histogram, token_histogram, environment, application_name) + ) + wrap_function_wrapper("crewai.llm", "LLM.call", wrap_llm_call(tracer, duration_histogram, token_histogram, environment, application_name)) + + wrap_function_wrapper( + "crewai.utilities.tool_utils", "execute_tool_and_check_finality", + wrap_tool_execution(tracer, duration_histogram, environment, application_name) + ) + + wrap_function_wrapper( + "crewai.tools.tool_usage", "ToolUsage.use", + wrap_tool_usage(tracer, environment, application_name) + ) + + def _uninstrument(self, **kwargs): + unwrap("crewai.crew", "Crew.kickoff") + unwrap("crewai.agent", "Agent.execute_task") + unwrap("crewai.task", "Task.execute_sync") + unwrap("crewai.llm", "LLM.call") + unwrap("crewai.utilities.tool_utils", "execute_tool_and_check_finality") + unwrap("crewai.tools.tool_usage", "ToolUsage.use") + + +def with_tracer_wrapper(func): + """Helper for providing tracer for wrapper functions.""" + + def _with_tracer(tracer, 
duration_histogram, token_histogram, environment, application_name): + def wrapper(wrapped, instance, args, kwargs): + return func(tracer, duration_histogram, token_histogram, environment, application_name, wrapped, instance, args, kwargs) + + return wrapper + + return _with_tracer + + +@with_tracer_wrapper +def wrap_kickoff( + tracer: Tracer, duration_histogram: Histogram, token_histogram: Histogram, environment, application_name, wrapped, instance, args, kwargs +): + logger.debug(f"CrewAI: Starting workflow instrumentation for Crew with {len(getattr(instance, 'agents', []))} agents") + with tracer.start_as_current_span( + "crewai.workflow", + kind=SpanKind.INTERNAL, + attributes={ + SpanAttributes.LLM_SYSTEM: "crewai", + }, + ) as span: + try: + span.set_attribute(TELEMETRY_SDK_NAME, "agentops") + span.set_attribute(SERVICE_NAME, application_name) + span.set_attribute(DEPLOYMENT_ENVIRONMENT, environment) + + logger.debug("CrewAI: Processing crew instance attributes") + + # First set general crew attributes but skip agent processing + crew_attrs = CrewAISpanAttributes(span=span, instance=instance, skip_agent_processing=True) + + # Prioritize agent processing before task execution + if hasattr(instance, 'agents') and instance.agents: + logger.debug(f"CrewAI: Explicitly processing {len(instance.agents)} agents before task execution") + crew_attrs._parse_agents(instance.agents) + + logger.debug("CrewAI: Executing wrapped crew kickoff function") + result = wrapped(*args, **kwargs) + + if result: + class_name = instance.__class__.__name__ + span.set_attribute(f"crewai.{class_name.lower()}.result", str(result)) + span.set_status(Status(StatusCode.OK)) + if class_name == "Crew": + if hasattr(result, "usage_metrics"): + span.set_attribute("crewai.crew.usage_metrics", str(getattr(result, "usage_metrics"))) + + if hasattr(result, "tasks_output") and result.tasks_output: + span.set_attribute("crewai.crew.tasks_output", str(result.tasks_output)) + + try: + task_details_by_description = {} + if hasattr(instance, "tasks"): + for task in instance.tasks: + if task is not None: + agent_id = "" + agent_role = "" + if hasattr(task, "agent") and task.agent: + agent_id = str(getattr(task.agent, "id", "")) + agent_role = getattr(task.agent, "role", "") + + tools = [] + if hasattr(task, "tools") and task.tools: + for tool in task.tools: + tool_info = {} + if hasattr(tool, "name"): + tool_info["name"] = tool.name + if hasattr(tool, "description"): + tool_info["description"] = tool.description + if tool_info: + tools.append(tool_info) + + task_details_by_description[task.description] = { + "agent_id": agent_id, + "agent_role": agent_role, + "async_execution": getattr(task, "async_execution", False), + "human_input": getattr(task, "human_input", False), + "output_file": getattr(task, "output_file", ""), + "tools": tools + } + + for idx, task_output in enumerate(result.tasks_output): + task_prefix = f"crewai.crew.tasks.{idx}" + + task_attrs = { + "description": getattr(task_output, "description", ""), + "name": getattr(task_output, "name", ""), + "expected_output": getattr(task_output, "expected_output", ""), + "summary": getattr(task_output, "summary", ""), + "raw": getattr(task_output, "raw", ""), + "agent": getattr(task_output, "agent", ""), + "output_format": str(getattr(task_output, "output_format", "")), + } + + for attr_name, attr_value in task_attrs.items(): + if attr_value: + if attr_name == "raw" and len(str(attr_value)) > 1000: + attr_value = str(attr_value)[:997] + "..." 
+ span.set_attribute(f"{task_prefix}.{attr_name}", str(attr_value)) + + span.set_attribute(f"{task_prefix}.status", "completed") + span.set_attribute(f"{task_prefix}.id", str(idx)) + + description = task_attrs.get("description", "") + if description and description in task_details_by_description: + details = task_details_by_description[description] + + span.set_attribute(f"{task_prefix}.agent_id", details["agent_id"]) + span.set_attribute(f"{task_prefix}.async_execution", str(details["async_execution"])) + span.set_attribute(f"{task_prefix}.human_input", str(details["human_input"])) + + if details["output_file"]: + span.set_attribute(f"{task_prefix}.output_file", details["output_file"]) + + for tool_idx, tool in enumerate(details["tools"]): + for tool_key, tool_value in tool.items(): + span.set_attribute(f"{task_prefix}.tools.{tool_idx}.{tool_key}", str(tool_value)) + except Exception as ex: + logger.warning(f"Failed to parse task outputs: {ex}") + + if hasattr(result, "token_usage"): + token_usage = str(getattr(result, "token_usage")) + span.set_attribute("crewai.crew.token_usage", token_usage) + + try: + metrics = {} + for item in token_usage.split(): + if "=" in item: + key, value = item.split("=") + try: + metrics[key] = int(value) + except ValueError: + metrics[key] = value + + if "total_tokens" in metrics: + span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, metrics["total_tokens"]) + if "prompt_tokens" in metrics: + span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, metrics["prompt_tokens"]) + if "completion_tokens" in metrics: + span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, metrics["completion_tokens"]) + if "cached_prompt_tokens" in metrics: + span.set_attribute(SpanAttributes.LLM_USAGE_CACHE_READ_INPUT_TOKENS, metrics["cached_prompt_tokens"]) + if "successful_requests" in metrics: + span.set_attribute("crewai.crew.successful_requests", metrics["successful_requests"]) + + if "prompt_tokens" in metrics and "completion_tokens" in metrics and metrics["prompt_tokens"] > 0: + efficiency = metrics["completion_tokens"] / metrics["prompt_tokens"] + span.set_attribute("crewai.crew.token_efficiency", f"{efficiency:.4f}") + + if "cached_prompt_tokens" in metrics and "prompt_tokens" in metrics and metrics["prompt_tokens"] > 0: + cache_ratio = metrics["cached_prompt_tokens"] / metrics["prompt_tokens"] + span.set_attribute("crewai.crew.cache_efficiency", f"{cache_ratio:.4f}") + except Exception as ex: + logger.warning(f"Failed to parse token usage metrics: {ex}") + return result + except Exception as ex: + span.set_status(Status(StatusCode.ERROR, str(ex))) + logger.error("Error in trace creation: %s", ex) + raise + + +@with_tracer_wrapper +def wrap_agent_execute_task(tracer, duration_histogram, token_histogram, environment, application_name, wrapped, instance, args, kwargs): + agent_name = instance.role if hasattr(instance, "role") else "agent" + with tracer.start_as_current_span( + f"{agent_name}.agent", + kind=SpanKind.CLIENT, + attributes={ + SpanAttributes.AGENTOPS_SPAN_KIND: AgentOpsSpanKindValues.AGENT.value, + }, + ) as span: + try: + span.set_attribute(TELEMETRY_SDK_NAME, "agentops") + span.set_attribute(SERVICE_NAME, application_name) + span.set_attribute(DEPLOYMENT_ENVIRONMENT, environment) + + CrewAISpanAttributes(span=span, instance=instance) + + result = wrapped(*args, **kwargs) + + attach_tool_executions_to_agent_span(span) + + if token_histogram and hasattr(instance, "_token_process"): + token_histogram.record( + 
instance._token_process.get_summary().prompt_tokens, + attributes={ + SpanAttributes.LLM_SYSTEM: "crewai", + SpanAttributes.LLM_TOKEN_TYPE: "input", + SpanAttributes.LLM_RESPONSE_MODEL: str(instance.llm.model), + }, + ) + token_histogram.record( + instance._token_process.get_summary().completion_tokens, + attributes={ + SpanAttributes.LLM_SYSTEM: "crewai", + SpanAttributes.LLM_TOKEN_TYPE: "output", + SpanAttributes.LLM_RESPONSE_MODEL: str(instance.llm.model), + }, + ) + + if hasattr(instance, "llm") and hasattr(instance.llm, "model"): + set_span_attribute(span, SpanAttributes.LLM_REQUEST_MODEL, str(instance.llm.model)) + set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, str(instance.llm.model)) + + span.set_status(Status(StatusCode.OK)) + return result + except Exception as ex: + span.set_status(Status(StatusCode.ERROR, str(ex))) + logger.error("Error in trace creation: %s", ex) + raise + + +@with_tracer_wrapper +def wrap_task_execute(tracer, duration_histogram, token_histogram, environment, application_name, wrapped, instance, args, kwargs): + task_name = instance.description if hasattr(instance, "description") else "task" + + with tracer.start_as_current_span( + f"{task_name}.task", + kind=SpanKind.CLIENT, + attributes={ + SpanAttributes.AGENTOPS_SPAN_KIND: AgentOpsSpanKindValues.TASK.value, + }, + ) as span: + try: + span.set_attribute(TELEMETRY_SDK_NAME, "agentops") + span.set_attribute(SERVICE_NAME, application_name) + span.set_attribute(DEPLOYMENT_ENVIRONMENT, environment) + + CrewAISpanAttributes(span=span, instance=instance) + + result = wrapped(*args, **kwargs) + + set_span_attribute(span, SpanAttributes.AGENTOPS_ENTITY_OUTPUT, str(result)) + span.set_status(Status(StatusCode.OK)) + return result + except Exception as ex: + span.set_status(Status(StatusCode.ERROR, str(ex))) + logger.error("Error in trace creation: %s", ex) + raise + + +@with_tracer_wrapper +def wrap_llm_call(tracer, duration_histogram, token_histogram, environment, application_name, wrapped, instance, args, kwargs): + llm = instance.model if hasattr(instance, "model") else "llm" + with tracer.start_as_current_span(f"{llm}.llm", kind=SpanKind.CLIENT, attributes={}) as span: + start_time = time.time() + try: + span.set_attribute(TELEMETRY_SDK_NAME, "agentops") + span.set_attribute(SERVICE_NAME, application_name) + span.set_attribute(DEPLOYMENT_ENVIRONMENT, environment) + + CrewAISpanAttributes(span=span, instance=instance) + + result = wrapped(*args, **kwargs) + + if duration_histogram: + duration = time.time() - start_time + duration_histogram.record( + duration, + attributes={ + SpanAttributes.LLM_SYSTEM: "crewai", + SpanAttributes.LLM_RESPONSE_MODEL: str(instance.model), + }, + ) + + span.set_status(Status(StatusCode.OK)) + return result + except Exception as ex: + span.set_status(Status(StatusCode.ERROR, str(ex))) + logger.error("Error in trace creation: %s", ex) + raise + + +def wrap_tool_execution(tracer, duration_histogram, environment, application_name): + """Wrapper for tool execution function.""" + def wrapper(wrapped, instance, args, kwargs): + agent_action = args[0] if args else None + tools = args[1] if len(args) > 1 else [] + + if not agent_action: + return wrapped(*args, **kwargs) + + tool_name = getattr(agent_action, "tool", "unknown_tool") + tool_input = getattr(agent_action, "tool_input", "") + + with store_tool_execution() as tool_details: + tool_details["name"] = tool_name + tool_details["parameters"] = str(tool_input) + + matching_tool = next((tool for tool in tools if hasattr(tool, 
"name") and tool.name == tool_name), None) + if matching_tool and hasattr(matching_tool, "description"): + tool_details["description"] = str(matching_tool.description) + + with tracer.start_as_current_span( + f"{tool_name}.tool", + kind=SpanKind.CLIENT, + attributes={ + SpanAttributes.AGENTOPS_SPAN_KIND: "tool", + ToolAttributes.TOOL_NAME: tool_name, + ToolAttributes.TOOL_PARAMETERS: str(tool_input), + }, + ) as span: + start_time = time.time() + try: + span.set_attribute(TELEMETRY_SDK_NAME, "agentops") + span.set_attribute(SERVICE_NAME, application_name) + span.set_attribute(DEPLOYMENT_ENVIRONMENT, environment) + + if matching_tool and hasattr(matching_tool, "description"): + span.set_attribute(ToolAttributes.TOOL_DESCRIPTION, str(matching_tool.description)) + + result = wrapped(*args, **kwargs) + + if duration_histogram: + duration = time.time() - start_time + duration_histogram.record( + duration, + attributes={ + SpanAttributes.LLM_SYSTEM: "crewai", + ToolAttributes.TOOL_NAME: tool_name, + }, + ) + + if hasattr(result, "result"): + tool_result = str(result.result) + span.set_attribute(ToolAttributes.TOOL_RESULT, tool_result) + tool_details["result"] = tool_result + + tool_status = "success" if not hasattr(result, "error") or not result.error else "error" + span.set_attribute(ToolAttributes.TOOL_STATUS, tool_status) + tool_details["status"] = tool_status + + if hasattr(result, "error") and result.error: + tool_details["error"] = str(result.error) + + duration = time.time() - start_time + tool_details["duration"] = f"{duration:.3f}" + + span.set_status(Status(StatusCode.OK)) + return result + except Exception as ex: + tool_status = "error" + span.set_attribute(ToolAttributes.TOOL_STATUS, tool_status) + tool_details["status"] = tool_status + tool_details["error"] = str(ex) + + span.set_status(Status(StatusCode.ERROR, str(ex))) + logger.error(f"Error in tool execution trace: {ex}") + raise + + return wrapper + + +def wrap_tool_usage(tracer, environment, application_name): + """Wrapper for ToolUsage.use method.""" + def wrapper(wrapped, instance, args, kwargs): + calling = args[0] if args else None + tool_string = args[1] if len(args) > 1 else "" + + if not calling: + return wrapped(*args, **kwargs) + + tool_name = getattr(calling, "tool_name", "unknown_tool") + + with store_tool_execution() as tool_details: + tool_details["name"] = tool_name + + if hasattr(calling, "arguments") and calling.arguments: + tool_details["parameters"] = str(calling.arguments) + + with tracer.start_as_current_span( + f"{tool_name}.tool_usage", + kind=SpanKind.INTERNAL, + attributes={ + SpanAttributes.AGENTOPS_SPAN_KIND: "tool.usage", + ToolAttributes.TOOL_NAME: tool_name, + }, + ) as span: + try: + span.set_attribute(TELEMETRY_SDK_NAME, "agentops") + span.set_attribute(SERVICE_NAME, application_name) + span.set_attribute(DEPLOYMENT_ENVIRONMENT, environment) + + if hasattr(calling, "arguments") and calling.arguments: + span.set_attribute(ToolAttributes.TOOL_PARAMETERS, str(calling.arguments)) + + result = wrapped(*args, **kwargs) + + tool_result = str(result) + span.set_attribute(ToolAttributes.TOOL_RESULT, tool_result) + tool_details["result"] = tool_result + + tool_status = "success" + span.set_attribute(ToolAttributes.TOOL_STATUS, tool_status) + tool_details["status"] = tool_status + + span.set_status(Status(StatusCode.OK)) + return result + except Exception as ex: + tool_status = "error" + span.set_attribute(ToolAttributes.TOOL_STATUS, tool_status) + tool_details["status"] = tool_status + tool_details["error"] 
= str(ex) + + span.set_status(Status(StatusCode.ERROR, str(ex))) + logger.error(f"Error in tool usage trace: {ex}") + raise + + return wrapper + + +def is_metrics_enabled() -> bool: + return (os.getenv("AGENTOPS_METRICS_ENABLED") or "true").lower() == "true" + + +def _create_metrics(meter: Meter): + token_histogram = meter.create_histogram( + name=Meters.LLM_TOKEN_USAGE, + unit="token", + description="Measures number of input and output tokens used", + ) + + duration_histogram = meter.create_histogram( + name=Meters.LLM_OPERATION_DURATION, + unit="s", + description="GenAI operation duration", + ) + + return token_histogram, duration_histogram diff --git a/third_party/opentelemetry/instrumentation/crewai/version.py b/agentops/instrumentation/crewai/version.py similarity index 100% rename from third_party/opentelemetry/instrumentation/crewai/version.py rename to agentops/instrumentation/crewai/version.py diff --git a/agentops/semconv/agent.py b/agentops/semconv/agent.py index db5bd97ca..296e77851 100644 --- a/agentops/semconv/agent.py +++ b/agentops/semconv/agent.py @@ -8,6 +8,7 @@ class AgentAttributes: AGENT_ID = "agent.id" # Unique identifier for the agent AGENT_NAME = "agent.name" # Name of the agent AGENT_ROLE = "agent.role" # Role of the agent + AGENT = "agent" # Root prefix for agent attributes # Capabilities AGENT_TOOLS = "agent.tools" # Tools available to the agent diff --git a/examples/crewai_examples/db/2acd4cdf-cba0-40f5-ae71-ffed7e64c152/data_level0.bin b/examples/crewai_examples/db/2acd4cdf-cba0-40f5-ae71-ffed7e64c152/data_level0.bin deleted file mode 100644 index ea3192e8e..000000000 Binary files a/examples/crewai_examples/db/2acd4cdf-cba0-40f5-ae71-ffed7e64c152/data_level0.bin and /dev/null differ diff --git a/examples/crewai_examples/db/2acd4cdf-cba0-40f5-ae71-ffed7e64c152/header.bin b/examples/crewai_examples/db/2acd4cdf-cba0-40f5-ae71-ffed7e64c152/header.bin deleted file mode 100644 index 3e0932a7d..000000000 Binary files a/examples/crewai_examples/db/2acd4cdf-cba0-40f5-ae71-ffed7e64c152/header.bin and /dev/null differ diff --git a/examples/crewai_examples/db/2acd4cdf-cba0-40f5-ae71-ffed7e64c152/length.bin b/examples/crewai_examples/db/2acd4cdf-cba0-40f5-ae71-ffed7e64c152/length.bin deleted file mode 100644 index fd74c49c5..000000000 Binary files a/examples/crewai_examples/db/2acd4cdf-cba0-40f5-ae71-ffed7e64c152/length.bin and /dev/null differ diff --git a/examples/crewai_examples/db/2acd4cdf-cba0-40f5-ae71-ffed7e64c152/link_lists.bin b/examples/crewai_examples/db/2acd4cdf-cba0-40f5-ae71-ffed7e64c152/link_lists.bin deleted file mode 100644 index e69de29bb..000000000 diff --git a/examples/crewai_examples/db/chroma.sqlite3 b/examples/crewai_examples/db/chroma.sqlite3 deleted file mode 100644 index 45d56951b..000000000 Binary files a/examples/crewai_examples/db/chroma.sqlite3 and /dev/null differ diff --git a/examples/crewai_examples/job_posting.md b/examples/crewai_examples/job_posting.md index dfb284fd5..c6dc0275f 100644 --- a/examples/crewai_examples/job_posting.md +++ b/examples/crewai_examples/job_posting.md @@ -1,35 +1,50 @@ -# Game Design Specialist -**Location:** Los Angeles, CA (Hybrid Work Available) - -## Introduction: -At Riot Games, we are driven by a single mission—to be the most player-focused gaming company in the world. Our culture thrives on innovation, empowerment, and collaboration, where every member of our team is valued and has the opportunity to influence player experiences at every turn. 
If you're passionate about gaming and excited to create unforgettable experiences that resonate with players, we would love to hear from you! - -## Role Description: -We are seeking a dedicated and talented **Game Design Specialist** who is ready to contribute to our mission. As a key member of our development team, you will play an essential role in designing engaging game mechanics, fostering player engagement, and ensuring our games exceed player expectations. Your insights will be critical in shaping the future of gaming at Riot Games. - -## Responsibilities: -- Collaborate with cross-functional teams to design and implement game features and mechanics. -- Analyze player feedback and data to continuously improve game experience and engagement. -- Develop and maintain game design documentation and prototypes for new gameplay features. -- Conduct playtests to gather insights and iterate on game designs. -- Engage with the community to understand player needs and incorporate their feedback into design choices. - -## Requirements: -- At least 5 years of experience in game design or a related technical field. -- Proven expertise in game mechanics design and player engagement strategies. -- Strong analytical skills with a knack for data analysis and user feedback interpretation. -- Excellent teamwork and interpersonal skills, fostering a collaborative work environment. -- A genuine passion for gaming and in-depth knowledge of current trends in the gaming industry. - -## Qualities and Characteristics: -- Player-focused mindset ensuring that player feedback drives every design choice. -- Creative problem solver with a history of innovative solutions to complex design challenges. -- Ambitious and humble, eager to learn continuously and share knowledge with the team. - -## Unique Benefits: -- Join a dynamic and inclusive workplace that values diversity and creativity. -- Opportunities for social impact through community engagement projects. -- A modern workspace featuring themed meeting rooms and recreational areas designed for player and employee experiences alike. -- Enjoy food and perks that reflect our dedication to our team’s comfort and engagement. - -Are you ready to take on the challenge and join a passionate team dedicated to enriching the world of gaming? **Apply now** and contribute to creating impactful and memorable player experiences with Riot Games! \ No newline at end of file +```markdown +# Senior Project Manager + +## About Burwood Partners +At Burwood Partners, we pride ourselves on our commitment to integrity, innovation, and a client-centric approach. We thrive in an adaptable, collaborative environment where our diverse talents converge to tackle complex consulting challenges. + +## Job Description +We are seeking a **Senior Project Manager** to join our dynamic team. In this role, you will lead cross-functional projects, ensure the successful delivery of consulting solutions, and foster strong relationships with our clients. Your expertise will help us navigate the evolving landscape of management consulting and make a significant impact on our clients' success. + +### Key Responsibilities +- Manage and oversee multiple projects from initiation to completion while ensuring adherence to quality standards and deadlines. +- Collaborate with clients to understand their unique needs, leverage technology, and develop solutions that drive value. +- Lead and mentor project team members, promoting a culture of continuous learning and improvement. 
+- Use data analysis and strategic planning skills to provide insightful recommendations to clients. +- Drive innovative solutions that align with industry trends, particularly in Environmental, Social, and Governance (ESG) practices. + +### Key Skills +- **Technology Proficiency**: Experience with AI, data analytics, and consulting tools. +- **Data Analysis**: Strong analytical skills to extract insights and make informed decisions. +- **Change Management**: Proven ability to facilitate organizational change effectively. +- **Strategic Planning**: Develop strategies that align with client goals. + +### Qualities +- Integrity and Strong Ethical Standards: A commitment to maintaining trust and ethical practices with clients. +- Client-Centric Orientation: A focus on delivering tailored, impactful solutions for clients. +- Adaptability and Innovation: Open to changes and developing innovative approaches to complex challenges. +- Strong Communication Skills: Excellent verbal and written skills to interact effectively with clients and team members. + +### What We Offer +- A flexible remote work environment. +- Opportunities for professional growth and advancement. +- A chance to work with industry leaders in ESG consulting. +- Competitive compensation and benefits package. + +## Join Us! +This is an exciting opportunity for professionals who thrive in a fast-paced, collaborative setting and are enthusiastic about driving positive change. If you are an innovative thinker with a commitment to excellence, we invite you to apply for the position of Senior Project Manager at Burwood Partners. + +**Apply Now!** + +--- + +We look forward to meeting candidates who are passionate about consulting and eager to make a difference. +``` + +### Feedback on Potential Improvements: +1. **Engage with Candidates**: Use proactive language that speaks directly to prospective applicants and encourages them to envision themselves in the role. +2. **Diversity and Inclusion**: Consider adding a statement about your commitment to a diverse and inclusive workplace. +3. **Application Process**: Clearly outline how to apply to enhance the candidate experience. + +With these revisions, the job posting is ready for final approval and publishing. 
\ No newline at end of file diff --git a/third_party/opentelemetry/instrumentation/crewai/__init__.py b/third_party/opentelemetry/instrumentation/crewai/__init__.py deleted file mode 100644 index a452a7f28..000000000 --- a/third_party/opentelemetry/instrumentation/crewai/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -"""OpenTelemetry CrewAI instrumentation""" - -from opentelemetry.instrumentation.crewai.version import __version__ -from opentelemetry.instrumentation.crewai.instrumentation import CrewAIInstrumentor - -__all__ = ["CrewAIInstrumentor", "__version__"] diff --git a/third_party/opentelemetry/instrumentation/crewai/crewai_span_attributes.py b/third_party/opentelemetry/instrumentation/crewai/crewai_span_attributes.py deleted file mode 100644 index a367d7f76..000000000 --- a/third_party/opentelemetry/instrumentation/crewai/crewai_span_attributes.py +++ /dev/null @@ -1,143 +0,0 @@ -from opentelemetry.trace import Span -import json - - -def set_span_attribute(span: Span, name, value): - if value is not None: - if value != "": - span.set_attribute(name, value) - return - - -class CrewAISpanAttributes: - def __init__(self, span: Span, instance) -> None: - self.span = span - self.instance = instance - self.crew = {"tasks": [], "agents": [], "llms": []} - self.process_instance() - - def process_instance(self): - instance_type = self.instance.__class__.__name__ - method_mapping = { - "Crew": self._process_crew, - "Agent": self._process_agent, - "Task": self._process_task, - "LLM": self._process_llm, - } - method = method_mapping.get(instance_type) - if method: - method() - - def _process_crew(self): - self._populate_crew_attributes() - for key, value in self.crew.items(): - self._set_attribute(f"crewai.crew.{key}", value) - - def _process_agent(self): - agent_data = self._populate_agent_attributes() - for key, value in agent_data.items(): - self._set_attribute(f"crewai.agent.{key}", value) - - def _process_task(self): - task_data = self._populate_task_attributes() - for key, value in task_data.items(): - self._set_attribute(f"crewai.task.{key}", value) - - def _process_llm(self): - llm_data = self._populate_llm_attributes() - for key, value in llm_data.items(): - self._set_attribute(f"crewai.llm.{key}", value) - - def _populate_crew_attributes(self): - for key, value in self.instance.__dict__.items(): - if value is None: - continue - if key == "tasks": - self._parse_tasks(value) - elif key == "agents": - self._parse_agents(value) - elif key == "llms": - self._parse_llms(value) - else: - self.crew[key] = str(value) - - def _populate_agent_attributes(self): - return self._extract_attributes(self.instance) - - def _populate_task_attributes(self): - task_data = self._extract_attributes(self.instance) - if "agent" in task_data: - task_data["agent"] = self.instance.agent.role if self.instance.agent else None - return task_data - - def _populate_llm_attributes(self): - return self._extract_attributes(self.instance) - - def _parse_agents(self, agents): - self.crew["agents"] = [self._extract_agent_data(agent) for agent in agents if agent is not None] - - def _parse_tasks(self, tasks): - self.crew["tasks"] = [ - { - "agent": task.agent.role if task.agent else None, - "description": task.description, - "async_execution": task.async_execution, - "expected_output": task.expected_output, - "human_input": task.human_input, - "tools": task.tools, - "output_file": task.output_file, - } - for task in tasks - ] - - def _parse_llms(self, llms): - self.crew["tasks"] = [ - { - "temperature": llm.temperature, - 
"max_tokens": llm.max_tokens, - "max_completion_tokens": llm.max_completion_tokens, - "top_p": llm.top_p, - "n": llm.n, - "seed": llm.seed, - "base_url": llm.base_url, - "api_version": llm.api_version, - } - for llm in llms - ] - - def _extract_agent_data(self, agent): - model = getattr(agent.llm, "model", None) or getattr(agent.llm, "model_name", None) or "" - - return { - "id": str(agent.id), - "role": agent.role, - "goal": agent.goal, - "backstory": agent.backstory, - "cache": agent.cache, - "config": agent.config, - "verbose": agent.verbose, - "allow_delegation": agent.allow_delegation, - "tools": agent.tools, - "max_iter": agent.max_iter, - "llm": str(model), - } - - def _extract_attributes(self, obj): - attributes = {} - for key, value in obj.__dict__.items(): - if value is None: - continue - if key == "tools": - attributes[key] = self._serialize_tools(value) - else: - attributes[key] = str(value) - return attributes - - def _serialize_tools(self, tools): - return json.dumps( - [{k: v for k, v in vars(tool).items() if v is not None and k in ["name", "description"]} for tool in tools] - ) - - def _set_attribute(self, key, value): - if value: - set_span_attribute(self.span, key, str(value) if isinstance(value, list) else value) diff --git a/third_party/opentelemetry/instrumentation/crewai/instrumentation.py b/third_party/opentelemetry/instrumentation/crewai/instrumentation.py deleted file mode 100644 index 9611c7272..000000000 --- a/third_party/opentelemetry/instrumentation/crewai/instrumentation.py +++ /dev/null @@ -1,201 +0,0 @@ -import os -import time -from typing import Collection - -from wrapt import wrap_function_wrapper -from opentelemetry.trace import SpanKind, get_tracer, Tracer -from opentelemetry.trace.status import Status, StatusCode -from opentelemetry.metrics import Histogram, Meter, get_meter -from opentelemetry.instrumentation.utils import unwrap -from opentelemetry.instrumentation.instrumentor import BaseInstrumentor -from opentelemetry.instrumentation.crewai.version import __version__ -from agentops.semconv import SpanAttributes, AgentOpsSpanKindValues, Meters -from .crewai_span_attributes import CrewAISpanAttributes, set_span_attribute - -_instruments = ("crewai >= 0.70.0",) - - -class CrewAIInstrumentor(BaseInstrumentor): - def instrumentation_dependencies(self) -> Collection[str]: - return _instruments - - def _instrument(self, **kwargs): - tracer_provider = kwargs.get("tracer_provider") - tracer = get_tracer(__name__, __version__, tracer_provider) - - meter_provider = kwargs.get("meter_provider") - meter = get_meter(__name__, __version__, meter_provider) - - if is_metrics_enabled(): - ( - token_histogram, - duration_histogram, - ) = _create_metrics(meter) - else: - ( - token_histogram, - duration_histogram, - ) = (None, None, None, None) - - wrap_function_wrapper("crewai.crew", "Crew.kickoff", wrap_kickoff(tracer, duration_histogram, token_histogram)) - wrap_function_wrapper( - "crewai.agent", "Agent.execute_task", wrap_agent_execute_task(tracer, duration_histogram, token_histogram) - ) - wrap_function_wrapper( - "crewai.task", "Task.execute_sync", wrap_task_execute(tracer, duration_histogram, token_histogram) - ) - wrap_function_wrapper("crewai.llm", "LLM.call", wrap_llm_call(tracer, duration_histogram, token_histogram)) - - def _uninstrument(self, **kwargs): - unwrap("crewai.crew.Crew", "kickoff") - unwrap("crewai.agent.Agent", "execute_task") - unwrap("crewai.task.Task", "execute_sync") - unwrap("crewai.llm.LLM", "call") - - -def with_tracer_wrapper(func): - 
"""Helper for providing tracer for wrapper functions.""" - - def _with_tracer(tracer, duration_histogram, token_histogram): - def wrapper(wrapped, instance, args, kwargs): - return func(tracer, duration_histogram, token_histogram, wrapped, instance, args, kwargs) - - return wrapper - - return _with_tracer - - -@with_tracer_wrapper -def wrap_kickoff( - tracer: Tracer, duration_histogram: Histogram, token_histogram: Histogram, wrapped, instance, args, kwargs -): - with tracer.start_as_current_span( - "crewai.workflow", - kind=SpanKind.INTERNAL, - attributes={ - SpanAttributes.LLM_SYSTEM: "crewai", - }, - ) as span: - try: - CrewAISpanAttributes(span=span, instance=instance) - result = wrapped(*args, **kwargs) - if result: - class_name = instance.__class__.__name__ - span.set_attribute(f"crewai.{class_name.lower()}.result", str(result)) - span.set_status(Status(StatusCode.OK)) - if class_name == "Crew": - for attr in ["tasks_output", "token_usage", "usage_metrics"]: - if hasattr(result, attr): - span.set_attribute(f"crewai.crew.{attr}", str(getattr(result, attr))) - return result - except Exception as ex: - span.set_status(Status(StatusCode.ERROR, str(ex))) - raise - - -@with_tracer_wrapper -def wrap_agent_execute_task(tracer, duration_histogram, token_histogram, wrapped, instance, args, kwargs): - agent_name = instance.role if hasattr(instance, "role") else "agent" - with tracer.start_as_current_span( - f"{agent_name}.agent", - kind=SpanKind.CLIENT, - attributes={ - SpanAttributes.AGENTOPS_SPAN_KIND: AgentOpsSpanKindValues.AGENT.value, - }, - ) as span: - try: - CrewAISpanAttributes(span=span, instance=instance) - result = wrapped(*args, **kwargs) - if token_histogram: - token_histogram.record( - instance._token_process.get_summary().prompt_tokens, - attributes={ - SpanAttributes.LLM_SYSTEM: "crewai", - SpanAttributes.LLM_TOKEN_TYPE: "input", - SpanAttributes.LLM_RESPONSE_MODEL: str(instance.llm.model), - }, - ) - token_histogram.record( - instance._token_process.get_summary().completion_tokens, - attributes={ - SpanAttributes.LLM_SYSTEM: "crewai", - SpanAttributes.LLM_TOKEN_TYPE: "output", - SpanAttributes.LLM_RESPONSE_MODEL: str(instance.llm.model), - }, - ) - - set_span_attribute(span, SpanAttributes.LLM_REQUEST_MODEL, str(instance.llm.model)) - set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, str(instance.llm.model)) - span.set_status(Status(StatusCode.OK)) - return result - except Exception as ex: - span.set_status(Status(StatusCode.ERROR, str(ex))) - raise - - -@with_tracer_wrapper -def wrap_task_execute(tracer, duration_histogram, token_histogram, wrapped, instance, args, kwargs): - task_name = instance.description if hasattr(instance, "description") else "task" - - with tracer.start_as_current_span( - f"{task_name}.task", - kind=SpanKind.CLIENT, - attributes={ - SpanAttributes.AGENTOPS_SPAN_KIND: AgentOpsSpanKindValues.TASK.value, - }, - ) as span: - try: - CrewAISpanAttributes(span=span, instance=instance) - result = wrapped(*args, **kwargs) - set_span_attribute(span, SpanAttributes.AGENTOPS_ENTITY_OUTPUT, str(result)) - span.set_status(Status(StatusCode.OK)) - return result - except Exception as ex: - span.set_status(Status(StatusCode.ERROR, str(ex))) - raise - - -@with_tracer_wrapper -def wrap_llm_call(tracer, duration_histogram, token_histogram, wrapped, instance, args, kwargs): - llm = instance.model if hasattr(instance, "model") else "llm" - with tracer.start_as_current_span(f"{llm}.llm", kind=SpanKind.CLIENT, attributes={}) as span: - start_time = time.time() - try: 
- CrewAISpanAttributes(span=span, instance=instance) - result = wrapped(*args, **kwargs) - - if duration_histogram: - duration = time.time() - start_time - duration_histogram.record( - duration, - attributes={ - SpanAttributes.LLM_SYSTEM: "crewai", - SpanAttributes.LLM_RESPONSE_MODEL: str(instance.model), - }, - ) - - span.set_status(Status(StatusCode.OK)) - return result - except Exception as ex: - span.set_status(Status(StatusCode.ERROR, str(ex))) - raise - - -def is_metrics_enabled() -> bool: - return (os.getenv("AGENTOPS_METRICS_ENABLED") or "true").lower() == "true" - - -def _create_metrics(meter: Meter): - token_histogram = meter.create_histogram( - name=Meters.LLM_TOKEN_USAGE, - unit="token", - description="Measures number of input and output tokens used", - ) - - duration_histogram = meter.create_histogram( - name=Meters.LLM_OPERATION_DURATION, - unit="s", - description="GenAI operation duration", - ) - - return token_histogram, duration_histogram
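
Notes on the patch (illustrative sketches, not part of the diff):

The headline change in `agentops/instrumentation/__init__.py` repoints the CrewAI `InstrumentorLoader` entry from the vendored `opentelemetry.instrumentation.crewai` module to the new in-tree `agentops.instrumentation.crewai` package. The loader implementation itself is not shown in this diff, so the `LoaderEntry` class below is only a hypothetical mirror of what such an entry needs to do: check that the provider package is installed, import the instrumentation module, and instantiate the instrumentor class.

```python
# Sketch of an InstrumentorLoader-style entry; names here are illustrative,
# only the three field values come from the diff.
import importlib
import importlib.util
from dataclasses import dataclass


@dataclass
class LoaderEntry:
    module_name: str            # e.g. "agentops.instrumentation.crewai"
    class_name: str             # e.g. "CrewAIInstrumentor"
    provider_import_name: str   # e.g. "crewai"

    @property
    def should_activate(self) -> bool:
        # Only instrument when the provider package is actually installed.
        return importlib.util.find_spec(self.provider_import_name) is not None

    def get_instance(self):
        module = importlib.import_module(self.module_name)
        return getattr(module, self.class_name)()


entry = LoaderEntry(
    module_name="agentops.instrumentation.crewai",
    class_name="CrewAIInstrumentor",
    provider_import_name="crewai",
)
if entry.should_activate:
    entry.get_instance().instrument()
```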
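For testing the moved instrumentor outside the loader, wiring it up manually along these lines should work. The `application_name` and `environment` keyword arguments match what `_instrument` reads from `kwargs`; the console exporter setup is just a convenient assumption for a local demo.

```python
# Manual wiring sketch; normally AgentOps' InstrumentorLoader does this for you.
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

from agentops.instrumentation.crewai import CrewAIInstrumentor

provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))

CrewAIInstrumentor().instrument(
    tracer_provider=provider,
    application_name="crewai-demo",   # read by _instrument via kwargs
    environment="local",
)
# Build and kick off a Crew here; spans named "crewai.workflow",
# "<role>.agent", "<description>.task", "<model>.llm" and "<tool>.tool"
# will be exported to the console.
```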
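`CrewAIInstrumentor._instrument` relies on the `with_tracer_wrapper` factory together with `wrapt.wrap_function_wrapper`, so every wrapper shares wrapt's `(wrapped, instance, args, kwargs)` signature while closing over the tracer, histograms, and environment metadata. A toy, self-contained illustration of that pattern (the `Greeter` class is made up for the example):

```python
# Toy illustration of the wrapt pattern used in instrumentation.py: the outer
# factory closes over shared state and returns the actual wrapper.
import wrapt


class Greeter:
    def greet(self, name):
        return f"hello {name}"


def make_wrapper(prefix):
    def wrapper(wrapped, instance, args, kwargs):
        # This is where the real code opens a span around the call.
        result = wrapped(*args, **kwargs)
        return f"{prefix}: {result}"
    return wrapper


wrapt.wrap_function_wrapper(__name__, "Greeter.greet", make_wrapper("traced"))
print(Greeter().greet("crew"))  # -> "traced: hello crew"
```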
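`CrewAISpanAttributes` flattens nested CrewAI objects (agents, tools, LLM configs) into indexed, string-valued keys such as `crewai.agents.0.tools.1.name`, since OpenTelemetry span attributes only accept primitive values. A generic sketch of that flattening idea, not the exact code in the diff:

```python
# Generic flattening sketch; the real class sets each key explicitly.
def flatten(prefix, obj, out=None):
    out = {} if out is None else out
    if isinstance(obj, dict):
        for key, value in obj.items():
            flatten(f"{prefix}.{key}", value, out)
    elif isinstance(obj, (list, tuple)):
        for idx, value in enumerate(obj):
            flatten(f"{prefix}.{idx}", value, out)
    elif obj not in (None, ""):
        out[prefix] = str(obj)
    return out


agents = [{"role": "researcher",
           "tools": [{"name": "search", "description": "web search"}]}]
print(flatten("crewai.agents", agents))
# {'crewai.agents.0.role': 'researcher',
#  'crewai.agents.0.tools.0.name': 'search',
#  'crewai.agents.0.tools.0.description': 'web search'}
```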
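The `store_tool_execution` / `attach_tool_executions_to_agent_span` pair buffers tool-call details keyed by the parent span's ID so they can be replayed onto the enclosing agent span after `Agent.execute_task` returns. Stripped of the OpenTelemetry types, the pattern looks roughly like this:

```python
# Simplified buffer-and-attach sketch of the tool-execution bookkeeping.
from contextlib import contextmanager

_buffer = {}  # parent span id -> list of tool-execution dicts


@contextmanager
def record_tool(parent_id):
    details = {}
    try:
        yield details
    finally:
        if details:
            _buffer.setdefault(parent_id, []).append(details)


def flush_onto(span_id, set_attribute):
    for idx, execution in enumerate(_buffer.pop(span_id, [])):
        for key, value in execution.items():
            set_attribute(f"crewai.agent.tool_execution.{idx}.{key}", str(value))


# A tool wrapper records; the agent wrapper later flushes onto its own span.
with record_tool(parent_id=42) as d:
    d["name"] = "search"
    d["status"] = "success"
flush_onto(42, set_attribute=lambda k, v: print(k, "=", v))
```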
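`wrap_kickoff` derives token metrics by splitting `str(result.token_usage)` on whitespace and `=`. The sample string below is only a guess at CrewAI's repr, and this sketch uses `split("=", 1)` for safety where the diff splits unconditionally:

```python
# Sketch of the token-usage parsing in wrap_kickoff; sample repr is assumed.
def parse_token_usage(token_usage: str) -> dict:
    metrics = {}
    for item in token_usage.split():
        if "=" in item:
            key, value = item.split("=", 1)
            try:
                metrics[key] = int(value)
            except ValueError:
                metrics[key] = value
    return metrics


sample = "total_tokens=1200 prompt_tokens=900 completion_tokens=300 successful_requests=3"
m = parse_token_usage(sample)
# token_efficiency and cache_efficiency are then simple ratios:
assert round(m["completion_tokens"] / m["prompt_tokens"], 4) == 0.3333
```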