diff --git a/agentops/instrumentation/crewai/instrumentation.py b/agentops/instrumentation/crewai/instrumentation.py index 3a2964733..b091a701c 100644 --- a/agentops/instrumentation/crewai/instrumentation.py +++ b/agentops/instrumentation/crewai/instrumentation.py @@ -13,7 +13,9 @@ from opentelemetry.sdk.resources import SERVICE_NAME, TELEMETRY_SDK_NAME, DEPLOYMENT_ENVIRONMENT from agentops.instrumentation.crewai.version import __version__ from agentops.semconv import SpanAttributes, AgentOpsSpanKindValues, Meters, ToolAttributes, MessageAttributes +from agentops.semconv.core import CoreAttributes from .crewai_span_attributes import CrewAISpanAttributes, set_span_attribute +from agentops import get_client # Initialize logger logger = logging.getLogger(__name__) @@ -159,12 +161,20 @@ def wrap_kickoff( logger.debug( f"CrewAI: Starting workflow instrumentation for Crew with {len(getattr(instance, 'agents', []))} agents" ) + + config = get_client().config + attributes = { + SpanAttributes.LLM_SYSTEM: "crewai", + } + + if config.default_tags and len(config.default_tags) > 0: + tag_list = list(config.default_tags) + attributes[CoreAttributes.TAGS] = tag_list + with tracer.start_as_current_span( "crewai.workflow", kind=SpanKind.INTERNAL, - attributes={ - SpanAttributes.LLM_SYSTEM: "crewai", - }, + attributes=attributes, ) as span: try: span.set_attribute(TELEMETRY_SDK_NAME, "agentops") @@ -381,12 +391,21 @@ def wrap_task_execute( ): task_name = instance.description if hasattr(instance, "description") else "task" + config = get_client().config + attributes = { + SpanAttributes.AGENTOPS_SPAN_KIND: AgentOpsSpanKindValues.TASK.value, + } + + if config.default_tags and len(config.default_tags) > 0: + tag_list = list(config.default_tags) + # TODO: This should be a set to prevent duplicates, but we need to ensure + # that the tags are not modified in place, so we convert to list first. + attributes[CoreAttributes.TAGS] = tag_list + with tracer.start_as_current_span( f"{task_name}.task", kind=SpanKind.CLIENT, - attributes={ - SpanAttributes.AGENTOPS_SPAN_KIND: AgentOpsSpanKindValues.TASK.value, - }, + attributes=attributes, ) as span: try: span.set_attribute(TELEMETRY_SDK_NAME, "agentops") diff --git a/tests/unit/test_session_legacy.py b/tests/unit/test_session_legacy.py index 610fc42eb..509efa055 100644 --- a/tests/unit/test_session_legacy.py +++ b/tests/unit/test_session_legacy.py @@ -144,3 +144,55 @@ def test_crewai_kwargs_force_flush(): # Explicitly ensure the core isn't already shut down for the test assert TracingCore.get_instance()._initialized, "TracingCore should still be initialized" + + +def test_crewai_task_instrumentation(instrumentation): + """ + Test the CrewAI task instrumentation focusing on span attributes and tags. + This test verifies that task spans are properly created with correct attributes + and tags without requiring a session. + """ + import agentops + from opentelemetry.trace import SpanKind + from agentops.semconv import SpanAttributes, AgentOpsSpanKindValues + from opentelemetry import trace + from agentops.semconv.core import CoreAttributes + + # Initialize AgentOps with API key and default tags + agentops.init( + api_key="test-api-key", + ) + agentops.start_session(tags=["test", "crewai-integration"]) + # Get the tracer + tracer = trace.get_tracer(__name__) + + # Create a mock task instance + class MockTask: + def __init__(self): + self.description = "Test Task Description" + self.agent = "Test Agent" + self.tools = ["tool1", "tool2"] + + task = MockTask() + + # Start a span for the task + with tracer.start_as_current_span( + f"{task.description}.task", + kind=SpanKind.CLIENT, + attributes={ + SpanAttributes.AGENTOPS_SPAN_KIND: AgentOpsSpanKindValues.TASK.value, + CoreAttributes.TAGS: ["crewai", "task-test"], + }, + ) as span: + # Verify span attributes + assert span.attributes[SpanAttributes.AGENTOPS_SPAN_KIND] == AgentOpsSpanKindValues.TASK.value + assert "crewai" in span.attributes[CoreAttributes.TAGS] + assert "task-test" in span.attributes[CoreAttributes.TAGS] + + # Verify span name + assert span.name == f"{task.description}.task" + + # Verify span kind + assert span.kind == SpanKind.CLIENT + + agentops.end_session(end_state="Success", end_state_reason="Test Finished", is_auto_end=True)