Skip to content
592 changes: 582 additions & 10 deletions agentops/instrumentation/README.md

Large diffs are not rendered by default.

227 changes: 104 additions & 123 deletions agentops/instrumentation/anthropic/instrumentor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@
- Maintains span context across multiple events
"""

from typing import List, Collection
from opentelemetry.trace import get_tracer
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.metrics import get_meter
from typing import List, Collection, Dict, Any, Optional
from opentelemetry.trace import Tracer
from wrapt import wrap_function_wrapper

from agentops.logging import logger
from agentops.instrumentation.common.wrappers import WrapConfig, wrap, unwrap
from agentops.instrumentation.common import EnhancedBaseInstrumentor, WrapConfig
from agentops.instrumentation.anthropic import LIBRARY_NAME, LIBRARY_VERSION
from agentops.instrumentation.anthropic.attributes.message import get_message_attributes, get_completion_attributes
from agentops.instrumentation.anthropic.stream_wrapper import (
Expand All @@ -44,46 +42,8 @@
)
from agentops.semconv import Meters

# Methods to wrap for instrumentation
WRAPPED_METHODS: List[WrapConfig] = [
# Main messages.create (modern API)
WrapConfig(
trace_name="anthropic.messages.create",
package="anthropic.resources.messages",
class_name="Messages",
method_name="create",
handler=get_message_attributes,
),
# Async variant
WrapConfig(
trace_name="anthropic.messages.create",
package="anthropic.resources.messages",
class_name="AsyncMessages",
method_name="create",
handler=get_message_attributes,
is_async=True,
),
# Legacy completions API
WrapConfig(
trace_name="anthropic.completions.create",
package="anthropic.resources.completions",
class_name="Completions",
method_name="create",
handler=get_completion_attributes,
),
# Async variant of legacy API
WrapConfig(
trace_name="anthropic.completions.create",
package="anthropic.resources.completions",
class_name="AsyncCompletions",
method_name="create",
handler=get_completion_attributes,
is_async=True,
),
]


class AnthropicInstrumentor(BaseInstrumentor):

class AnthropicInstrumentor(EnhancedBaseInstrumentor):
"""An instrumentor for Anthropic's Claude API.

This class provides instrumentation for Anthropic's Claude API by wrapping key methods
Expand All @@ -98,6 +58,69 @@ class AnthropicInstrumentor(BaseInstrumentor):
It captures metrics including token usage, operation duration, and exceptions.
"""

@property
def library_name(self) -> str:
"""Return the Anthropic library name."""
return LIBRARY_NAME

@property
def library_version(self) -> str:
"""Return the Anthropic library version."""
return LIBRARY_VERSION

@property
def wrapped_methods(self) -> List[WrapConfig]:
"""Return all methods that should be wrapped for Anthropic instrumentation."""
return [
# Main messages.create (modern API)
WrapConfig(
trace_name="anthropic.messages.create",
package="anthropic.resources.messages",
class_name="Messages",
method_name="create",
handler=get_message_attributes,
),
# Async variant
WrapConfig(
trace_name="anthropic.messages.create",
package="anthropic.resources.messages",
class_name="AsyncMessages",
method_name="create",
handler=get_message_attributes,
is_async=True,
),
# Legacy completions API
WrapConfig(
trace_name="anthropic.completions.create",
package="anthropic.resources.completions",
class_name="Completions",
method_name="create",
handler=get_completion_attributes,
),
# Async variant of legacy API
WrapConfig(
trace_name="anthropic.completions.create",
package="anthropic.resources.completions",
class_name="AsyncCompletions",
method_name="create",
handler=get_completion_attributes,
is_async=True,
),
]

@property
def supports_streaming(self) -> bool:
"""Anthropic supports streaming responses."""
return True

def get_streaming_wrapper(self, tracer: Tracer) -> Optional[Any]:
"""Return the sync streaming wrapper for Anthropic."""
return messages_stream_wrapper(tracer)

def get_async_streaming_wrapper(self, tracer: Tracer) -> Optional[Any]:
"""Return the async streaming wrapper for Anthropic."""
return messages_stream_async_wrapper(tracer)

def instrumentation_dependencies(self) -> Collection[str]:
"""Return packages required for instrumentation.

Expand All @@ -106,89 +129,47 @@ def instrumentation_dependencies(self) -> Collection[str]:
"""
return ["anthropic >= 0.7.0"]

def _instrument(self, **kwargs):
"""Instrument the Anthropic API.

This method wraps the key methods in the Anthropic client to capture
telemetry data for API calls. It sets up tracers, meters, and wraps the appropriate
methods for instrumentation.

Args:
**kwargs: Configuration options for instrumentation.
"""
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION, tracer_provider)

meter_provider = kwargs.get("meter_provider")
meter = get_meter(LIBRARY_NAME, LIBRARY_VERSION, meter_provider)

meter.create_histogram(
name=Meters.LLM_TOKEN_USAGE,
unit="token",
description="Measures number of input and output tokens used with Anthropic models",
)

meter.create_histogram(
name=Meters.LLM_OPERATION_DURATION,
unit="s",
description="Anthropic API operation duration",
)

meter.create_counter(
name=Meters.LLM_COMPLETIONS_EXCEPTIONS,
unit="time",
description="Number of exceptions occurred during Anthropic completions",
)

# Standard method wrapping approach
# Uses the common wrappers module to wrap methods with tracers
for wrap_config in WRAPPED_METHODS:
try:
wrap(wrap_config, tracer)
except (AttributeError, ModuleNotFoundError):
logger.debug(f"Could not wrap {wrap_config.package}.{wrap_config.class_name}.{wrap_config.method_name}")

# Special handling for streaming responses
# Uses direct wrapt.wrap_function_wrapper for stream methods
# This approach captures events as they arrive rather than waiting for completion
def _create_provider_metrics(self, meter) -> Dict[str, Any]:
"""Create Anthropic-specific metrics beyond the common ones."""
return {
"completion_exception_counter": meter.create_counter(
name=Meters.LLM_ANTHROPIC_COMPLETION_EXCEPTIONS,
unit="time",
description="Number of exceptions occurred during Anthropic completions",
),
}

def _apply_streaming_wrappers(self, tracer: Tracer):
"""Apply Anthropic-specific streaming wrappers."""
try:
wrap_function_wrapper(
"anthropic.resources.messages.messages",
"Messages.stream",
messages_stream_wrapper(tracer),
)

wrap_function_wrapper(
"anthropic.resources.messages.messages",
"AsyncMessages.stream",
messages_stream_async_wrapper(tracer),
)
except (AttributeError, ModuleNotFoundError):
logger.debug("Failed to wrap Anthropic streaming methods")

def _uninstrument(self, **kwargs):
"""Remove instrumentation from Anthropic API.

This method unwraps all methods that were wrapped during instrumentation,
restoring the original behavior of the Anthropic API.

Args:
**kwargs: Configuration options for uninstrumentation.
"""
# Unwrap standard methods
for wrap_config in WRAPPED_METHODS:
try:
unwrap(wrap_config)
except Exception:
logger.debug(
f"Failed to unwrap {wrap_config.package}.{wrap_config.class_name}.{wrap_config.method_name}"
# Get the wrappers
sync_wrapper = self.get_streaming_wrapper(tracer)
async_wrapper = self.get_async_streaming_wrapper(tracer)

# Apply sync streaming wrapper
if sync_wrapper:
wrap_function_wrapper(
"anthropic.resources.messages.messages",
"Messages.stream",
sync_wrapper,
)

# Apply async streaming wrapper
if async_wrapper:
wrap_function_wrapper(
"anthropic.resources.messages.messages",
"AsyncMessages.stream",
async_wrapper,
)
except (AttributeError, ModuleNotFoundError) as e:
logger.debug(f"Failed to wrap Anthropic streaming methods: {e}")

# Unwrap streaming methods
def _remove_streaming_wrappers(self):
"""Remove Anthropic-specific streaming wrappers."""
try:
from opentelemetry.instrumentation.utils import unwrap as otel_unwrap

otel_unwrap("anthropic.resources.messages.messages", "Messages.stream")
otel_unwrap("anthropic.resources.messages.messages", "AsyncMessages.stream")
except (AttributeError, ModuleNotFoundError):
logger.debug("Failed to unwrap Anthropic streaming methods")
except (AttributeError, ModuleNotFoundError) as e:
logger.debug(f"Failed to unwrap Anthropic streaming methods: {e}")
63 changes: 61 additions & 2 deletions agentops/instrumentation/common/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,63 @@
"""Common utilities for OpenTelemetry instrumentation.

This module provides shared utilities for all AgentOps instrumentors:
- Base instrumentor with common patterns
- Context management and propagation
- Span lifecycle management
- Enhanced attribute handling
- Wrapper utilities
"""

# Existing exports
from .attributes import AttributeMap, _extract_attributes_from_mapping
from .wrappers import _with_tracer_wrapper
from .wrappers import _with_tracer_wrapper, WrapConfig, wrap, unwrap
from .objects import get_uploaded_object_attributes

# New exports
from .base_instrumentor import EnhancedBaseInstrumentor
from .context import ContextManager, SpanManager, global_context_manager
from .span_lifecycle import (
SpanLifecycleManager,
TimingManager,
RetryHandler,
span_error_handler,
async_span_error_handler,
)
from .attribute_handlers import (
AttributeExtractor,
LLMAttributeHandler,
MessageAttributeHandler,
StreamingAttributeHandler,
create_composite_handler,
with_attribute_filter,
)

__all__ = ["AttributeMap", "_extract_attributes_from_mapping", "_with_tracer_wrapper"]
__all__ = [
# Existing
"AttributeMap",
"_extract_attributes_from_mapping",
"_with_tracer_wrapper",
"WrapConfig",
"wrap",
"unwrap",
"get_uploaded_object_attributes",
# New base instrumentor
"EnhancedBaseInstrumentor",
# Context management
"ContextManager",
"SpanManager",
"global_context_manager",
# Span lifecycle
"SpanLifecycleManager",
"TimingManager",
"RetryHandler",
"span_error_handler",
"async_span_error_handler",
# Attribute handlers
"AttributeExtractor",
"LLMAttributeHandler",
"MessageAttributeHandler",
"StreamingAttributeHandler",
"create_composite_handler",
"with_attribute_filter",
]
Loading
Loading