diff --git a/agentops/sdk/decorators/__init__.py b/agentops/sdk/decorators/__init__.py index 706bd4624..c17b08aa7 100644 --- a/agentops/sdk/decorators/__init__.py +++ b/agentops/sdk/decorators/__init__.py @@ -14,8 +14,9 @@ operation = create_entity_decorator(SpanKind.OPERATION) workflow = create_entity_decorator(SpanKind.WORKFLOW) session = create_entity_decorator(SpanKind.SESSION) +tool = create_entity_decorator(SpanKind.TOOL) operation = task -__all__ = ["agent", "task", "workflow", "session", "operation"] +__all__ = ["agent", "task", "workflow", "session", "operation", "tool"] # Create decorators task, workflow, session, agent diff --git a/agentops/sdk/decorators/factory.py b/agentops/sdk/decorators/factory.py index 13b09789d..582005ccd 100644 --- a/agentops/sdk/decorators/factory.py +++ b/agentops/sdk/decorators/factory.py @@ -2,10 +2,12 @@ import functools import asyncio + import wrapt # type: ignore from agentops.logging import logger from agentops.sdk.core import TracingCore +from agentops.semconv.span_attributes import SpanAttributes from .utility import ( _create_as_current_span, @@ -28,10 +30,10 @@ def create_entity_decorator(entity_kind: str): A decorator with optional arguments for name and version """ - def decorator(wrapped=None, *, name=None, version=None): + def decorator(wrapped=None, *, name=None, version=None, cost=None): # Handle case where decorator is called with parameters if wrapped is None: - return functools.partial(decorator, name=name, version=version) + return functools.partial(decorator, name=name, version=version, cost=cost) # Handle class decoration if inspect.isclass(wrapped): @@ -39,6 +41,7 @@ def decorator(wrapped=None, *, name=None, version=None): class WrappedClass(wrapped): def __init__(self, *args, **kwargs): operation_name = name or wrapped.__name__ + self._agentops_span_context_manager = _create_as_current_span(operation_name, entity_kind, version) self._agentops_active_span = self._agentops_span_context_manager.__enter__() @@ -51,23 +54,18 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) async def __aenter__(self): - # Added for async context manager support - # This allows using the class with 'async with' statement - # If span is already created in __init__, just return self if hasattr(self, "_agentops_active_span") and self._agentops_active_span is not None: return self # Otherwise create span (for backward compatibility) operation_name = name or wrapped.__name__ + self._agentops_span_context_manager = _create_as_current_span(operation_name, entity_kind, version) self._agentops_active_span = self._agentops_span_context_manager.__enter__() return self async def __aexit__(self, exc_type, exc_val, exc_tb): - # Added for proper async cleanup - # This ensures spans are properly closed when using 'async with' - if hasattr(self, "_agentops_active_span") and hasattr(self, "_agentops_span_context_manager"): try: _record_entity_output(self._agentops_active_span, self) @@ -104,10 +102,12 @@ def wrapper(wrapped, instance, args, kwargs): # Handle generator functions if is_generator: - # Use the old approach for generators span, ctx, token = _make_span(operation_name, entity_kind, version) try: _record_entity_input(span, args, kwargs) + # Set cost attribute if tool + if entity_kind == "tool" and cost is not None: + span.set_attribute(SpanAttributes.LLM_USAGE_TOOL_COST, cost) except Exception as e: logger.warning(f"Failed to record entity input: {e}") @@ -116,10 +116,12 @@ def wrapper(wrapped, instance, args, kwargs): # Handle async generator functions elif is_async_generator: - # Use the old approach for async generators span, ctx, token = _make_span(operation_name, entity_kind, version) try: _record_entity_input(span, args, kwargs) + # Set cost attribute if tool + if entity_kind == "tool" and cost is not None: + span.set_attribute(SpanAttributes.LLM_USAGE_TOOL_COST, cost) except Exception as e: logger.warning(f"Failed to record entity input: {e}") @@ -133,6 +135,9 @@ async def _wrapped_async(): with _create_as_current_span(operation_name, entity_kind, version) as span: try: _record_entity_input(span, args, kwargs) + # Set cost attribute if tool + if entity_kind == "tool" and cost is not None: + span.set_attribute(SpanAttributes.LLM_USAGE_TOOL_COST, cost) except Exception as e: logger.warning(f"Failed to record entity input: {e}") @@ -144,6 +149,7 @@ async def _wrapped_async(): logger.warning(f"Failed to record entity output: {e}") return result except Exception as e: + logger.error(f"Error in async function execution: {e}") span.record_exception(e) raise @@ -154,7 +160,9 @@ async def _wrapped_async(): with _create_as_current_span(operation_name, entity_kind, version) as span: try: _record_entity_input(span, args, kwargs) - + # Set cost attribute if tool + if entity_kind == "tool" and cost is not None: + span.set_attribute(SpanAttributes.LLM_USAGE_TOOL_COST, cost) except Exception as e: logger.warning(f"Failed to record entity input: {e}") @@ -167,6 +175,7 @@ async def _wrapped_async(): logger.warning(f"Failed to record entity output: {e}") return result except Exception as e: + logger.error(f"Error in sync function execution: {e}") span.record_exception(e) raise diff --git a/agentops/semconv/span_attributes.py b/agentops/semconv/span_attributes.py index 0561c3910..23d1d3d23 100644 --- a/agentops/semconv/span_attributes.py +++ b/agentops/semconv/span_attributes.py @@ -65,6 +65,7 @@ class SpanAttributes: LLM_USAGE_CACHE_READ_INPUT_TOKENS = "gen_ai.usage.cache_read_input_tokens" LLM_USAGE_REASONING_TOKENS = "gen_ai.usage.reasoning_tokens" LLM_USAGE_STREAMING_TOKENS = "gen_ai.usage.streaming_tokens" + LLM_USAGE_TOOL_COST = "gen_ai.usage.total_cost" # Message attributes # see ./message.py for message-related attributes diff --git a/docs/v1/concepts/sessions.mdx b/docs/v1/concepts/sessions.mdx index ec98cb7f5..60a503c8f 100644 --- a/docs/v1/concepts/sessions.mdx +++ b/docs/v1/concepts/sessions.mdx @@ -28,6 +28,7 @@ Optionally, sessions may include: - **Host Environment**: Automatically gathers basic information about the system on which the session ran. - **Video**: If applicable, an optional video recording of the session. + ### Methods #### `end_session` **Params** diff --git a/docs/v2/concepts/decorators.mdx b/docs/v2/concepts/decorators.mdx index 36206a94e..4596e4193 100644 --- a/docs/v2/concepts/decorators.mdx +++ b/docs/v2/concepts/decorators.mdx @@ -13,6 +13,7 @@ AgentOps provides the following decorators: | `@operation` | Track discrete operations performed by agents | OPERATION span | | `@workflow` | Track a sequence of operations | WORKFLOW span | | `@task` | Track smaller units of work (similar to operations) | TASK span | +| `@tool` | Track tool usage and cost in agent operations | TOOL span | ## Decorator Hierarchy @@ -190,6 +191,50 @@ class DataProcessor: The `@task` and `@operation` decorators function identically (they are aliases in the codebase), and you can choose the one that best fits your semantic needs. +### @tool + +The `@tool` decorator tracks tool usage within agent operations and supports cost tracking. It works with all function types: synchronous, asynchronous, generator, and async generator. + +```python +from agentops.sdk.decorators import agent, tool +import asyncio + +@agent +class ProcessingAgent: + def __init__(self): + pass + + @tool(cost=0.01) + def sync_tool(self, item): + """Synchronous tool with cost tracking.""" + return f"Processed {item}" + + @tool(cost=0.02) + async def async_tool(self, item): + """Asynchronous tool with cost tracking.""" + await asyncio.sleep(0.1) + return f"Async processed {item}" + + @tool(cost=0.03) + def generator_tool(self, items): + """Generator tool with cost tracking.""" + for item in items: + yield self.sync_tool(item) + + @tool(cost=0.04) + async def async_generator_tool(self, items): + """Async generator tool with cost tracking.""" + for item in items: + await asyncio.sleep(0.1) + yield await self.async_tool(item) +``` + +The tool decorator provides: +- Cost tracking for each tool call +- Proper span creation and nesting +- Support for all function types (sync, async, generator, async generator) +- Cost accumulation in generator and async generator operations + ## Decorator Attributes You can pass additional attributes to decorators: diff --git a/docs/v2/usage/sdk-reference.mdx b/docs/v2/usage/sdk-reference.mdx index 7bd33a45b..6852bc7e9 100644 --- a/docs/v2/usage/sdk-reference.mdx +++ b/docs/v2/usage/sdk-reference.mdx @@ -145,6 +145,7 @@ my_workflow() - `@agent`: Creates an agent span for tracking agent operations - `@operation` / `@task`: Creates operation/task spans for tracking specific operations (these are aliases) - `@workflow`: Creates workflow spans for organizing related operations +- `@tool`: Creates tool spans for tracking tool usage and cost in agent operations. Supports cost parameter for tracking tool usage costs. See [Decorators](/v2/concepts/decorators) for more detailed documentation on using these decorators. diff --git a/tests/unit/sdk/test_decorators.py b/tests/unit/sdk/test_decorators.py index 98f1cd402..96824d0fe 100644 --- a/tests/unit/sdk/test_decorators.py +++ b/tests/unit/sdk/test_decorators.py @@ -2,7 +2,7 @@ import asyncio import pytest -from agentops.sdk.decorators import agent, operation, session, workflow, task +from agentops.sdk.decorators import agent, operation, session, workflow, task, tool from agentops.semconv import SpanKind from agentops.semconv.span_attributes import SpanAttributes from tests.unit.sdk.instrumentation_tester import InstrumentationTester @@ -624,3 +624,140 @@ def __init__(self): with pytest.raises(ValueError): async with TestClass() as instance: raise ValueError("Trigger exception for __aexit__ coverage") + + +class TestToolDecorator: + """Tests for the tool decorator functionality.""" + + @pytest.fixture + def agent_class(self): + @agent + class TestAgent: + @tool(cost=0.01) + def process_item(self, item): + return f"Processed {item}" + + @tool(cost=0.02) + async def async_process_item(self, item): + await asyncio.sleep(0.1) + return f"Async processed {item}" + + @tool(cost=0.03) + def generator_process_items(self, items): + for item in items: + yield self.process_item(item) + + @tool(cost=0.04) + async def async_generator_process_items(self, items): + for item in items: + await asyncio.sleep(0.1) + yield await self.async_process_item(item) + + return TestAgent() + + def test_sync_tool_cost(self, agent_class, instrumentation: InstrumentationTester): + """Test synchronous tool with cost attribute.""" + result = agent_class.process_item("test") + + assert result == "Processed test" + + spans = instrumentation.get_finished_spans() + tool_span = next( + span for span in spans if span.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TOOL + ) + assert tool_span.attributes.get(SpanAttributes.LLM_USAGE_TOOL_COST) == 0.01 + + @pytest.mark.asyncio + async def test_async_tool_cost(self, agent_class, instrumentation: InstrumentationTester): + """Test asynchronous tool with cost attribute.""" + result = await agent_class.async_process_item("test") + + assert result == "Async processed test" + + spans = instrumentation.get_finished_spans() + tool_span = next( + span for span in spans if span.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TOOL + ) + assert tool_span.attributes.get(SpanAttributes.LLM_USAGE_TOOL_COST) == 0.02 + + def test_generator_tool_cost(self, agent_class, instrumentation: InstrumentationTester): + """Test generator tool with cost attribute.""" + items = ["item1", "item2", "item3"] + results = list(agent_class.generator_process_items(items)) + + assert len(results) == 3 + assert results[0] == "Processed item1" + assert results[1] == "Processed item2" + assert results[2] == "Processed item3" + + spans = instrumentation.get_finished_spans() + tool_spans = [span for span in spans if span.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TOOL] + assert len(tool_spans) == 4 # Only one span for the generator + assert tool_spans[0].attributes.get(SpanAttributes.LLM_USAGE_TOOL_COST) == 0.01 + assert tool_spans[3].attributes.get(SpanAttributes.LLM_USAGE_TOOL_COST) == 0.03 + + @pytest.mark.asyncio + async def test_async_generator_tool_cost(self, agent_class, instrumentation: InstrumentationTester): + """Test async generator tool with cost attribute.""" + items = ["item1", "item2", "item3"] + results = [result async for result in agent_class.async_generator_process_items(items)] + + assert len(results) == 3 + assert results[0] == "Async processed item1" + assert results[1] == "Async processed item2" + assert results[2] == "Async processed item3" + + spans = instrumentation.get_finished_spans() + tool_span = [span for span in spans if span.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TOOL] + assert len(tool_span) == 4 # Only one span for the generator + assert tool_span[0].attributes.get(SpanAttributes.LLM_USAGE_TOOL_COST) == 0.02 + assert tool_span[3].attributes.get(SpanAttributes.LLM_USAGE_TOOL_COST) == 0.04 + + def test_multiple_tool_calls(self, agent_class, instrumentation: InstrumentationTester): + """Test multiple calls to the same tool.""" + for i in range(3): + result = agent_class.process_item(f"item{i}") + assert result == f"Processed item{i}" + + spans = instrumentation.get_finished_spans() + tool_spans = [span for span in spans if span.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TOOL] + assert len(tool_spans) == 3 + for span in tool_spans: + assert span.attributes.get(SpanAttributes.LLM_USAGE_TOOL_COST) == 0.01 + + @pytest.mark.asyncio + async def test_parallel_tool_calls(self, agent_class, instrumentation: InstrumentationTester): + """Test parallel execution of async tools.""" + results = await asyncio.gather( + agent_class.async_process_item("item1"), + agent_class.async_process_item("item2"), + agent_class.async_process_item("item3"), + ) + + assert len(results) == 3 + assert results[0] == "Async processed item1" + assert results[1] == "Async processed item2" + assert results[2] == "Async processed item3" + + spans = instrumentation.get_finished_spans() + tool_spans = [span for span in spans if span.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TOOL] + assert len(tool_spans) == 3 + for span in tool_spans: + assert span.attributes.get(SpanAttributes.LLM_USAGE_TOOL_COST) == 0.02 + + def test_tool_without_cost(self, agent_class, instrumentation: InstrumentationTester): + """Test tool without cost parameter.""" + + @tool + def no_cost_tool(self): + return "No cost tool result" + + result = no_cost_tool(agent_class) + + assert result == "No cost tool result" + + spans = instrumentation.get_finished_spans() + tool_span = next( + span for span in spans if span.attributes.get(SpanAttributes.AGENTOPS_SPAN_KIND) == SpanKind.TOOL + ) + assert SpanAttributes.LLM_USAGE_TOOL_COST not in tool_span.attributes