Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1fd4063
logcapture.py, instrumentation.py, session helpers
teocns Jan 27, 2025
2682a1b
Handle colored log outputs
teocns Jan 27, 2025
8ed3258
raise exc if session not found
teocns Jan 27, 2025
0395eb5
tests: unit/test_log_capture.py
teocns Jan 27, 2025
5afd0d4
log_handler and SessionLogExporter
teocns Jan 27, 2025
0e2ff94
test export
teocns Jan 27, 2025
710c023
SessionLogExporter: point to /v3/logs
teocns Feb 3, 2025
b5abf03
http_client: expose GET/POST/PUT/DELETE methods
teocns Feb 3, 2025
9993f80
tests: req_mock | mock `/v3/logs/<session_id>` with regex pattern
teocns Feb 4, 2025
ecc5037
tests(session): +TestSessionLogExporter
teocns Feb 4, 2025
2a53ce4
tests(fixtures): +agentops_init, +agentops_session
teocns Feb 4, 2025
8385e6e
delete deprecated tests/integration/test_session_export.py
teocns Feb 4, 2025
d69030c
ruff
teocns Feb 4, 2025
b55c3f5
Consolidate telemetry management under `instrumentation.py` separing …
teocns Feb 5, 2025
d864886
draft
teocns Feb 5, 2025
986b24a
TestSessionLogExporter to match new instrumenation
teocns Feb 5, 2025
5bdaf24
TestSessionLogging: ensures we're tracking the specific handler creat…
teocns Feb 5, 2025
d487ff8
test-log-cap
teocns Feb 5, 2025
4cf1080
fix(instrumentation): correct session_id type in handler setup
teocns Feb 5, 2025
def0df7
test_instrumentation.py: use real agentops_session instead of mock se…
teocns Feb 5, 2025
0161ee5
session - log exporter: use safe_serialize instead of json.dumps on logs
teocns Feb 5, 2025
aacf0fb
ruff
teocns Feb 5, 2025
65a9b92
fix: SessionLogExporter - appropriate typing of LogData and serializa…
teocns Feb 5, 2025
4f28141
fix(test_decorators): use real session uuid rather than literal "test…
teocns Feb 5, 2025
e1ba755
feat(helpers.safe_serialize): + Enum serialization
teocns Feb 6, 2025
0fc50b7
fix: test_session x session log reqs
teocns Feb 6, 2025
44260bf
fix(test_sessions::test_add_tags): TypeError: list indices must be in…
teocns Feb 6, 2025
84f24f8
ruff
teocns Feb 6, 2025
8dab0a2
oh this should've gone in #670 (test_record_action, assert on len req…
teocns Feb 6, 2025
d73da32
tests: fix correct ussage of mock url (base_url) rather than hardcode…
teocns Feb 6, 2025
d529660
save
teocns Feb 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions agentops/helpers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from enum import Enum
import inspect
import json
from datetime import datetime, timezone
Expand Down Expand Up @@ -67,6 +68,10 @@ def default(o):
try:
if isinstance(o, UUID):
return str(o)
# Handle Enum types
elif isinstance(o, Enum):
return o.value
# Handle objects with attributes property that's dict-like
elif hasattr(o, "model_dump_json"):
return str(o.model_dump_json())
elif hasattr(o, "to_json"):
Expand Down
86 changes: 47 additions & 39 deletions agentops/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,27 @@ def _prepare_headers(
return headers

@classmethod
def post(
def _make_request(
cls,
method: str,
url: str,
payload: bytes,
api_key: Optional[str] = None,
parent_key: Optional[str] = None,
jwt: Optional[str] = None,
header: Optional[Dict[str, str]] = None,
payload: Optional[bytes] = None,
) -> Response:
"""Make HTTP POST request using connection pooling"""
"""Make HTTP request using connection pooling"""
result = Response()
try:
headers = cls._prepare_headers(api_key, parent_key, jwt, header)
session = cls.get_session()
res = session.post(url, data=payload, headers=headers, timeout=20)

kwargs = {"headers": headers, "timeout": 20}
if payload is not None:
kwargs["data"] = payload

res = getattr(session, method.lower())(url, **kwargs)
result.parse(res)

except requests.exceptions.Timeout:
Expand Down Expand Up @@ -168,41 +174,43 @@ def get(
jwt: Optional[str] = None,
header: Optional[Dict[str, str]] = None,
) -> Response:
"""Make HTTP GET request using connection pooling"""
result = Response()
try:
headers = cls._prepare_headers(api_key, None, jwt, header)
session = cls.get_session()
res = session.get(url, headers=headers, timeout=20)
result.parse(res)
"""Make HTTP GET request"""
return cls._make_request("GET", url, api_key=api_key, jwt=jwt, header=header)

except requests.exceptions.Timeout:
result.code = 408
result.status = HttpStatus.TIMEOUT
raise ApiServerException("Could not reach API server - connection timed out")
except requests.exceptions.HTTPError as e:
try:
result.parse(e.response)
except Exception:
result = Response()
result.code = e.response.status_code
result.status = Response.get_status(e.response.status_code)
result.body = {"error": str(e)}
raise ApiServerException(f"HTTPError: {e}")
except requests.exceptions.RequestException as e:
result.body = {"error": str(e)}
raise ApiServerException(f"RequestException: {e}")
@classmethod
def post(
cls,
url: str,
payload: bytes,
api_key: Optional[str] = None,
parent_key: Optional[str] = None,
jwt: Optional[str] = None,
header: Optional[Dict[str, str]] = None,
) -> Response:
"""Make HTTP POST request"""
return cls._make_request(
"POST", url, api_key=api_key, parent_key=parent_key, jwt=jwt, header=header, payload=payload
)

if result.code == 401:
raise ApiServerException(
f"API server: invalid API key: {api_key}. Find your API key at https://app.agentops.ai/settings/projects"
)
if result.code == 400:
if "message" in result.body:
raise ApiServerException(f"API server: {result.body['message']}")
else:
raise ApiServerException(f"API server: {result.body}")
if result.code == 500:
raise ApiServerException("API server: - internal server error")
@classmethod
def put(
cls,
url: str,
payload: bytes,
api_key: Optional[str] = None,
jwt: Optional[str] = None,
header: Optional[Dict[str, str]] = None,
) -> Response:
"""Make HTTP PUT request"""
return cls._make_request("PUT", url, api_key=api_key, jwt=jwt, header=header, payload=payload)

return result
@classmethod
def delete(
cls,
url: str,
api_key: Optional[str] = None,
jwt: Optional[str] = None,
header: Optional[Dict[str, str]] = None,
) -> Response:
"""Make HTTP DELETE request"""
return cls._make_request("DELETE", url, api_key=api_key, jwt=jwt, header=header)
138 changes: 138 additions & 0 deletions agentops/instrumentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from __future__ import annotations

import logging
import sys
from typing import TYPE_CHECKING, Dict, List, Optional
from uuid import UUID

from opentelemetry import trace
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import SpanProcessor, TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
from opentelemetry.sdk.trace.sampling import ParentBased, Sampler, TraceIdRatioBased

if TYPE_CHECKING:
from agentops.client import Client

"""
This module handles OpenTelemetry instrumentation setup for AgentOps sessions.

Each AgentOps session requires its own telemetry setup to:
1. Track session-specific logs
2. Export logs to the AgentOps backend
3. Maintain isolation between different sessions running concurrently

The module provides functions to:
- Set up logging telemetry components for a new session
- Clean up telemetry components when a session ends
"""

# Map of session_id to LoggingHandler
_session_handlers: Dict[UUID, LoggingHandler] = {}


def get_session_handler(session_id: UUID) -> Optional[LoggingHandler]:
"""Get the logging handler for a specific session.

Args:
session_id: The UUID of the session

Returns:
The session's LoggingHandler if it exists, None otherwise
"""
return _session_handlers.get(session_id)


def set_session_handler(session_id: UUID, handler: Optional[LoggingHandler]) -> None:
"""Set or remove the logging handler for a session.

Args:
session_id: The UUID of the session
handler: The handler to set, or None to remove
"""
if handler is None:
_session_handlers.pop(session_id, None)
else:
_session_handlers[session_id] = handler


def setup_session_telemetry(session, log_exporter) -> tuple[LoggingHandler, BatchLogRecordProcessor]:
"""Set up OpenTelemetry logging components for a new session.

This function creates the necessary components to capture and export logs for a specific session:
- A LoggerProvider with session-specific resource attributes
- A BatchLogRecordProcessor to batch and export logs
- A LoggingHandler to capture logs and forward them to the processor

Args:
session_id: Unique identifier for the session, used to tag telemetry data
log_exporter: SessionLogExporter instance that handles sending logs to AgentOps backend

Returns:
Tuple containing:
- LoggingHandler: Handler that should be added to the logger
- BatchLogRecordProcessor: Processor that batches and exports logs
"""
# Create logging components
resource = Resource.create({SERVICE_NAME: f"agentops.session.{session.session_id}"})
logger_provider = LoggerProvider(resource=resource)

# Create processor and handler
log_processor = BatchLogRecordProcessor(log_exporter)
logger_provider.add_log_record_processor(log_processor) # Add processor to provider
Comment on lines +79 to +84

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log_processor and logger_provider are created but never used since the code is commented out, leading to resource leaks and non-functional logging setup.

📝 Committable Code Suggestion

‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
resource = Resource.create({SERVICE_NAME: f"agentops.session.{session.session_id}"})
logger_provider = LoggerProvider(resource=resource)
# Create processor and handler
log_processor = BatchLogRecordProcessor(log_exporter)
logger_provider.add_log_record_processor(log_processor) # Add processor to provider
resource = Resource.create({SERVICE_NAME: f"agentops.session.{session.session_id}"})
logger_provider = LoggerProvider(resource=resource)
# Create processor and handler
log_processor = BatchLogRecordProcessor(log_exporter)
logger_provider.add_log_record_processor(log_processor)


from agentops.log_capture import LogCapture

logcap = LogCapture(
session,
)

logcap.start()

# log_handler = LoggingHandler(
# level=logging.INFO,
# logger_provider=logger_provider,
# )
#
# # Register handler with session
# set_session_handler(session_id, log_handler)
#
# return log_handler, log_processor
Comment on lines +61 to +102

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function setup_session_telemetry no longer returns the promised tuple[LoggingHandler, BatchLogRecordProcessor] as specified in its return type annotation, causing type checking failures and potential runtime errors.

📝 Committable Code Suggestion

‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
def setup_session_telemetry(session, log_exporter) -> tuple[LoggingHandler, BatchLogRecordProcessor]:
"""Set up OpenTelemetry logging components for a new session.
This function creates the necessary components to capture and export logs for a specific session:
- A LoggerProvider with session-specific resource attributes
- A BatchLogRecordProcessor to batch and export logs
- A LoggingHandler to capture logs and forward them to the processor
Args:
session_id: Unique identifier for the session, used to tag telemetry data
log_exporter: SessionLogExporter instance that handles sending logs to AgentOps backend
Returns:
Tuple containing:
- LoggingHandler: Handler that should be added to the logger
- BatchLogRecordProcessor: Processor that batches and exports logs
"""
# Create logging components
resource = Resource.create({SERVICE_NAME: f"agentops.session.{session.session_id}"})
logger_provider = LoggerProvider(resource=resource)
# Create processor and handler
log_processor = BatchLogRecordProcessor(log_exporter)
logger_provider.add_log_record_processor(log_processor) # Add processor to provider
from agentops.log_capture import LogCapture
logcap = LogCapture(
session,
)
logcap.start()
# log_handler = LoggingHandler(
# level=logging.INFO,
# logger_provider=logger_provider,
# )
#
# # Register handler with session
# set_session_handler(session_id, log_handler)
#
# return log_handler, log_processor
def setup_session_telemetry(session, log_exporter) -> tuple[LoggingHandler, BatchLogRecordProcessor]:
# ...
logcap.start()
return log_handler, log_processor



def cleanup_session_telemetry(log_handler: LoggingHandler, log_processor: BatchLogRecordProcessor) -> None:
"""Clean up OpenTelemetry logging components when a session ends.

This function ensures proper cleanup by:
1. Removing the handler from the logger
2. Closing the handler to free resources
3. Flushing any pending logs in the processor
4. Shutting down the processor

Args:
log_handler: The session's LoggingHandler to be removed and closed
log_processor: The session's BatchLogRecordProcessor to be flushed and shutdown

Used by:
Session.end_session() to clean up logging components when the session ends
"""
from agentops.log_config import logger

try:
# Remove and close handler
logger.removeHandler(log_handler)
log_handler.close()

# Remove from session handlers
for session_id, handler in list(_session_handlers.items()):
if handler is log_handler:
set_session_handler(session_id, None)
break

# Shutdown processor
log_processor.force_flush(timeout_millis=5000)
log_processor.shutdown()
except Exception as e:
logger.warning(f"Error during logging cleanup: {e}")
Loading
Loading