[BUG] Anthropic streaming wrapper returns wrong object, breaking get_final_message() #2467

@youest

Description

The _MessageStreamManager wrapper in openinference-instrumentation-anthropic wraps the wrong stream object when entering the context manager. It wraps anthropic.Stream (low-level iterator) instead of anthropic.lib.streaming._messages.MessageStream (high-level stream with helper methods), causing AttributeError: 'Stream' object has no attribute 'get_final_message'.

Environment

  • Package: openinference-instrumentation-anthropic==0.1.20 (latest)
  • Anthropic SDK: anthropic==0.71.0
  • Python: 3.12
  • Context: Using with CrewAI framework

Steps to Reproduce

import os
from anthropic import Anthropic
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

# Setup tracing
provider = TracerProvider()
trace.set_tracer_provider(provider)

# Instrument Anthropic
from openinference.instrumentation.anthropic import AnthropicInstrumentor
AnthropicInstrumentor().instrument()

# Create client and use streaming
client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

with client.messages.stream(
    model="claude-3-5-haiku-20241022",
    max_tokens=50,
    messages=[{"role": "user", "content": "Say hello"}],
) as stream:
    # Consume stream
    for event in stream:
        if hasattr(event, "delta") and hasattr(event.delta, "text"):
            print(event.delta.text, end="", flush=True)

    # Try to get final message - THIS FAILS
    final_message = stream.get_final_message()  # ❌ AttributeError

Expected Behavior

The wrapped stream should expose get_final_message(), just like the unwrapped MessageStream does:

# WITHOUT instrumentation - works ✅
with client.messages.stream(...) as stream:
    # stream is MessageStream
    final_message = stream.get_final_message()  # Works!

Actual Behavior

AttributeError: 'Stream' object has no attribute 'get_final_message'

Root Cause Analysis

The wrapper wraps the wrong object:

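# stream_manager is the object returned by client.messages.stream(...)
# while the instrumentation is active
with stream_manager as stream: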
    print(type(stream))
    # <class 'openinference.instrumentation.anthropic._stream._MessagesStream'>

    print(type(stream.__wrapped__))
    # <class 'anthropic.Stream'>  ❌ WRONG!
    # Should be: <class 'anthropic.lib.streaming._messages.MessageStream'>

    print(hasattr(stream.__wrapped__, 'get_final_message'))
    # False ❌

What's wrapped: anthropic.Stream (low-level iterator)

  • Has only: close, response
  • Missing: get_final_message(), until_done(), text_stream, etc.

What should be wrapped: anthropic.lib.streaming._messages.MessageStream

  • Has all helper methods including get_final_message()
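
The failure mode can be reproduced without the SDK. The sketch below is only an illustration under stated assumptions: LowLevelStream, HighLevelStream, and Proxy are hypothetical stand-ins for anthropic.Stream, MessageStream, and wrapt's ObjectProxy respectively, and none of this is the library's actual code. It shows that a delegating proxy can only expose what its wrapped object has, which is also why the AttributeError names the wrapped class rather than the wrapper.

```python
class LowLevelStream:
    """Stand-in for the low-level stream: iteration and close() only."""

    def __init__(self, events):
        self._it = iter(events)

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._it)

    def close(self):
        pass


class HighLevelStream:
    """Stand-in for the high-level stream: owns a raw stream, adds helpers."""

    def __init__(self, raw):
        self._raw = raw
        self._chunks = []

    def __iter__(self):
        for event in self._raw:
            self._chunks.append(event)
            yield event

    def get_final_message(self):
        # Drain whatever has not been consumed yet, then assemble the result.
        for _ in self:
            pass
        return "".join(self._chunks)


class Proxy:
    """Minimal stand-in for a delegating proxy such as wrapt.ObjectProxy."""

    def __init__(self, wrapped):
        self.__wrapped__ = wrapped

    def __iter__(self):
        return iter(self.__wrapped__)

    def __getattr__(self, name):
        # Only called when normal lookup fails; forward to the wrapped object.
        return getattr(self.__wrapped__, name)


# Wrapping the low-level object: the helper is simply not there to delegate to.
wrong = Proxy(LowLevelStream(["Hel", "lo"]))
try:
    wrong.get_final_message()
    error_text = ""
except AttributeError as exc:
    error_text = str(exc)

# Wrapping the high-level object: delegation works as expected.
right = Proxy(HighLevelStream(LowLevelStream(["Hel", "lo"])))
final = right.get_final_message()
```

In this sketch the proxy itself is not at fault; the only difference between the two cases is which object was handed to it.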

Impact

This breaks any code that uses get_final_message() with Anthropic streaming, including:

  • CrewAI framework's Anthropic integration
  • Any application using the standard Anthropic streaming pattern from their docs
  • Token usage tracking that depends on final message metadata

Proposed Fix

The __enter__ method of _MessageStreamManager should return the actual MessageStream object, not the underlying Stream iterator.
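
One possible shape for this fix is sketched below. It is a minimal illustration, not the instrumentor's actual code: FakeMessageStream, FakeMessageStreamManager, TracedStream, and TracedStreamManager are all hypothetical names, and the fakes stand in for the SDK objects so the example runs on its own. The key assumption is that the wrapper's __enter__ calls the wrapped manager's own __enter__ and wraps whatever that returns.

```python
class FakeMessageStream:
    """Stand-in for the high-level stream returned by the SDK manager."""

    def __init__(self, events):
        self._it = iter(events)
        self._seen = []

    def __iter__(self):
        return self

    def __next__(self):
        event = next(self._it)
        self._seen.append(event)
        return event

    def get_final_message(self):
        # Drain any remaining events, then assemble the final result.
        for _ in self:
            pass
        return "".join(self._seen)


class FakeMessageStreamManager:
    """Stand-in for the manager returned by client.messages.stream(...)."""

    def __init__(self, events):
        self._events = events

    def __enter__(self):
        return FakeMessageStream(self._events)

    def __exit__(self, exc_type, exc, tb):
        return False


class TracedStream:
    """Hypothetical tracing wrapper: observes events, forwards the rest."""

    def __init__(self, wrapped, on_event):
        self.__wrapped__ = wrapped
        self._on_event = on_event

    def __iter__(self):
        for event in self.__wrapped__:
            self._on_event(event)  # e.g. record span attributes here
            yield event

    def __getattr__(self, name):
        # Helpers such as get_final_message() are delegated untouched.
        return getattr(self.__wrapped__, name)


class TracedStreamManager:
    """Hypothetical manager wrapper that wraps what __enter__ returns."""

    def __init__(self, manager, on_event):
        self._manager = manager
        self._on_event = on_event

    def __enter__(self):
        stream = self._manager.__enter__()  # the high-level stream object
        return TracedStream(stream, self._on_event)

    def __exit__(self, exc_type, exc, tb):
        return self._manager.__exit__(exc_type, exc, tb)


seen = []
manager = TracedStreamManager(FakeMessageStreamManager(["Hel", "lo"]), seen.append)
with manager as stream:
    for _ in stream:
        pass
    final_message = stream.get_final_message()
```

A caveat of this sketch: events drained inside a delegated get_final_message() call bypass on_event, so a real fix would likely also need to hook the helper methods or the underlying event source.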

Additional Context

  • This appears to be a regression or oversight in the streaming support
  • Issue #986 ("[anthropic] add support for streaming") mentioned adding streaming support, but delegation of get_final_message() seems incomplete
  • ObjectProxy from wrapt should delegate methods, but it's wrapping the wrong object
