Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions agentops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def init(
log_level: Optional[Union[str, int]] = None,
fail_safe: Optional[bool] = None,
exporter_endpoint: Optional[str] = None,
session_name: Optional[str] = None,
**kwargs,
):
"""
Expand Down Expand Up @@ -88,6 +89,7 @@ def init(
fail_safe (bool): Whether to suppress errors and continue execution when possible.
exporter_endpoint (str, optional): Endpoint for the exporter. If none is provided, key will
be read from the AGENTOPS_EXPORTER_ENDPOINT environment variable.
session_name (str, optional): Name of the session to be used in the span attributes.
**kwargs: Additional configuration parameters to be passed to the client.
"""
global _client
Expand Down Expand Up @@ -116,6 +118,7 @@ def init(
log_level=log_level,
fail_safe=fail_safe,
exporter_endpoint=exporter_endpoint,
session_name=session_name,
**kwargs,
)

Expand Down
12 changes: 10 additions & 2 deletions agentops/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,18 @@ def init(self, **kwargs):
if self.config.auto_start_session:
from agentops.legacy import start_session

# Pass default_tags if they exist
if self.config.default_tags:
if self.config.default_tags and self.config.session_name:
logger.debug(f"Starting session with tags: {self.config.default_tags}")
logger.debug(f"Starting session with name: {self.config.session_name}")
session = start_session(session_name=self.config.session_name, tags=list(self.config.default_tags))
elif self.config.default_tags:
logger.debug(f"Starting session with tags: {self.config.default_tags}")
session = start_session(tags=list(self.config.default_tags))
elif self.config.session_name:
logger.debug(f"Starting session with name: {self.config.session_name}")
session = start_session(session_name=self.config.session_name)
else:
logger.debug("Starting session without tags or name")
session = start_session()

# Register this session globally
Expand Down
10 changes: 10 additions & 0 deletions agentops/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ class Config:
default_factory=lambda: None, metadata={"description": "Custom span processor for OpenTelemetry trace data"}
)

session_name: Optional[str] = field(
default_factory=lambda: None,
metadata={"description": "Name of the session to be used in the span attributes"},
)

def configure(
self,
api_key: Optional[str] = None,
Expand All @@ -144,6 +149,7 @@ def configure(
exporter: Optional[SpanExporter] = None,
processor: Optional[SpanProcessor] = None,
exporter_endpoint: Optional[str] = None,
session_name: Optional[str] = None,
):
"""Configure settings from kwargs, validating where necessary"""
if api_key is not None:
Expand Down Expand Up @@ -214,6 +220,9 @@ def configure(
# else:
# self.exporter_endpoint = self.endpoint

if session_name is not None:
self.session_name = session_name

def dict(self):
"""Return a dictionary representation of the config"""
return {
Expand All @@ -235,6 +244,7 @@ def dict(self):
"exporter": self.exporter,
"processor": self.processor,
"exporter_endpoint": self.exporter_endpoint,
"session_name": self.session_name,
}

def json(self):
Expand Down
61 changes: 50 additions & 11 deletions agentops/instrumentation/crewai/instrumentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from agentops.instrumentation.crewai.version import __version__
from agentops.semconv import SpanAttributes, AgentOpsSpanKindValues, Meters, ToolAttributes, MessageAttributes
from .crewai_span_attributes import CrewAISpanAttributes, set_span_attribute
from agentops import get_client


# Initialize logger
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -156,15 +158,26 @@ def wrap_kickoff(
args,
kwargs,
):
span_name = "crewai.workflow"
logger.debug(
f"CrewAI: Starting workflow instrumentation for Crew with {len(getattr(instance, 'agents', []))} agents"
)
config = get_client().config
attributes = {
SpanAttributes.LLM_SYSTEM: "crewai",
}

if config.session_name:
span_name = f"{config.session_name}.workflow"

if config.default_tags and len(config.default_tags) > 0:
tag_list = list(config.default_tags)
attributes[SpanAttributes.AGENTOPS_SPAN_TAGS] = tag_list

with tracer.start_as_current_span(
"crewai.workflow",
name=span_name,
kind=SpanKind.INTERNAL,
attributes={
SpanAttributes.LLM_SYSTEM: "crewai",
},
attributes=attributes,
) as span:
try:
span.set_attribute(TELEMETRY_SDK_NAME, "agentops")
Expand Down Expand Up @@ -327,12 +340,19 @@ def wrap_agent_execute_task(
tracer, duration_histogram, token_histogram, environment, application_name, wrapped, instance, args, kwargs
):
agent_name = instance.role if hasattr(instance, "role") else "agent"
attributes = {
SpanAttributes.AGENTOPS_SPAN_KIND: AgentOpsSpanKindValues.AGENT.value,
}

config = get_client().config
if config.default_tags and len(config.default_tags) > 0:
tag_list = list(config.default_tags)
attributes[SpanAttributes.AGENTOPS_SPAN_TAGS] = tag_list

with tracer.start_as_current_span(
f"{agent_name}.agent",
kind=SpanKind.CLIENT,
attributes={
SpanAttributes.AGENTOPS_SPAN_KIND: AgentOpsSpanKindValues.AGENT.value,
},
attributes=attributes,
) as span:
try:
span.set_attribute(TELEMETRY_SDK_NAME, "agentops")
Expand Down Expand Up @@ -381,12 +401,22 @@ def wrap_task_execute(
):
task_name = instance.description if hasattr(instance, "description") else "task"

config = get_client().config
attributes = {
SpanAttributes.AGENTOPS_SPAN_KIND: AgentOpsSpanKindValues.TASK.value,
}

if config.default_tags and len(config.default_tags) > 0:
tag_list = list(config.default_tags)
attributes[SpanAttributes.AGENTOPS_SPAN_TAGS] = tag_list

if config.session_name:
task_name = config.session_name

with tracer.start_as_current_span(
f"{task_name}.task",
kind=SpanKind.CLIENT,
attributes={
SpanAttributes.AGENTOPS_SPAN_KIND: AgentOpsSpanKindValues.TASK.value,
},
attributes=attributes,
) as span:
try:
span.set_attribute(TELEMETRY_SDK_NAME, "agentops")
Expand All @@ -411,7 +441,16 @@ def wrap_llm_call(
tracer, duration_histogram, token_histogram, environment, application_name, wrapped, instance, args, kwargs
):
llm = instance.model if hasattr(instance, "model") else "llm"
with tracer.start_as_current_span(f"{llm}.llm", kind=SpanKind.CLIENT, attributes={}) as span:
attributes = {
SpanAttributes.AGENTOPS_SPAN_KIND: AgentOpsSpanKindValues.LLM.value,
}

config = get_client().config
if config.default_tags and len(config.default_tags) > 0:
tag_list = list(config.default_tags)
attributes[SpanAttributes.AGENTOPS_SPAN_TAGS] = tag_list

with tracer.start_as_current_span(f"{llm}.llm", kind=SpanKind.CLIENT, attributes=attributes) as span:
start_time = time.time()
try:
span.set_attribute(TELEMETRY_SDK_NAME, "agentops")
Expand Down
11 changes: 8 additions & 3 deletions agentops/legacy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ def end_session(self, **kwargs):
_flush_span_processors()


def _create_session_span(tags: Union[Dict[str, Any], List[str], None] = None) -> tuple:
def _create_session_span(
session_name: Optional[str] = None, tags: Union[Dict[str, Any], List[str], None] = None
) -> tuple:
"""
Helper function to create a session span with tags.

Expand All @@ -94,10 +96,13 @@ def _create_session_span(tags: Union[Dict[str, Any], List[str], None] = None) ->
attributes = {}
if tags:
attributes["tags"] = tags
return _make_span("session", span_kind=SpanKind.SESSION, attributes=attributes)

span_name = "session" if session_name is None else session_name
return _make_span(span_name, span_kind=SpanKind.SESSION, attributes=attributes)


def start_session(
session_name: Optional[str] = None,
tags: Union[Dict[str, Any], List[str], None] = None,
) -> Session:
"""
Expand Down Expand Up @@ -153,7 +158,7 @@ def start_session(
_current_session = dummy_session
return dummy_session

span, ctx, token = _create_session_span(tags)
span, ctx, token = _create_session_span(session_name, tags)
session = Session(span, token)

# Set the global session reference
Expand Down
3 changes: 2 additions & 1 deletion agentops/sdk/decorators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
operation = create_entity_decorator(SpanKind.OPERATION)
workflow = create_entity_decorator(SpanKind.WORKFLOW)
session = create_entity_decorator(SpanKind.SESSION)
tool = create_entity_decorator(SpanKind.TOOL)
operation = task

__all__ = ["agent", "task", "workflow", "session", "operation"]
__all__ = ["agent", "task", "workflow", "session", "operation", "tool"]

# Create decorators task, workflow, session, agent
31 changes: 20 additions & 11 deletions agentops/sdk/decorators/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
import functools
import asyncio


import wrapt # type: ignore

from agentops.logging import logger
from agentops.sdk.core import TracingCore
from agentops.semconv.span_attributes import SpanAttributes

from .utility import (
_create_as_current_span,
Expand All @@ -28,17 +30,18 @@ def create_entity_decorator(entity_kind: str):
A decorator with optional arguments for name and version
"""

def decorator(wrapped=None, *, name=None, version=None):
def decorator(wrapped=None, *, name=None, version=None, cost=None):
# Handle case where decorator is called with parameters
if wrapped is None:
return functools.partial(decorator, name=name, version=version)
return functools.partial(decorator, name=name, version=version, cost=cost)

# Handle class decoration
if inspect.isclass(wrapped):
# Create a proxy class that wraps the original class
class WrappedClass(wrapped):
def __init__(self, *args, **kwargs):
operation_name = name or wrapped.__name__

self._agentops_span_context_manager = _create_as_current_span(operation_name, entity_kind, version)
self._agentops_active_span = self._agentops_span_context_manager.__enter__()

Expand All @@ -51,23 +54,18 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

async def __aenter__(self):
# Added for async context manager support
# This allows using the class with 'async with' statement

# If span is already created in __init__, just return self
if hasattr(self, "_agentops_active_span") and self._agentops_active_span is not None:
return self

# Otherwise create span (for backward compatibility)
operation_name = name or wrapped.__name__

self._agentops_span_context_manager = _create_as_current_span(operation_name, entity_kind, version)
self._agentops_active_span = self._agentops_span_context_manager.__enter__()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
# Added for proper async cleanup
# This ensures spans are properly closed when using 'async with'

if hasattr(self, "_agentops_active_span") and hasattr(self, "_agentops_span_context_manager"):
try:
_record_entity_output(self._agentops_active_span, self)
Expand Down Expand Up @@ -104,10 +102,12 @@ def wrapper(wrapped, instance, args, kwargs):

# Handle generator functions
if is_generator:
# Use the old approach for generators
span, ctx, token = _make_span(operation_name, entity_kind, version)
try:
_record_entity_input(span, args, kwargs)
# Set cost attribute if tool
if entity_kind == "tool" and cost is not None:
span.set_attribute(SpanAttributes.LLM_USAGE_TOOL_COST, cost)
except Exception as e:
logger.warning(f"Failed to record entity input: {e}")

Expand All @@ -116,10 +116,12 @@ def wrapper(wrapped, instance, args, kwargs):

# Handle async generator functions
elif is_async_generator:
# Use the old approach for async generators
span, ctx, token = _make_span(operation_name, entity_kind, version)
try:
_record_entity_input(span, args, kwargs)
# Set cost attribute if tool
if entity_kind == "tool" and cost is not None:
span.set_attribute(SpanAttributes.LLM_USAGE_TOOL_COST, cost)
except Exception as e:
logger.warning(f"Failed to record entity input: {e}")

Expand All @@ -133,6 +135,9 @@ async def _wrapped_async():
with _create_as_current_span(operation_name, entity_kind, version) as span:
try:
_record_entity_input(span, args, kwargs)
# Set cost attribute if tool
if entity_kind == "tool" and cost is not None:
span.set_attribute(SpanAttributes.LLM_USAGE_TOOL_COST, cost)
except Exception as e:
logger.warning(f"Failed to record entity input: {e}")

Expand All @@ -144,6 +149,7 @@ async def _wrapped_async():
logger.warning(f"Failed to record entity output: {e}")
return result
except Exception as e:
logger.error(f"Error in async function execution: {e}")
span.record_exception(e)
raise

Expand All @@ -154,7 +160,9 @@ async def _wrapped_async():
with _create_as_current_span(operation_name, entity_kind, version) as span:
try:
_record_entity_input(span, args, kwargs)

# Set cost attribute if tool
if entity_kind == "tool" and cost is not None:
span.set_attribute(SpanAttributes.LLM_USAGE_TOOL_COST, cost)
except Exception as e:
logger.warning(f"Failed to record entity input: {e}")

Expand All @@ -167,6 +175,7 @@ async def _wrapped_async():
logger.warning(f"Failed to record entity output: {e}")
return result
except Exception as e:
logger.error(f"Error in sync function execution: {e}")
span.record_exception(e)
raise

Expand Down
2 changes: 2 additions & 0 deletions agentops/semconv/span_attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class SpanAttributes:
LLM_USAGE_CACHE_READ_INPUT_TOKENS = "gen_ai.usage.cache_read_input_tokens"
LLM_USAGE_REASONING_TOKENS = "gen_ai.usage.reasoning_tokens"
LLM_USAGE_STREAMING_TOKENS = "gen_ai.usage.streaming_tokens"
LLM_USAGE_TOOL_COST = "gen_ai.usage.total_cost"

# Message attributes
# see ./message.py for message-related attributes
Expand All @@ -86,6 +87,7 @@ class SpanAttributes:
AGENTOPS_ENTITY_INPUT = "agentops.entity.input"
AGENTOPS_SPAN_KIND = "agentops.span.kind"
AGENTOPS_ENTITY_NAME = "agentops.entity.name"
AGENTOPS_SPAN_TAGS = "tags"

# Operation attributes
OPERATION_NAME = "operation.name"
Expand Down
1 change: 1 addition & 0 deletions docs/v1/concepts/sessions.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Optionally, sessions may include:
- **Tags**: Tags allow for the categorization and later retrieval of sessions.
- **Host Environment**: Automatically gathers basic information about the system on which the session ran.
- **Video**: If applicable, an optional video recording of the session.
- **Session Name**: A custom name for the session that helps identify and organize different types of sessions in the dashboard. Can be set during initialization or when starting a session.

### Methods
#### `end_session`
Expand Down
1 change: 1 addition & 0 deletions docs/v1/usage/sdk-reference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The first element of AgentOps is always calling .init()
- `auto_start_session` (bool): Whether to start a session automatically when the client is created. You may wish to delay starting a session in order to do additional setup or starting a session on a child process.
- `inherited_session_id` (str, optional): When creating the client, passing in this value will connect the client to an existing session. This is useful when having separate processes contribute to the same session.
- `skip_auto_end_session` (bool, optional): If you are using a framework such as Crew, the framework can decide when to halt execution. Setting this parameter to true will not end your agentops session when this happens.
- `session_name` (str, optional): Name of the session to be used in the span attributes. This helps identify and organize different types of sessions in the dashboard.

**Returns**:

Expand Down
Loading
Loading