Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 35 additions & 29 deletions agentops/logging/instrument_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,59 +3,64 @@
import os
import atexit
from typing import Any
from io import StringIO

_original_print = builtins.print

LOGFILE_NAME = "agentops-tmp.log"

# Instrument loggers and print function to log to a file

# Global buffer to store logs
_log_buffer = StringIO()

def setup_print_logger() -> None:
"""
~Monkeypatches~ *Instruments the built-in print function and configures logging to also log to a file.
Instruments the built-in print function and configures logging to use a memory buffer.
Preserves existing logging configuration and console output behavior.
"""
log_file = os.path.join(os.getcwd(), LOGFILE_NAME)
buffer_logger = logging.getLogger('agentops_buffer_logger')
buffer_logger.setLevel(logging.DEBUG)

file_logger = logging.getLogger('agentops_file_logger')
file_logger.setLevel(logging.DEBUG)
# Check if the logger already has handlers to prevent duplicates
if not buffer_logger.handlers:
# Create a StreamHandler that writes to our StringIO buffer
buffer_handler = logging.StreamHandler(_log_buffer)
buffer_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
buffer_handler.setLevel(logging.DEBUG)
buffer_logger.addHandler(buffer_handler)

file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
file_handler.setLevel(logging.DEBUG)
file_logger.addHandler(file_handler)

# Ensure the new logger doesn't propagate to root
file_logger.propagate = False
# Ensure the new logger doesn't propagate to root
buffer_logger.propagate = False

def print_logger(*args: Any, **kwargs: Any) -> None:
"""
Custom print function that logs to file and console.
Custom print function that logs to buffer and console.

Args:
*args: Arguments to print
**kwargs: Keyword arguments to print
"""
message = " ".join(str(arg) for arg in args)
file_logger.info(message)
buffer_logger.info(message)

# print to console using original print
_original_print(*args, **kwargs)

# replace the built-in print with ours
builtins.print = print_logger
# Only replace print if it hasn't been replaced already
if builtins.print is _original_print:
builtins.print = print_logger

def cleanup():
"""
Cleanup function to be called when the process exits.
Removes the log file and restores the original print function.
Restores the original print function and clears the buffer.
"""
try:
# Remove our file handler
for handler in file_logger.handlers[:]:
# Remove our buffer handler
for handler in buffer_logger.handlers[:]:
handler.close()
file_logger.removeHandler(handler)
buffer_logger.removeHandler(handler)

# Clear the buffer
_log_buffer.seek(0)
_log_buffer.truncate()

# Restore the original print function
builtins.print = _original_print
Expand All @@ -69,17 +74,18 @@ def cleanup():

def upload_logfile(trace_id: int) -> None:
"""
Upload the log file to the API.
Upload the log content from the memory buffer to the API.
"""
from agentops import get_client

log_file = os.path.join(os.getcwd(), LOGFILE_NAME)
if not os.path.exists(log_file):
# Get the content from the buffer
log_content = _log_buffer.getvalue()
if not log_content:
return
with open(log_file, "r") as f:
log_content = f.read()

client = get_client()
client.api.v4.upload_logfile(log_content, trace_id)

os.remove(log_file)
# Clear the buffer after upload
_log_buffer.seek(0)
_log_buffer.truncate()
67 changes: 27 additions & 40 deletions tests/unit/logging/test_instrument_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,56 @@
import builtins
import pytest
from unittest.mock import patch, MagicMock
from agentops.logging.instrument_logging import setup_print_logger, upload_logfile, LOGFILE_NAME
from agentops.logging.instrument_logging import setup_print_logger, upload_logfile
import logging

@pytest.fixture
def cleanup_log_file():
"""Fixture to clean up the log file before and after tests"""
log_file = os.path.join(os.getcwd(), LOGFILE_NAME)
if os.path.exists(log_file):
os.remove(log_file)
def reset_print():
"""Fixture to reset the print function after tests"""
original_print = builtins.print
yield
if os.path.exists(log_file):
os.remove(log_file)
builtins.print = original_print

def test_setup_print_logger_creates_log_file(cleanup_log_file):
"""Test that setup_print_logger creates a log file"""
def test_setup_print_logger_creates_buffer_logger_and_handler():
"""Test that setup_print_logger creates a buffer logger with a StreamHandler."""
setup_print_logger()
log_file = os.path.join(os.getcwd(), LOGFILE_NAME)
assert os.path.exists(log_file)
buffer_logger = logging.getLogger('agentops_buffer_logger')
assert buffer_logger.level == logging.DEBUG
assert len(buffer_logger.handlers) == 1
assert isinstance(buffer_logger.handlers[0], logging.StreamHandler)

def test_print_logger_writes_to_file(cleanup_log_file):
"""Test that the monkeypatched print function writes to the log file"""
def test_print_logger_writes_message_to_stringio_buffer(reset_print):
"""Test that the monkeypatched print function writes messages to the StringIO buffer."""
setup_print_logger()
test_message = "Test log message"
print(test_message)

log_file = os.path.join(os.getcwd(), LOGFILE_NAME)
with open(log_file, 'r') as f:
log_content = f.read()
assert test_message in log_content
buffer_logger = logging.getLogger('agentops_buffer_logger')
log_content = buffer_logger.handlers[0].stream.getvalue()
assert test_message in log_content

def test_print_logger_preserves_original_print(cleanup_log_file):
"""Test that the original print function is preserved"""
def test_print_logger_replaces_and_restores_builtin_print(reset_print):
"""Test that setup_print_logger replaces builtins.print and the fixture restores it after the test."""
import agentops.logging.instrument_logging as il
builtins.print = il._original_print
original_print = builtins.print
setup_print_logger()
assert builtins.print != original_print

# Cleanup should restore original print
for handler in logging.getLogger('agentops_file_logger').handlers[:]:
handler.close()
logging.getLogger('agentops_file_logger').removeHandler(handler)
builtins.print = original_print
# The reset_print fixture will restore print after the test

@patch('agentops.get_client')
def test_upload_logfile(mock_get_client, cleanup_log_file):
"""Test that upload_logfile reads and uploads log content"""
# Setup
def test_upload_logfile_sends_buffer_content_and_clears_buffer(mock_get_client):
"""Test that upload_logfile uploads the buffer content and clears the buffer after upload."""
setup_print_logger()
test_message = "Test upload message"
print(test_message)

# Mock the client
mock_client = MagicMock()
mock_get_client.return_value = mock_client

# Test upload
upload_logfile(trace_id=123)

# Verify
mock_client.api.v4.upload_logfile.assert_called_once()
assert not os.path.exists(os.path.join(os.getcwd(), LOGFILE_NAME))
buffer_logger = logging.getLogger('agentops_buffer_logger')
assert buffer_logger.handlers[0].stream.getvalue() == ""

def test_upload_logfile_nonexistent_file():
"""Test that upload_logfile handles nonexistent log file gracefully"""
def test_upload_logfile_does_nothing_when_buffer_is_empty():
"""Test that upload_logfile does nothing and does not call the client when the buffer is empty."""
with patch('agentops.get_client') as mock_get_client:
upload_logfile(trace_id=123)
mock_get_client.assert_not_called()
Loading