diff --git a/agentops/client/api/versions/v4.py b/agentops/client/api/versions/v4.py index 295a1c650..68f035041 100644 --- a/agentops/client/api/versions/v4.py +++ b/agentops/client/api/versions/v4.py @@ -68,4 +68,35 @@ def upload_object(self, body: Union[str, bytes]) -> UploadedObjectResponse: return UploadedObjectResponse(**response_data) except Exception as e: raise ApiServerException(f"Failed to process upload response: {str(e)}") + + + def upload_logfile(self, body: Union[str, bytes], trace_id: int) -> UploadedObjectResponse: + """ + Upload an log file to the API and return the response. + + Args: + body: The log file to upload, either as a string or bytes. + Returns: + UploadedObjectResponse: The response from the API after upload. + """ + if isinstance(body, bytes): + body = body.decode("utf-8") + + response = self.post("/v4/logs/upload/", body, {**self.prepare_headers(), "Trace-Id": str(trace_id)}) + + if response.status_code != 200: + error_msg = f"Upload failed: {response.status_code}" + try: + error_data = response.json() + if "error" in error_data: + error_msg = error_data["error"] + except Exception: + pass + raise ApiServerException(error_msg) + + try: + response_data = response.json() + return UploadedObjectResponse(**response_data) + except Exception as e: + raise ApiServerException(f"Failed to process upload response: {str(e)}") diff --git a/agentops/logging/__init__.py b/agentops/logging/__init__.py index 43fd391e4..167c4a673 100644 --- a/agentops/logging/__init__.py +++ b/agentops/logging/__init__.py @@ -1,3 +1,4 @@ from .config import configure_logging, logger +from .instrument_logging import setup_print_logger, upload_logfile -__all__ = ["logger", "configure_logging"] +__all__ = ["logger", "configure_logging", "setup_print_logger", "upload_logfile"] diff --git a/agentops/logging/instrument_logging.py b/agentops/logging/instrument_logging.py new file mode 100644 index 000000000..278bc4030 --- /dev/null +++ b/agentops/logging/instrument_logging.py @@ -0,0 +1,86 @@ +import builtins +import logging +import os +import atexit +from datetime import datetime +from typing import Any, TextIO +from agents import Span + +_original_print = builtins.print + +LOGFILE_NAME = "agentops-tmp.log" + +## Instrument loggers and print function to log to a file +def setup_print_logger() -> None: + """ + ~Monkeypatches~ *Instruments the built-in print function and configures logging to also log to a file. + Preserves existing logging configuration and console output behavior. + """ + log_file = os.path.join(os.getcwd(), LOGFILE_NAME) + + file_logger = logging.getLogger('agentops_file_logger') + file_logger.setLevel(logging.DEBUG) + + file_handler = logging.FileHandler(log_file) + file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) + file_handler.setLevel(logging.DEBUG) + file_logger.addHandler(file_handler) + + # Ensure the new logger doesn't propagate to root + file_logger.propagate = False + + def print_logger(*args: Any, **kwargs: Any) -> None: + """ + Custom print function that logs to file and console. + + Args: + *args: Arguments to print + **kwargs: Keyword arguments to print + """ + message = " ".join(str(arg) for arg in args) + file_logger.info(message) + + # print to console using original print + _original_print(*args, **kwargs) + + # replace the built-in print with ours + builtins.print = print_logger + + def cleanup(): + """ + Cleanup function to be called when the process exits. + Removes the log file and restores the original print function. + """ + try: + # Remove our file handler + for handler in file_logger.handlers[:]: + handler.close() + file_logger.removeHandler(handler) + + # Restore the original print function + builtins.print = _original_print + except Exception as e: + # If something goes wrong during cleanup, just print the error + _original_print(f"Error during cleanup: {e}") + + # Register the cleanup function to run when the process exits + atexit.register(cleanup) + + +def upload_logfile(trace_id: int) -> None: + """ + Upload the log file to the API. + """ + from agentops import get_client + + log_file = os.path.join(os.getcwd(), LOGFILE_NAME) + if not os.path.exists(log_file): + return + with open(log_file, "r") as f: + log_content = f.read() + + client = get_client() + client.api.v4.upload_logfile(log_content, trace_id) + + os.remove(log_file) + \ No newline at end of file diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index 6ce9be370..78b8129e2 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -21,7 +21,7 @@ from opentelemetry import context as context_api from agentops.exceptions import AgentOpsClientNotInitializedException -from agentops.logging import logger +from agentops.logging import logger, setup_print_logger from agentops.sdk.processors import InternalSpanProcessor from agentops.sdk.types import TracingConfig from agentops.semconv import ResourceAttributes @@ -170,6 +170,9 @@ def setup_telemetry( meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) metrics.set_meter_provider(meter_provider) + ### Logging + setup_print_logger() + # Initialize root context context_api.get_current() @@ -256,8 +259,8 @@ def initialize(self, jwt: Optional[str] = None, **kwargs) -> None: self._provider, self._meter_provider = setup_telemetry( service_name=config["service_name"] or "", project_id=config.get("project_id"), - exporter_endpoint=config["exporter_endpoint"] or "", - metrics_endpoint=config["metrics_endpoint"] or "", + exporter_endpoint=config["exporter_endpoint"], + metrics_endpoint=config["metrics_endpoint"], max_queue_size=config["max_queue_size"], max_wait_time=config["max_wait_time"], export_flush_interval=config["export_flush_interval"], diff --git a/agentops/sdk/processors.py b/agentops/sdk/processors.py index 985907635..c10fff868 100644 --- a/agentops/sdk/processors.py +++ b/agentops/sdk/processors.py @@ -12,11 +12,10 @@ from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor from opentelemetry.sdk.trace.export import SpanExporter -import agentops.semconv as semconv from agentops.logging import logger from agentops.helpers.dashboard import log_trace_url from agentops.semconv.core import CoreAttributes - +from agentops.logging import upload_logfile class LiveSpanProcessor(SpanProcessor): def __init__(self, span_exporter: SpanExporter, **kwargs): @@ -91,7 +90,7 @@ class InternalSpanProcessor(SpanProcessor): - This processor tries to use the native kind first, then falls back to the attribute """ - _root_span_id: Optional[Span] = None + _root_span_id: Optional[int] = None def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: """ @@ -124,6 +123,10 @@ def on_end(self, span: ReadableSpan) -> None: if self._root_span_id and (span.context.span_id is self._root_span_id): logger.debug(f"[agentops.InternalSpanProcessor] Ending root span: {span.name}") log_trace_url(span) + try: + upload_logfile(span.context.trace_id) + except Exception as e: + logger.error(f"[agentops.InternalSpanProcessor] Error uploading logfile: {e}") def shutdown(self) -> None: """Shutdown the processor.""" diff --git a/tests/unit/logging/test_instrument_logging.py b/tests/unit/logging/test_instrument_logging.py new file mode 100644 index 000000000..294f022de --- /dev/null +++ b/tests/unit/logging/test_instrument_logging.py @@ -0,0 +1,70 @@ +import os +import builtins +import pytest +from unittest.mock import patch, MagicMock +from agentops.logging.instrument_logging import setup_print_logger, upload_logfile, LOGFILE_NAME +import logging + +@pytest.fixture +def cleanup_log_file(): + """Fixture to clean up the log file before and after tests""" + log_file = os.path.join(os.getcwd(), LOGFILE_NAME) + if os.path.exists(log_file): + os.remove(log_file) + yield + if os.path.exists(log_file): + os.remove(log_file) + +def test_setup_print_logger_creates_log_file(cleanup_log_file): + """Test that setup_print_logger creates a log file""" + setup_print_logger() + log_file = os.path.join(os.getcwd(), LOGFILE_NAME) + assert os.path.exists(log_file) + +def test_print_logger_writes_to_file(cleanup_log_file): + """Test that the monkeypatched print function writes to the log file""" + setup_print_logger() + test_message = "Test log message" + print(test_message) + + log_file = os.path.join(os.getcwd(), LOGFILE_NAME) + with open(log_file, 'r') as f: + log_content = f.read() + assert test_message in log_content + +def test_print_logger_preserves_original_print(cleanup_log_file): + """Test that the original print function is preserved""" + original_print = builtins.print + setup_print_logger() + assert builtins.print != original_print + + # Cleanup should restore original print + for handler in logging.getLogger('agentops_file_logger').handlers[:]: + handler.close() + logging.getLogger('agentops_file_logger').removeHandler(handler) + builtins.print = original_print + +@patch('agentops.get_client') +def test_upload_logfile(mock_get_client, cleanup_log_file): + """Test that upload_logfile reads and uploads log content""" + # Setup + setup_print_logger() + test_message = "Test upload message" + print(test_message) + + # Mock the client + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + # Test upload + upload_logfile(trace_id=123) + + # Verify + mock_client.api.v4.upload_logfile.assert_called_once() + assert not os.path.exists(os.path.join(os.getcwd(), LOGFILE_NAME)) + +def test_upload_logfile_nonexistent_file(): + """Test that upload_logfile handles nonexistent log file gracefully""" + with patch('agentops.get_client') as mock_get_client: + upload_logfile(trace_id=123) + mock_get_client.assert_not_called() \ No newline at end of file