1 change: 1 addition & 0 deletions .envrc
@@ -0,0 +1 @@
source_env .env
65 changes: 55 additions & 10 deletions agentops/instrumentation/agentic/agno/instrumentor.py
@@ -115,6 +115,42 @@ def __getattr__(self, name):
        return getattr(self.original_result, name)


class AsyncStreamingResultWrapper:
    """Wrapper for async streaming results that maintains agent span as active throughout iteration."""

    def __init__(self, original_result, span, agent_id, agent_context, streaming_context_manager):
        self.original_result = original_result
        self.span = span
        self.agent_id = agent_id
        self.agent_context = agent_context
        self.streaming_context_manager = streaming_context_manager
        self._consumed = False

    def __aiter__(self):
        """Return async iterator that keeps agent span active during iteration."""
        return self

    async def __anext__(self):
        """Async iteration that keeps agent span active."""
        context_token = otel_context.attach(self.agent_context)
        try:
            item = await self.original_result.__anext__()
            return item
        except StopAsyncIteration:
            # Clean up when iteration is complete
            if not self._consumed:
                self._consumed = True
                self.span.end()
                self.streaming_context_manager.remove_context(self.agent_id)
            raise
        finally:
            otel_context.detach(context_token)

    def __getattr__(self, name):
        """Delegate attribute access to the original result."""
        return getattr(self.original_result, name)
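    # Illustrative note, not part of agno or of this instrumentation's public API: once an
    # instrumented async call returns a streaming result, the instrumentor hands back this
    # wrapper in place of the original object, so a consumer loop such as
    #
    #     async for chunk in wrapped_result:
    #         handle(chunk)
    #
    # re-attaches the agent span's context on every item and ends the span only when the
    # underlying stream raises StopAsyncIteration.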


def create_streaming_workflow_wrapper(tracer, streaming_context_manager):
"""Create a streaming-aware wrapper for workflow run methods."""

@@ -442,7 +478,11 @@ async def wrapper(wrapped, instance, args, kwargs):
            span.set_status(Status(StatusCode.OK))

            # Wrap the result to maintain context and end span when complete
            if hasattr(result, "__iter__"):
            if hasattr(result, "__aiter__"):
                return AsyncStreamingResultWrapper(
                    result, span, agent_id, current_context, streaming_context_manager
                )
            elif hasattr(result, "__iter__"):
                return StreamingResultWrapper(result, span, agent_id, current_context, streaming_context_manager)
            else:
                # Not actually streaming, clean up immediately
@@ -835,7 +875,9 @@ def wrapper(wrapped, instance, args, kwargs):
def create_team_async_wrapper(tracer, streaming_context_manager):
    """Create an async wrapper for Team methods that establishes the team context."""

    async def wrapper(wrapped, instance, args, kwargs):
    def wrapper(wrapped, instance, args, kwargs):
        import inspect

        # Get team ID for context storage
        team_id = getattr(instance, "team_id", None) or getattr(instance, "id", None) or id(instance)
        team_id = str(team_id)
@@ -863,17 +905,20 @@ async def wrapper(wrapped, instance, args, kwargs):
            # Execute the original function within team context
            context_token = otel_context.attach(current_context)
            try:
                result = await wrapped(*args, **kwargs)

                # For non-streaming, close the span
                if not is_streaming:
                    span.end()
                    streaming_context_manager.remove_context(team_id)

                return result
                result = wrapped(*args, **kwargs)
            finally:
                otel_context.detach(context_token)

            # For streaming, wrap the result to maintain context
            if is_streaming and inspect.isasyncgen(result):
                return AsyncStreamingResultWrapper(result, span, team_id, current_context, streaming_context_manager)
            elif hasattr(result, "__iter__"):
                return StreamingResultWrapper(result, span, team_id, current_context, streaming_context_manager)
            else:
                span.end()
                streaming_context_manager.remove_context(team_id)
                return result

        except Exception as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
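For context on the team wrapper's dispatch: calling an async generator function returns an async generator object immediately (no await), while calling an ordinary coroutine function returns a coroutine that still has to be awaited, and `inspect.isasyncgen` distinguishes the two. A minimal, self-contained sketch of that pattern, independent of agno and OpenTelemetry (all names here are illustrative):

```python
import asyncio
import inspect


async def streaming_call():
    # Async generator function: calling it returns an async generator right away.
    yield "chunk-1"
    yield "chunk-2"


async def non_streaming_call():
    # Coroutine function: calling it returns a coroutine that must be awaited.
    return "final result"


async def dispatch(fn):
    result = fn()  # no await here, mirroring the wrapper's synchronous call
    if inspect.isasyncgen(result):
        # Streaming path: a span would stay open until iteration completes.
        return [chunk async for chunk in result]
    # Non-streaming path: await the coroutine and finish immediately.
    return await result


async def main():
    print(await dispatch(streaming_call))      # ['chunk-1', 'chunk-2']
    print(await dispatch(non_streaming_call))  # final result

asyncio.run(main())
```
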
15 changes: 11 additions & 4 deletions app/.env.example
Original file line number Diff line number Diff line change
@@ -49,12 +49,19 @@ SUPABASE_S3_ACCESS_KEY_ID=
SUPABASE_S3_SECRET_ACCESS_KEY=

# ClickHouse
CLICKHOUSE_HOST=YOUR_CLICKHOUSE_HOST
CLICKHOUSE_PORT=8443
# ClickHouse Configuration
# For LOCAL development, use:
# CLICKHOUSE_HOST=127.0.0.1
# CLICKHOUSE_PORT=8123
# CLICKHOUSE_SECURE=false
# CLICKHOUSE_PASSWORD=password
# For CLOUD deployment, use your ClickHouse Cloud credentials:
CLICKHOUSE_HOST=127.0.0.1 # Change to YOUR_CLICKHOUSE_HOST for cloud
CLICKHOUSE_PORT=8123 # Change to 8443 for cloud
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=
CLICKHOUSE_PASSWORD=password # Set your password
CLICKHOUSE_DATABASE=otel_2
CLICKHOUSE_SECURE=true
CLICKHOUSE_SECURE=false # Change to true for cloud
CLICKHOUSE_ENDPOINT=
CLICKHOUSE_USERNAME=

59 changes: 59 additions & 0 deletions app/README.md
@@ -1,3 +1,61 @@
## Restart local stack and verify

- Restart services:

      docker compose -f compose.yaml -f opentelemetry-collector/compose.yaml down --remove-orphans
      docker compose -f compose.yaml -f opentelemetry-collector/compose.yaml up -d

- Check logs:

      docker compose -f compose.yaml -f opentelemetry-collector/compose.yaml logs --since=90s api
      docker compose -f compose.yaml -f opentelemetry-collector/compose.yaml logs --since=90s dashboard
      docker compose -f compose.yaml -f opentelemetry-collector/compose.yaml logs --since=90s otelcollector
      docker compose -f compose.yaml -f opentelemetry-collector/compose.yaml logs --since=90s clickhouse

- Open the dashboard: http://localhost:3000/signin

- Generate a trace (example; a minimal Python sketch follows this list):

      AGENTOPS_API_KEY=<key> \
      AGENTOPS_API_ENDPOINT=http://localhost:8000 \
      AGENTOPS_APP_URL=http://localhost:3000 \
      AGENTOPS_EXPORTER_ENDPOINT=http://localhost:4318/v1/traces \
      OPENAI_API_KEY=<openai_key> \
      python examples/openai/openai_example_sync.py

- Verify ClickHouse and the dashboard:

      curl -s -u default:password "http://localhost:8123/?query=SELECT%20count()%20FROM%20otel_2.otel_traces%20WHERE%20TraceId%20=%20'<TRACE_ID>'"

      http://localhost:3000/traces?trace_id=<TRACE_ID>
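For reference, a minimal sketch of the kind of script the trace-generation step runs, assuming only the `agentops` and `openai` packages are installed; the real `examples/openai/openai_example_sync.py` may differ. It relies on the `AGENTOPS_*` and `OPENAI_API_KEY` variables exported above to point the SDK and exporter at the local stack.

```python
import agentops
from openai import OpenAI

# agentops.init() picks up AGENTOPS_API_KEY from the environment; the other
# AGENTOPS_* variables exported above route traces to the local API, dashboard,
# and OTLP collector instead of the hosted service.
agentops.init()

# The OpenAI client reads OPENAI_API_KEY from the environment.
client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4o-mini",  # illustrative model choice
    messages=[{"role": "user", "content": "Say hello from the local AgentOps stack."}],
)
print(response.choices[0].message.content)
```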


## Local ClickHouse (self-hosted)

- Set in .env:

      CLICKHOUSE_HOST=127.0.0.1
      CLICKHOUSE_PORT=8123
      CLICKHOUSE_USER=default
      CLICKHOUSE_PASSWORD=password
      CLICKHOUSE_DATABASE=otel_2
      CLICKHOUSE_SECURE=false
      CLICKHOUSE_ENDPOINT=http://clickhouse:8123
      CLICKHOUSE_USERNAME=default

- Start services (includes otelcollector and local ClickHouse):

      docker compose -f compose.yaml -f opentelemetry-collector/compose.yaml up -d

- Initialize the ClickHouse schema:

      curl -u default:password 'http://localhost:8123/?query=CREATE%20DATABASE%20IF%20NOT%20EXISTS%20otel_2'
      curl --data-binary @app/clickhouse/migrations/0000_init.sql -u default:password 'http://localhost:8123/?query='

- Run the example with the local OTLP exporter:

      AGENTOPS_API_KEY=<your_key> \
      AGENTOPS_API_ENDPOINT=http://localhost:8000 \
      AGENTOPS_APP_URL=http://localhost:3000 \
      AGENTOPS_EXPORTER_ENDPOINT=http://localhost:4318/v1/traces \
      OPENAI_API_KEY=<openai_key> \
      python examples/openai/openai_example_sync.py

- Verify:
  - Dashboard: http://localhost:3000/traces?trace_id=<printed_id>
  - ClickHouse rows (a Python alternative follows this list):

        curl -s -u default:password "http://localhost:8123/?query=SELECT%20count()%20FROM%20otel_2.otel_traces%20WHERE%20TraceId%20=%20'<TRACE_ID>'"
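If you prefer checking from Python, here is a minimal sketch of the same row-count query using the `clickhouse-connect` package (an assumption for this example; the app itself may use a different driver), built from the `.env` values above:

```python
import os

import clickhouse_connect

# Connection settings mirror the .env values above; the defaults match the local setup.
client = clickhouse_connect.get_client(
    host=os.getenv("CLICKHOUSE_HOST", "127.0.0.1"),
    port=int(os.getenv("CLICKHOUSE_PORT", "8123")),
    username=os.getenv("CLICKHOUSE_USER", "default"),
    password=os.getenv("CLICKHOUSE_PASSWORD", "password"),
    secure=os.getenv("CLICKHOUSE_SECURE", "false").lower() == "true",
)

trace_id = "<TRACE_ID>"  # paste the trace id printed by the example run
result = client.query(
    "SELECT count() FROM otel_2.otel_traces WHERE TraceId = %(trace_id)s",
    parameters={"trace_id": trace_id},
)
print(f"rows for {trace_id}: {result.result_rows[0][0]}")
```
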
# AgentOps

[![License: ELv2](https://img.shields.io/badge/License-ELv2-blue.svg)](https://www.elastic.co/licensing/elastic-license)
@@ -312,6 +370,7 @@ AgentOps requires several external services. Here's how to set them up:
```
4. Run migrations: `supabase db push`

For Linux environments where the Supabase CLI install fails, see docs/local_supabase_linux.md for manual binary installation and environment variable mapping steps.

**Option B: Cloud Supabase**

1. Create a new project at [supabase.com](https://supabase.com)