diff --git a/agentops/__init__.py b/agentops/__init__.py index 3b252759a..ef79b4a73 100755 --- a/agentops/__init__.py +++ b/agentops/__init__.py @@ -15,7 +15,8 @@ from typing import List, Optional, Union, Dict, Any from agentops.client import Client from agentops.sdk.core import TracingCore, TraceContext -from agentops.sdk.decorators import trace, session, agent, task, workflow, operation +from agentops.sdk.decorators import trace, session, agent, task, workflow, operation, tool +from agentops.context_manager import InitializationProxy from agentops.logging.config import logger @@ -106,24 +107,28 @@ def init( elif default_tags: merged_tags = default_tags - return _client.init( - api_key=api_key, - endpoint=endpoint, - app_url=app_url, - max_wait_time=max_wait_time, - max_queue_size=max_queue_size, - default_tags=merged_tags, - trace_name=trace_name, - instrument_llm_calls=instrument_llm_calls, - auto_start_session=auto_start_session, - auto_init=auto_init, - skip_auto_end_session=skip_auto_end_session, - env_data_opt_out=env_data_opt_out, - log_level=log_level, - fail_safe=fail_safe, - exporter_endpoint=exporter_endpoint, + # Prepare initialization arguments + init_kwargs = { + "api_key": api_key, + "endpoint": endpoint, + "app_url": app_url, + "max_wait_time": max_wait_time, + "max_queue_size": max_queue_size, + "default_tags": merged_tags, + "trace_name": trace_name, + "instrument_llm_calls": instrument_llm_calls, + "auto_start_session": auto_start_session, + "auto_init": auto_init, + "skip_auto_end_session": skip_auto_end_session, + "env_data_opt_out": env_data_opt_out, + "log_level": log_level, + "fail_safe": fail_safe, + "exporter_endpoint": exporter_endpoint, **kwargs, - ) + } + + # Return a proxy that supports both regular and context manager usage + return InitializationProxy(_client, init_kwargs) def configure(**kwargs): @@ -247,4 +252,5 @@ def end_trace(trace_context: Optional[TraceContext] = None, end_state: str = "Su "task", "workflow", "operation", + "tool", ] diff --git a/agentops/context_manager.py b/agentops/context_manager.py new file mode 100644 index 000000000..09c1e391a --- /dev/null +++ b/agentops/context_manager.py @@ -0,0 +1,342 @@ +""" +Context manager for AgentOps SDK initialization and lifecycle management. +""" + +from typing import Optional, Any, Dict +from opentelemetry import context as context_api + +from agentops.logging import logger +from agentops.sdk.core import TracingCore, TraceContext +from agentops.legacy import Session + + +class AgentOpsContextManager: + """ + Context manager for AgentOps SDK that handles initialization and automatic cleanup. + + This class enables the following usage pattern: + + with agentops.init(api_key="...") as session: + # Your agent code here + pass + # Trace automatically ends here + + It ensures that traces are properly ended even if exceptions occur. + """ + + def __init__(self, client, init_kwargs: Dict[str, Any]): + """ + Initialize the context manager. + + Args: + client: The AgentOps Client instance + init_kwargs: Keyword arguments to pass to client.init() + """ + self.client = client + self.init_kwargs = init_kwargs + self.init_result = None + self.trace_context: Optional[TraceContext] = None + self.managed_session: Optional[Session] = None + self._created_trace = False + self._context_token: Optional[context_api.Token] = None + + def __enter__(self) -> Optional[Session]: + """ + Enter the context manager. + + This method: + 1. Initializes the client if not already initialized + 2. Starts a trace if auto_start_session is False + 3. Returns a Session object for the active trace + + Returns: + Session object for the active trace, or None if initialization fails + """ + # Perform initialization + self.init_result = self.client.init(**self.init_kwargs) + + # If init returned a Session (auto_start_session=True), use it + # Check for Session by checking if it has the expected attributes + if self.init_result is not None and hasattr(self.init_result, "trace_context"): + self.managed_session = self.init_result + return self.managed_session + + # Otherwise, check if we should start a trace for this context + tracing_core = TracingCore.get_instance() + if not tracing_core.initialized: + logger.warning("TracingCore not initialized after client.init(). Cannot start context trace.") + return None + + # If auto_start_session was False or None, start a trace for this context + auto_start = self.init_kwargs.get("auto_start_session") + if auto_start is False or auto_start is None: + trace_name = self.init_kwargs.get("trace_name", "context_session") + tags = self.init_kwargs.get("default_tags") + + self.trace_context = tracing_core.start_trace(trace_name=trace_name, tags=tags) + if self.trace_context: + self._created_trace = True + self.managed_session = Session(self.trace_context) + logger.debug(f"Started context-managed trace: {trace_name}") + else: + logger.error("Failed to start trace for context manager") + + return self.managed_session + + def __exit__(self, exc_type: Optional[type], exc_val: Optional[Exception], exc_tb: Optional[Any]) -> bool: + """ + Exit the context manager. + + This method: + 1. Determines the appropriate end state based on exceptions + 2. Ends any trace that was created by this context + 3. Does NOT end traces that were auto-started by init() + + Args: + exc_type: Exception type if an exception occurred + exc_val: Exception value if an exception occurred + exc_tb: Exception traceback if an exception occurred + + Returns: + False to propagate any exceptions + """ + tracing_core = TracingCore.get_instance() + if not tracing_core.initialized: + return False + + # Determine end state based on exception + if exc_type is not None: + end_state = "Error" + if exc_val: + logger.debug(f"Context manager exiting with exception: {exc_val}") + else: + end_state = "Success" + + # Only end traces that we created in __enter__ + if self._created_trace and self.trace_context: + try: + tracing_core.end_trace(self.trace_context, end_state) + logger.debug(f"Ended context-managed trace with state: {end_state}") + except Exception as e: + logger.error(f"Error ending context-managed trace: {e}") + + # For auto-started sessions, we don't end them here + # They will be ended by the existing atexit handler or manually + elif self.managed_session and hasattr(self.managed_session, "trace_context"): + logger.debug("Not ending auto-started session in context manager exit") + + # Don't suppress exceptions + return False + + def __getattr__(self, name: str) -> Any: + """ + Delegate attribute access to the managed session. + + This allows the context manager to be used as if it were the session itself. + """ + if self.managed_session: + return getattr(self.managed_session, name) + raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") + + +class InitializationProxy: + """ + A proxy object that can act as both a regular return value and a context manager. + + This allows agentops.init() to be used in both ways: + - session = agentops.init(...) # Regular usage + - with agentops.init(...) as session: # Context manager usage + """ + + def __init__(self, client, init_kwargs: Dict[str, Any]): + """ + Initialize the proxy. + + Args: + client: The AgentOps Client instance + init_kwargs: Keyword arguments for initialization + """ + self.client = client + self.init_kwargs = init_kwargs + self._result = None + self._initialized = False + + # Immediately initialize the client to maintain backward compatibility + # This ensures that agentops._client.initialized is True after init() + self._ensure_initialized() + + def _ensure_initialized(self): + """Ensure the client is initialized for non-context usage.""" + if not self._initialized: + try: + self._result = self.client.init(**self.init_kwargs) + self._initialized = True + except Exception as e: + self._initialized = True # Mark as attempted + self._result = None + # Re-raise the exception so it propagates to the caller + raise e + + def __enter__(self): + """ + Delegate to AgentOpsContextManager for context manager usage. + """ + ctx_manager = AgentOpsContextManager(self.client, self.init_kwargs) + self._ctx_manager = ctx_manager + return ctx_manager.__enter__() + + def __exit__(self, exc_type, exc_val, exc_tb): + """ + Delegate to AgentOpsContextManager for context manager usage. + + This ensures proper cleanup following OpenTelemetry patterns. + """ + if hasattr(self, "_ctx_manager"): + return self._ctx_manager.__exit__(exc_type, exc_val, exc_tb) + return False + + def __getattr__(self, name: str) -> Any: + """ + For non-context usage, initialize and delegate to the result. + + This allows code like: + session = agentops.init(...) + session.record(event) # This triggers initialization + """ + # Special handling for certain attributes to avoid infinite recursion + if name.startswith("_"): + raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") + + self._ensure_initialized() + if self._result is not None: + return getattr(self._result, name) + raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") + + def __bool__(self) -> bool: + """ + Support boolean evaluation. + + This allows code like: + if agentops.init(...): + # Initialization successful + """ + try: + self._ensure_initialized() + return bool(self._result) + except Exception: + return False + + def __repr__(self) -> str: + """String representation that clearly identifies this as a proxy.""" + if self._initialized: + return f"" + return "" + + def __eq__(self, other) -> bool: + """Support equality comparison.""" + self._ensure_initialized() + return self._result == other + + def __ne__(self, other) -> bool: + """Support inequality comparison.""" + return not self.__eq__(other) + + def __hash__(self) -> int: + """Support hashing.""" + self._ensure_initialized() + return hash(self._result) if self._result else hash(None) + + def __str__(self) -> str: + """String conversion.""" + try: + self._ensure_initialized() + if self._result is not None and hasattr(self._result, "__str__"): + return str(self._result) + return "None" + except Exception: + return "" + + def get_wrapped_result(self): + """ + Get the actual wrapped result object. + + This method provides explicit access to the wrapped object + for cases where direct access is needed. + + Returns: + The actual result object from client.init() + """ + self._ensure_initialized() + return self._result + + def get_proxy_type(self) -> str: + """ + Get the type of this proxy for debugging purposes. + + This method makes it clear that this is a proxy object, + which helps with debugging and introspection. + + Returns: + String identifying this as an InitializationProxy + """ + return "InitializationProxy" + + def is_context_manager(self) -> bool: + """ + Check if this proxy can be used as a context manager. + + Returns: + True, since this proxy supports context manager protocol + """ + return True + + def __len__(self) -> int: + """Support len() function.""" + self._ensure_initialized() + if self._result is not None and hasattr(self._result, "__len__"): + return len(self._result) + raise TypeError(f"object of type '{self.__class__.__name__}' has no len()") + + def __iter__(self): + """Support iteration.""" + self._ensure_initialized() + if self._result is not None and hasattr(self._result, "__iter__"): + return iter(self._result) + raise TypeError(f"'{self.__class__.__name__}' object is not iterable") + + def __contains__(self, item) -> bool: + """Support 'in' operator.""" + self._ensure_initialized() + if self._result is not None and hasattr(self._result, "__contains__"): + return item in self._result + return False + + def __getitem__(self, key): + """Support indexing and slicing.""" + self._ensure_initialized() + if self._result is not None and hasattr(self._result, "__getitem__"): + return self._result[key] + raise TypeError(f"'{self.__class__.__name__}' object is not subscriptable") + + def __setitem__(self, key, value): + """Support item assignment.""" + self._ensure_initialized() + if self._result is not None and hasattr(self._result, "__setitem__"): + self._result[key] = value + else: + raise TypeError(f"'{self.__class__.__name__}' object does not support item assignment") + + def __delitem__(self, key): + """Support item deletion.""" + self._ensure_initialized() + if self._result is not None and hasattr(self._result, "__delitem__"): + del self._result[key] + else: + raise TypeError(f"'{self.__class__.__name__}' object doesn't support item deletion") + + def __call__(self, *args, **kwargs): + """Support calling the proxy as a function.""" + self._ensure_initialized() + if self._result is not None and callable(self._result): + return self._result(*args, **kwargs) + raise TypeError(f"'{self.__class__.__name__}' object is not callable") diff --git a/docs/mint.json b/docs/mint.json index f2c0865f7..d309a0823 100644 --- a/docs/mint.json +++ b/docs/mint.json @@ -186,6 +186,7 @@ "pages": [ "v2/usage/dashboard-info", "v2/usage/sdk-reference", + "v2/usage/context-manager", "v2/usage/advanced-configuration", "v2/usage/tracking-llm-calls", "v2/usage/tracking-agents", diff --git a/docs/v2/usage/context-manager.mdx b/docs/v2/usage/context-manager.mdx new file mode 100644 index 000000000..86a2a5215 --- /dev/null +++ b/docs/v2/usage/context-manager.mdx @@ -0,0 +1,410 @@ +--- +title: "Context Manager" +description: "Use AgentOps with Python's 'with' statement for automatic lifecycle management" +--- + +# Context Manager + +AgentOps v0.4+ supports Python's context manager protocol, allowing you to use the `with` statement for automatic initialization and cleanup of traces. This ensures that traces are properly ended even if exceptions occur. + +## Basic Usage + + + +```python Basic Context Manager +import agentops +from agentops import agent, task, tool + +@agent +class MyAgent: + def __init__(self, name: str): + self.name = name + + @task + def process_data(self, data: str) -> str: + return f"Processed: {data}" + +# Context manager automatically handles trace lifecycle +with agentops.init( + api_key="your-api-key", + trace_name="my_workflow", + auto_start_session=False # Let context manager handle the session +): + # Your agent code here + agent = MyAgent("ContextAgent") + result = agent.process_data("sample data") + print(f"Result: {result}") +# Trace automatically ends here with "Success" status +``` + +```python Traditional Approach +import agentops +from agentops import agent, task, tool + +@agent +class MyAgent: + def __init__(self, name: str): + self.name = name + + @task + def process_data(self, data: str) -> str: + return f"Processed: {data}" + +# Traditional initialization +agentops.init(api_key="your-api-key", auto_start_session=True) + +# Your agent code here +agent = MyAgent("TraditionalAgent") +result = agent.process_data("sample data") +print(f"Result: {result}") + +# Must remember to end session manually +agentops.end_session() +``` + + + +## Exception Handling + +The context manager automatically handles exceptions and marks traces with the appropriate status: + + + +```python Exception Handling +try: + with agentops.init( + api_key="your-api-key", + trace_name="error_handling_example" + ): + # Your agent code + agent = MyAgent("ErrorAgent") + + # Simulate an error + raise ValueError("Something went wrong") + +except ValueError as e: + print(f"Caught exception: {e}") + # Trace was automatically ended with "Error" status +``` + +```python Nested Exception Handling +try: + with agentops.init( + api_key="your-api-key", + trace_name="nested_error_test" + ): + agent = MyAgent("NestedAgent") + + # First level of operations + result1 = agent.process_data("level 1 data") + + # Nested operation that fails + try: + risky_operation() + except TimeoutError as nested_error: + print(f"Nested operation failed: {nested_error}") + # Continue with fallback + fallback_data = agent.process_data("fallback data") + + # Continue with main flow + final_result = agent.process_data("final data") + +except Exception as e: + print(f"Unexpected error: {e}") + # Trace automatically marked as "Error" +``` + + + +## Configuration Options + + + +```python Full Configuration +with agentops.init( + api_key="your-api-key", + endpoint="https://api.agentops.ai", + trace_name="configured_trace", + default_tags=["production", "v2.0"], + instrument_llm_calls=True, + auto_start_session=False, # Recommended for context manager usage + max_wait_time=10000, + max_queue_size=1024 +): + # Your instrumented code here + agent = MyAgent("ConfiguredAgent") + result = agent.process_data("configured data") +``` + +```python Minimal Configuration +with agentops.init( + api_key="your-api-key", + trace_name="simple_trace" +): + # Minimal setup with defaults + agent = MyAgent("SimpleAgent") + result = agent.process_data("simple data") +``` + + + +## Advanced Patterns + +### Retry Logic with Individual Traces + +Each retry attempt gets its own trace for clear debugging: + + + +```python Simple Retry Logic +# See examples/context_manager/04_retry_logic.py for complete example +max_retries = 3 + +for attempt in range(max_retries): + try: + with agentops.init( + api_key="your-api-key", + trace_name=f"retry_attempt_{attempt + 1}", + default_tags=["retry", f"attempt-{attempt + 1}"] + ): + agent = MyAgent(f"RetryAgent_Attempt{attempt + 1}") + + # Operation that might fail + if attempt < 2: # Simulate failures + raise ConnectionError(f"Simulated failure on attempt {attempt + 1}") + + # Success on final attempt + result = agent.process_data("retry operation data") + print(f"โœ… Success! Result: {result}") + break + + except ConnectionError as e: + print(f"โŒ Attempt {attempt + 1} failed: {e}") + if attempt == max_retries - 1: + print("๐Ÿ’ฅ All retry attempts exhausted") + raise + print("๐Ÿ”„ Retrying...") +``` + +```python Exponential Backoff Retry +# See examples/context_manager/04_retry_logic.py for complete example +import time + +max_retries = 4 +base_delay = 0.1 + +for attempt in range(max_retries): + delay = base_delay * (2 ** attempt) # Exponential backoff + + try: + with agentops.init( + api_key="your-api-key", + trace_name=f"backoff_retry_{attempt + 1}", + default_tags=["retry", "exponential-backoff", f"attempt-{attempt + 1}"] + ): + agent = MyAgent(f"BackoffAgent_Attempt{attempt + 1}") + + # Simulate random failure + if random.random() < 0.7: + raise TimeoutError(f"Random timeout on attempt {attempt + 1}") + + # Success + result = agent.process_data("backoff operation data") + print(f"โœ… Success! Result: {result}") + break + + except TimeoutError as e: + print(f"โŒ Attempt {attempt + 1} failed: {e}") + if attempt == max_retries - 1: + raise + print(f"โณ Waiting {delay:.1f}s before retry...") + time.sleep(delay) +``` + + + +### Batch Processing + +Process multiple items with individual traces: + + + +```python Batch Processing +# See examples/context_manager/04_retry_logic.py for complete example +items = ["item1", "item2", "item3", "item4", "item5"] +results = {} + +for item in items: + try: + with agentops.init( + api_key="your-api-key", + trace_name=f"process_{item}", + default_tags=["batch", item] + ): + agent = MyAgent(f"BatchAgent_{item}") + result = agent.process_data(f"data for {item}") + results[item] = result + print(f"โœ… {item} processed successfully") + + except Exception as e: + print(f"โŒ {item} failed: {e}") + results[item] = f"FAILED: {e}" + +print("๐Ÿ“‹ Batch Results:") +for item, result in results.items(): + status = "โœ…" if not result.startswith("FAILED") else "โŒ" + print(f" {status} {item}: {result}") +``` + +```python Conditional Error Handling +# See examples/context_manager/02_exception_handling.py for complete example +try: + with agentops.init( + api_key="your-api-key", + trace_name="conditional_operation" + ): + agent = MyAgent("ConditionalAgent") + result = agent.process_data("sensitive data") + +except ConnectionError as e: + print(f"Network error (retryable): {e}") + # Implement retry logic + +except PermissionError as e: + print(f"Auth error (not retryable): {e}") + # Handle authentication failure + +except Exception as e: + print(f"Unexpected error: {e}") + # Handle other errors +``` + + + +## Mixed Usage Patterns + + + +```python Pre-initialized Client +# Initialize once at application startup +agentops.init(api_key="your-api-key", auto_start_session=False) + +# Use context managers for specific workflows +with agentops.init(trace_name="task_1"): + agent1 = MyAgent("Task1Agent") + agent1.process_data("task 1 data") + +with agentops.init(trace_name="task_2"): + agent2 = MyAgent("Task2Agent") + agent2.process_data("task 2 data") +``` + +```python Backward Compatibility +# See examples/context_manager/03_backward_compatibility.py for complete example + +# Traditional initialization still works +agentops.init( + api_key="your-api-key", + auto_start_session=True +) + +# Your existing agent code +agent = MyAgent("TraditionalAgent") +result = agent.process_data("traditional data") + +# Manually end session (or let atexit handle it) +agentops.end_session() +``` + + + +## Best Practices + +### 1. Use `auto_start_session=False` with Context Managers + + + +```python โœ… Recommended +with agentops.init(api_key="key", auto_start_session=False): + # Your code + agent = MyAgent("RecommendedAgent") + result = agent.process_data("data") +``` + +```python โš ๏ธ Also Works +with agentops.init(api_key="key", auto_start_session=True): + # Creates an additional auto-started session + agent = MyAgent("ExtraSessionAgent") + result = agent.process_data("data") +``` + + + +### 2. Use Descriptive Trace Names + + + +```python โœ… Good - Descriptive +with agentops.init( + api_key="key", + trace_name="user_onboarding_workflow" +): + # Onboarding logic + agent = MyAgent("OnboardingAgent") + agent.process_data("user data") +``` + +```python โŒ Bad - Generic +with agentops.init(api_key="key", trace_name="operation"): + # What operation? + agent = MyAgent("GenericAgent") + agent.process_data("data") +``` + + + +### 3. Keep Context Scope Focused + + + +```python โœ… Good - Focused Scope +# Separate contexts for different operations +with agentops.init(trace_name="data_validation"): + validator = DataValidator() + validator.validate(data) + +with agentops.init(trace_name="data_processing"): + processor = DataProcessor() + processor.process(data) +``` + +```python โŒ Bad - Too Broad +with agentops.init(trace_name="entire_application"): + # Everything happens here - hard to debug + validator = DataValidator() + validator.validate(data) + processor = DataProcessor() + processor.process(data) + # ... many more operations +``` + + + +## Example Files Reference + +The AgentOps repository includes comprehensive examples that demonstrate all these patterns: + + + + Basic context manager usage and comparison with traditional approach + + + Exception handling patterns and guaranteed cleanup + + + Backward compatibility and mixed usage patterns + + + Retry patterns with exponential backoff and batch processing + + diff --git a/examples/context_manager/01_basic_usage.py b/examples/context_manager/01_basic_usage.py new file mode 100644 index 000000000..fc010323d --- /dev/null +++ b/examples/context_manager/01_basic_usage.py @@ -0,0 +1,170 @@ +""" +Basic Context Manager Usage Example + +This example demonstrates the fundamental usage of AgentOps context manager +and compares it with the traditional approach. +""" + +import os + +import agentops +from dotenv import load_dotenv + +load_dotenv() + +# Get API key from environment +AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY") + + +# Define agent class inline for standalone example +@agentops.agent +class ExampleAgent: + """A simple example agent for demonstrating AgentOps functionality.""" + + def __init__(self, name: str): + self.name = name + print(f"๐Ÿค– Created agent: {self.name}") + + @agentops.task + def process_data(self, data: str) -> str: + """Process some data with the agent.""" + print(f"๐Ÿ“Š {self.name} processing: {data}") + result = f"Processed: {data}" + + # Use a tool as part of processing + tool_result = self.use_tool(result) + return tool_result + + @agentops.tool + def use_tool(self, input_data: str) -> str: + """Use a tool to transform data.""" + print(f"๐Ÿ”ง {self.name} using tool on: {input_data}") + return f"Tool output: {input_data.upper()}" + + @agentops.task + def analyze_sentiment(self, text: str) -> str: + """Analyze sentiment of text.""" + print(f"๐Ÿ’ญ {self.name} analyzing sentiment: {text}") + # Simple mock sentiment analysis + if "good" in text.lower() or "great" in text.lower(): + return "positive" + elif "bad" in text.lower() or "terrible" in text.lower(): + return "negative" + else: + return "neutral" + + @agentops.tool + def fetch_data(self, source: str) -> str: + """Simulate fetching data from a source.""" + print(f"๐Ÿ“ก {self.name} fetching data from: {source}") + return f"Data from {source}: [mock data]" + + +def traditional_approach(): + """Example of traditional AgentOps usage.""" + print("๐Ÿ”„ Traditional Approach:") + print("-" * 30) + + # Traditional initialization + agentops.init(api_key=AGENTOPS_API_KEY, auto_start_session=True, default_tags=["traditional", "example"]) + + if AGENTOPS_API_KEY: + print("โœ… Session started manually") + + # Create and use agent + agent = ExampleAgent("TraditionalAgent") + result = agent.process_data("traditional data") + print(f"๐Ÿ“ค Result: {result}") + + # Manually end session + agentops.end_session() + print("โœ… Session ended manually") + else: + print("โŒ Failed to start session") + + +def context_manager_approach(): + """Example of context manager usage.""" + print("\n๐Ÿ†• Context Manager Approach:") + print("-" * 35) + + # Context manager automatically handles session lifecycle + with agentops.init( + api_key=AGENTOPS_API_KEY, + auto_start_session=False, # Let context manager handle it + trace_name="basic_example", + default_tags=["context-manager", "example"], + ): + print("โœ… Session started automatically") + + # Create and use agent + agent = ExampleAgent("ContextAgent") + result = agent.process_data("context manager data") + print(f"๐Ÿ“ค Result: {result}") + + # Session will be ended automatically when exiting the 'with' block + + print("โœ… Session ended automatically") + + +def multiple_operations_example(): + """Example showing multiple operations within a single context.""" + print("\n๐Ÿ”„ Multiple Operations in One Context:") + print("-" * 40) + + with agentops.init( + api_key=AGENTOPS_API_KEY, + trace_name="multi_operations", + auto_start_session=False, + default_tags=["multi-ops", "example"], + ): + print("โœ… Context started") + + # Create agent + agent = ExampleAgent("MultiOpAgent") + + # Perform multiple operations + print("\n๐Ÿ“Š Operation 1: Data Processing") + result1 = agent.process_data("first dataset") + + print("\n๐Ÿ’ญ Operation 2: Sentiment Analysis") + sentiment = agent.analyze_sentiment("This is a great example!") + + print("\n๐Ÿ“ก Operation 3: Data Fetching") + data = agent.fetch_data("external_api") + + print("\n๐Ÿ“‹ Summary:") + print(f" - Processing result: {result1}") + print(f" - Sentiment: {sentiment}") + print(f" - Fetched data: {data}") + + print("โœ… All operations completed, context ended") + + +if __name__ == "__main__": + print("๐Ÿš€ AgentOps Basic Context Manager Usage") + print("=" * 50) + + try: + # Show traditional approach + traditional_approach() + + # Show context manager approach + context_manager_approach() + + # Show multiple operations + multiple_operations_example() + + print("\n" + "=" * 50) + print("โœ… Basic usage examples completed!") + print("\n๐Ÿ’ก Key Takeaways:") + print(" โ€ข Context manager automatically handles session lifecycle") + print(" โ€ข No need to manually call end_session()") + print(" โ€ข Clear scope definition for operations") + print(" โ€ข Same functionality as traditional approach") + + except Exception as e: + print(f"โŒ Error running examples: {e}") + import traceback + + traceback.print_exc() diff --git a/examples/context_manager/02_exception_handling.py b/examples/context_manager/02_exception_handling.py new file mode 100644 index 000000000..1490347ec --- /dev/null +++ b/examples/context_manager/02_exception_handling.py @@ -0,0 +1,266 @@ +""" +Exception Handling Example + +This example demonstrates how the AgentOps context manager handles +different types of exceptions while ensuring proper cleanup. +""" + +import os + +import agentops +from dotenv import load_dotenv + +load_dotenv() + +# Get API key from environment +AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY") + + +# Define agent class inline for standalone example +@agentops.agent +class ExampleAgent: + """A simple example agent for demonstrating AgentOps functionality.""" + + def __init__(self, name: str): + self.name = name + print(f"๐Ÿค– Created agent: {self.name}") + + @agentops.task + def process_data(self, data: str) -> str: + """Process some data with the agent.""" + print(f"๐Ÿ“Š {self.name} processing: {data}") + result = f"Processed: {data}" + + # Use a tool as part of processing + tool_result = self.use_tool(result) + return tool_result + + @agentops.tool + def use_tool(self, input_data: str) -> str: + """Use a tool to transform data.""" + print(f"๐Ÿ”ง {self.name} using tool on: {input_data}") + return f"Tool output: {input_data.upper()}" + + @agentops.task + def analyze_sentiment(self, text: str) -> str: + """Analyze sentiment of text.""" + print(f"๐Ÿ’ญ {self.name} analyzing sentiment: {text}") + # Simple mock sentiment analysis + if "good" in text.lower() or "great" in text.lower(): + return "positive" + elif "bad" in text.lower() or "terrible" in text.lower(): + return "negative" + else: + return "neutral" + + @agentops.tool + def fetch_data(self, source: str) -> str: + """Simulate fetching data from a source.""" + print(f"๐Ÿ“ก {self.name} fetching data from: {source}") + return f"Data from {source}: [mock data]" + + +def span_recording_error_example(): + """Example of handling span recording errors.""" + print("๐Ÿงช Scenario 1: Span Recording Error") + print("-" * 40) + + try: + with agentops.init( + api_key=AGENTOPS_API_KEY, + trace_name="span_error_test", + auto_start_session=False, + default_tags=["error-test", "span"], + ): + print("โœ… Context manager entered") + + # Create agent and do some work + agent = ExampleAgent("ErrorTestAgent") + result = agent.process_data("test data") + print(f"๐Ÿ“Š Processing completed: {result}") + + # Simulate a span recording error (this would be internal to AgentOps) + # In real scenarios, this might be a network error, API failure, etc. + print("โš ๏ธ Simulating span recording error...") + raise ConnectionError("Failed to record span to AgentOps API") + + except ConnectionError as e: + print(f"โŒ Caught span error: {e}") + print("โœ… Context manager properly cleaned up trace") + + print("โœ… Scenario 1 completed\n") + + +def agent_error_example(): + """Example of handling agent-related errors.""" + print("๐Ÿงช Scenario 2: Agent Error (LLM API Failure)") + print("-" * 45) + + try: + with agentops.init( + api_key=AGENTOPS_API_KEY, + trace_name="agent_error_test", + auto_start_session=False, + default_tags=["error-test", "agent"], + ): + print("โœ… Context manager entered") + + # Create agent + agent = ExampleAgent("LLMAgent") + + # Simulate successful operations first + result1 = agent.process_data("initial data") + print(f"๐Ÿ“Š First operation: {result1}") + + # Simulate LLM API failure + print("โš ๏ธ Simulating LLM API failure...") + raise ConnectionError("OpenAI API is temporarily unavailable") + + except ConnectionError as e: + print(f"โŒ Caught agent error: {e}") + print("โœ… Context manager properly cleaned up trace") + print("๐Ÿ’ก All successful operations before error are preserved") + + print("โœ… Scenario 2 completed\n") + + +def unrelated_error_example(): + """Example of handling unrelated application errors.""" + print("๐Ÿงช Scenario 3: Unrelated Application Error") + print("-" * 45) + + try: + with agentops.init( + api_key=AGENTOPS_API_KEY, + trace_name="app_error_test", + auto_start_session=False, + default_tags=["error-test", "application"], + ): + print("โœ… Context manager entered") + + # Create agent and do successful work + agent = ExampleAgent("AppAgent") + result1 = agent.process_data("business data") + sentiment = agent.analyze_sentiment("This is working great!") + + print(f"๐Ÿ“Š Processing: {result1}") + print(f"๐Ÿ’ญ Sentiment: {sentiment}") + + # Simulate unrelated application error + print("โš ๏ธ Simulating unrelated application error...") + result = 10 / 0 # ZeroDivisionError # noqa: F841 + + except ZeroDivisionError as e: + print(f"โŒ Caught application error: {e}") + print("โœ… Context manager properly cleaned up trace") + print("๐Ÿ’ก All AgentOps data recorded before error is preserved") + + print("โœ… Scenario 3 completed\n") + + +def nested_exception_example(): + """Example of nested operations with exceptions.""" + print("๐Ÿงช Scenario 4: Nested Operations with Exception") + print("-" * 50) + + try: + with agentops.init( + api_key=AGENTOPS_API_KEY, + trace_name="nested_error_test", + auto_start_session=False, + default_tags=["error-test", "nested"], + ): + print("โœ… Outer context started") + + agent = ExampleAgent("NestedAgent") + + # First level of operations + result1 = agent.process_data("level 1 data") + print(f"๐Ÿ“Š Level 1: {result1}") + + # Nested operation that fails + try: + print("๐Ÿ”„ Starting nested operation...") + agent.fetch_data("unreliable_source") + + # This nested operation fails + raise TimeoutError("Data source timeout") + + except TimeoutError as nested_error: + print(f"โš ๏ธ Nested operation failed: {nested_error}") + print("๐Ÿ”„ Continuing with fallback...") + + # Fallback operation + fallback_data = agent.fetch_data("backup_source") + print(f"๐Ÿ“ก Fallback successful: {fallback_data}") + + # Continue with main flow + final_result = agent.analyze_sentiment("Overall this worked well") + print(f"๐Ÿ’ญ Final analysis: {final_result}") + + except Exception as e: + print(f"โŒ Unexpected error: {e}") + + print("โœ… Scenario 4 completed - nested errors handled gracefully\n") + + +def exception_propagation_example(): + """Example showing that exceptions are properly propagated.""" + print("๐Ÿงช Scenario 5: Exception Propagation Test") + print("-" * 45) + + exception_was_caught = False + + try: + with agentops.init( + api_key=AGENTOPS_API_KEY, + trace_name="propagation_test", + auto_start_session=False, + default_tags=["error-test", "propagation"], + ): + print("โœ… Context manager entered") + + agent = ExampleAgent("PropagationAgent") + agent.process_data("test data") + + # Raise an exception that should propagate + raise ValueError("This exception should NOT be suppressed") + + except ValueError as e: + exception_was_caught = True + print(f"โœ… Exception properly propagated: {e}") + + if exception_was_caught: + print("โœ… Context manager does NOT suppress exceptions") + else: + print("โŒ Context manager incorrectly suppressed exception") + + print("โœ… Scenario 5 completed\n") + + +if __name__ == "__main__": + print("๐Ÿš€ AgentOps Exception Handling Examples") + print("=" * 50) + + try: + # Test different exception scenarios + span_recording_error_example() + agent_error_example() + unrelated_error_example() + nested_exception_example() + exception_propagation_example() + + print("=" * 50) + print("โœ… All exception handling examples completed!") + print("\n๐Ÿ”’ Key Guarantees:") + print(" โ€ข Traces are ALWAYS cleaned up, even with exceptions") + print(" โ€ข Exceptions are NEVER suppressed") + print(" โ€ข Error state is properly recorded for debugging") + print(" โ€ข All data recorded before errors is preserved") + print(" โ€ข Nested error handling works correctly") + + except Exception as e: + print(f"โŒ Error running examples: {e}") + import traceback + + traceback.print_exc() diff --git a/examples/context_manager/03_backward_compatibility.py b/examples/context_manager/03_backward_compatibility.py new file mode 100644 index 000000000..4d71aeb54 --- /dev/null +++ b/examples/context_manager/03_backward_compatibility.py @@ -0,0 +1,232 @@ +""" +Backward Compatibility Example + +This example demonstrates that existing AgentOps code continues to work +unchanged with the new context manager implementation. +""" + +import os + +import agentops +from dotenv import load_dotenv + +load_dotenv() + +# Get API key from environment +AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY") + + +# Define agent class inline for standalone example +@agentops.agent +class BackwardCompatibilityAgent: + """Example agent for backward compatibility demonstration.""" + + def __init__(self, name: str): + self.name = name + print(f"๐Ÿค– Created agent: {self.name}") + + @agentops.task + def process_data(self, data: str) -> str: + """Process some data with the agent.""" + print(f"๐Ÿ“Š {self.name} processing: {data}") + result = f"Processed: {data}" + + # Use a tool as part of processing + tool_result = self.use_tool(result) + return tool_result + + @agentops.tool + def use_tool(self, input_data: str) -> str: + """Use a tool to transform data.""" + print(f"๐Ÿ”ง {self.name} using tool on: {input_data}") + return f"Tool output: {input_data.upper()}" + + @agentops.task + def analyze_sentiment(self, text: str) -> str: + """Analyze sentiment of text.""" + print(f"๐Ÿ’ญ {self.name} analyzing sentiment: {text}") + # Simple mock sentiment analysis + if "good" in text.lower() or "great" in text.lower(): + return "positive" + elif "bad" in text.lower() or "terrible" in text.lower(): + return "negative" + else: + return "neutral" + + +def legacy_session_management(): + """Example of legacy session management - still works!""" + print("๐Ÿ”„ Legacy Session Management:") + print("-" * 35) + + # Traditional initialization - exactly as before + session = agentops.init( + api_key=AGENTOPS_API_KEY, auto_start_session=True, default_tags=["legacy", "backward-compatibility"] + ) + + if session: + print("โœ… Session started using legacy approach") + + # Create and use agent - exactly as before + agent = BackwardCompatibilityAgent("LegacyAgent") + result = agent.process_data("legacy data") + print(f"๐Ÿ“ค Result: {result}") + + # Manually end session - exactly as before + agentops.end_session() + print("โœ… Session ended using legacy approach") + else: + print("โŒ Failed to start session") + + +def legacy_start_session_api(): + """Example using the legacy start_session API.""" + print("\n๐Ÿ”„ Legacy start_session API:") + print("-" * 35) + + # Using the old start_session function + session = agentops.start_session(api_key=AGENTOPS_API_KEY, tags=["legacy-api", "start-session"]) + + if session: + print("โœ… Session started using start_session()") + + agent = BackwardCompatibilityAgent("StartSessionAgent") + result = agent.analyze_sentiment("This legacy API still works great!") + print(f"๐Ÿ’ญ Sentiment: {result}") + + # End using legacy API + agentops.end_session() + print("โœ… Session ended using end_session()") + else: + print("โŒ Failed to start session with start_session()") + + +def mixed_usage_example(): + """Example showing mixed usage of old and new approaches.""" + print("\n๐Ÿ”„ Mixed Usage Example:") + print("-" * 30) + + # Start with legacy approach + session = agentops.init(api_key=AGENTOPS_API_KEY, auto_start_session=True, default_tags=["mixed", "legacy-start"]) + + if session: + print("โœ… Started with legacy init()") + + agent = BackwardCompatibilityAgent("MixedAgent") + result1 = agent.process_data("initial data") + print(f"๐Ÿ“Š Legacy result: {result1}") + + # End legacy session + agentops.end_session() + print("โœ… Ended legacy session") + + # Now use new context manager approach + print("\n๐Ÿ†• Switching to context manager:") + with agentops.init( + api_key=AGENTOPS_API_KEY, + trace_name="mixed_context", + auto_start_session=False, + default_tags=["mixed", "context-manager"], + ): + print("โœ… Started with context manager") + + agent2 = BackwardCompatibilityAgent("ContextAgent") + result2 = agent2.analyze_sentiment("Context managers are awesome!") + print(f"๐Ÿ’ญ Context result: {result2}") + + print("โœ… Context manager session ended automatically") + + +def legacy_decorators_example(): + """Example showing legacy decorator usage still works.""" + print("\n๐Ÿ”„ Legacy Decorators Example:") + print("-" * 35) + + # Define functions with legacy decorators + @agentops.session + def legacy_session_function(): + """Function decorated with @session - still works!""" + print("โœ… Inside @session decorated function") + + agent = BackwardCompatibilityAgent("SessionDecoratorAgent") + return agent.process_data("session decorator data") + + @agentops.trace + def legacy_trace_function(): + """Function decorated with @trace - still works!""" + print("โœ… Inside @trace decorated function") + + agent = BackwardCompatibilityAgent("TraceDecoratorAgent") + return agent.analyze_sentiment("Trace decorators work perfectly!") + + # Initialize AgentOps first + agentops.init(api_key=AGENTOPS_API_KEY) + + # Use legacy decorators + result1 = legacy_session_function() + print(f"๐Ÿ“Š Session decorator result: {result1}") + + result2 = legacy_trace_function() + print(f"๐Ÿ’ญ Trace decorator result: {result2}") + + +def legacy_manual_trace_management(): + """Example of manual trace management - advanced legacy usage.""" + print("\n๐Ÿ”„ Legacy Manual Trace Management:") + print("-" * 40) + + # Initialize without auto-start + agentops.init(api_key=AGENTOPS_API_KEY, auto_start_session=False) + + # Manually start trace + trace_context = agentops.start_trace(trace_name="manual_legacy_trace", tags=["manual", "legacy", "advanced"]) + + if trace_context: + print("โœ… Manually started trace") + + agent = BackwardCompatibilityAgent("ManualAgent") + result = agent.process_data("manual trace data") + print(f"๐Ÿ“Š Manual trace result: {result}") + + # Manually end trace + agentops.end_trace(trace_context, "Success") + print("โœ… Manually ended trace") + else: + print("โŒ Failed to start manual trace") + + +if __name__ == "__main__": + print("๐Ÿš€ AgentOps Backward Compatibility Examples") + print("=" * 55) + print("This demonstrates that ALL existing code continues to work!") + print("=" * 55) + + try: + # Show various legacy approaches still work + legacy_session_management() + legacy_start_session_api() + mixed_usage_example() + legacy_decorators_example() + legacy_manual_trace_management() + + print("\n" + "=" * 55) + print("โœ… All backward compatibility examples completed!") + print("\n๐Ÿ”’ Compatibility Guarantees:") + print(" โ€ข All existing init() calls work unchanged") + print(" โ€ข Legacy start_session()/end_session() APIs work") + print(" โ€ข All decorators (@session, @trace, @agent, etc.) work") + print(" โ€ข Manual trace management APIs work") + print(" โ€ข Mixed usage patterns are supported") + print(" โ€ข Zero breaking changes for existing code") + + print("\n๐Ÿ’ก Migration Benefits:") + print(" โ€ข Adopt context managers gradually") + print(" โ€ข Mix old and new patterns as needed") + print(" โ€ข No pressure to refactor existing code") + print(" โ€ข Get automatic cleanup benefits immediately") + + except Exception as e: + print(f"โŒ Error running examples: {e}") + import traceback + + traceback.print_exc() diff --git a/examples/context_manager/04_retry_logic.py b/examples/context_manager/04_retry_logic.py new file mode 100644 index 000000000..e25eecd9b --- /dev/null +++ b/examples/context_manager/04_retry_logic.py @@ -0,0 +1,291 @@ +""" +Retry Logic Example + +This example demonstrates how to implement retry patterns using +AgentOps context manager, with individual traces for each attempt. +""" + +import os +import random +import time + +import agentops +from dotenv import load_dotenv + +load_dotenv() + +# Get API key from environment +AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY") + + +# Define agent class inline for standalone example +@agentops.agent +class ExampleAgent: + """A simple example agent for demonstrating AgentOps functionality.""" + + def __init__(self, name: str): + self.name = name + print(f"๐Ÿค– Created agent: {self.name}") + + @agentops.task + def process_data(self, data: str) -> str: + """Process some data with the agent.""" + print(f"๐Ÿ“Š {self.name} processing: {data}") + result = f"Processed: {data}" + + # Use a tool as part of processing + tool_result = self.use_tool(result) + return tool_result + + @agentops.tool + def use_tool(self, input_data: str) -> str: + """Use a tool to transform data.""" + print(f"๐Ÿ”ง {self.name} using tool on: {input_data}") + return f"Tool output: {input_data.upper()}" + + @agentops.task + def analyze_sentiment(self, text: str) -> str: + """Analyze sentiment of text.""" + print(f"๐Ÿ’ญ {self.name} analyzing sentiment: {text}") + # Simple mock sentiment analysis + if "good" in text.lower() or "great" in text.lower(): + return "positive" + elif "bad" in text.lower() or "terrible" in text.lower(): + return "negative" + else: + return "neutral" + + @agentops.tool + def fetch_data(self, source: str) -> str: + """Simulate fetching data from a source.""" + print(f"๐Ÿ“ก {self.name} fetching data from: {source}") + return f"Data from {source}: [mock data]" + + +def simple_retry_example(): + """Simple retry logic with context manager.""" + print("๐Ÿ”„ Simple Retry Logic Example") + print("-" * 35) + + max_retries = 3 + + for attempt in range(max_retries): + try: + with agentops.init( + api_key=AGENTOPS_API_KEY, + trace_name=f"retry_attempt_{attempt + 1}", + auto_start_session=False, + default_tags=["retry", f"attempt-{attempt + 1}", "simple"], + ): + print(f"๐ŸŽฏ Attempt {attempt + 1}/{max_retries}") + + agent = ExampleAgent(f"RetryAgent_Attempt{attempt + 1}") + + # Simulate operation that might fail + if attempt < 2: # Fail first two attempts + print("โš ๏ธ Simulating failure...") + raise ConnectionError(f"Simulated failure on attempt {attempt + 1}") + + # Success on third attempt + result = agent.process_data("retry operation data") + print(f"โœ… Success! Result: {result}") + break + + except ConnectionError as e: + print(f"โŒ Attempt {attempt + 1} failed: {e}") + if attempt == max_retries - 1: + print("๐Ÿ’ฅ All retry attempts exhausted") + raise + print("๐Ÿ”„ Retrying...") + time.sleep(0.5) # Brief delay between retries + + print("โœ… Simple retry completed\n") + + +def exponential_backoff_retry(): + """Retry with exponential backoff.""" + print("๐Ÿ“ˆ Exponential Backoff Retry Example") + print("-" * 40) + + max_retries = 4 + base_delay = 0.1 + + for attempt in range(max_retries): + delay = base_delay * (2**attempt) # Exponential backoff + + try: + with agentops.init( + api_key=AGENTOPS_API_KEY, + trace_name=f"backoff_retry_{attempt + 1}", + auto_start_session=False, + default_tags=["retry", "exponential-backoff", f"attempt-{attempt + 1}"], + ): + print(f"๐ŸŽฏ Attempt {attempt + 1}/{max_retries} (delay: {delay:.1f}s)") + + agent = ExampleAgent(f"BackoffAgent_Attempt{attempt + 1}") + + # Simulate random failure (70% chance of failure) + if random.random() < 0.7: + print("โš ๏ธ Random failure occurred...") + raise TimeoutError(f"Random timeout on attempt {attempt + 1}") + + # Success + result = agent.process_data("backoff operation data") + print(f"โœ… Success! Result: {result}") + break + + except TimeoutError as e: + print(f"โŒ Attempt {attempt + 1} failed: {e}") + if attempt == max_retries - 1: + print("๐Ÿ’ฅ All retry attempts exhausted") + raise + print(f"โณ Waiting {delay:.1f}s before retry...") + time.sleep(delay) + + print("โœ… Exponential backoff retry completed\n") + + +def conditional_retry_example(): + """Retry with different strategies based on error type.""" + print("๐ŸŽ›๏ธ Conditional Retry Example") + print("-" * 35) + + max_retries = 3 + + for attempt in range(max_retries): + try: + with agentops.init( + api_key=AGENTOPS_API_KEY, + trace_name=f"conditional_retry_{attempt + 1}", + auto_start_session=False, + default_tags=["retry", "conditional", f"attempt-{attempt + 1}"], + ): + print(f"๐ŸŽฏ Attempt {attempt + 1}/{max_retries}") + + agent = ExampleAgent(f"ConditionalAgent_Attempt{attempt + 1}") + + # Simulate different types of errors + error_type = random.choice(["network", "auth", "rate_limit", "success"]) + + if error_type == "network": + print("๐ŸŒ Network error occurred") + raise ConnectionError("Network connection failed") + elif error_type == "auth": + print("๐Ÿ” Authentication error occurred") + raise PermissionError("Authentication failed") + elif error_type == "rate_limit": + print("โฑ๏ธ Rate limit error occurred") + raise Exception("Rate limit exceeded") + else: + # Success + result = agent.process_data("conditional operation data") + print(f"โœ… Success! Result: {result}") + break + + except ConnectionError as e: + print(f"โŒ Network error: {e}") + if attempt < max_retries - 1: + print("๐Ÿ”„ Network errors are retryable, trying again...") + time.sleep(0.5) + else: + print("๐Ÿ’ฅ Max network retries reached") + raise + + except PermissionError as e: + print(f"โŒ Auth error: {e}") + print("๐Ÿšซ Authentication errors are not retryable") + raise + + except Exception as e: + print(f"โŒ Rate limit error: {e}") + if attempt < max_retries - 1: + delay = 1.0 * (attempt + 1) # Longer delay for rate limits + print(f"โณ Rate limit hit, waiting {delay}s...") + time.sleep(delay) + else: + print("๐Ÿ’ฅ Max rate limit retries reached") + raise + + print("โœ… Conditional retry completed\n") + + +def batch_retry_example(): + """Retry individual items in a batch operation.""" + print("๐Ÿ“ฆ Batch Retry Example") + print("-" * 25) + + items = ["item1", "item2", "item3", "item4", "item5"] + results = {} + max_retries = 2 + + for item in items: + print(f"\n๐ŸŽฏ Processing {item}") + + for attempt in range(max_retries): + try: + with agentops.init( + api_key=AGENTOPS_API_KEY, + trace_name=f"batch_{item}_attempt_{attempt + 1}", + auto_start_session=False, + default_tags=["batch", "retry", item, f"attempt-{attempt + 1}"], + ): + print(f" ๐Ÿ“Š Attempt {attempt + 1}/{max_retries} for {item}") + + agent = ExampleAgent(f"BatchAgent_{item}") + + # Simulate random failures (40% chance) + if random.random() < 0.4: + print(f" โš ๏ธ Processing {item} failed") + raise RuntimeError(f"Failed to process {item}") + + # Success + result = agent.process_data(f"batch data for {item}") + results[item] = result + print(f" โœ… {item} processed successfully") + break + + except RuntimeError as e: + print(f" โŒ {item} attempt {attempt + 1} failed: {e}") + if attempt == max_retries - 1: + print(f" ๐Ÿ’ฅ {item} failed after all retries") + results[item] = f"FAILED: {e}" + else: + print(f" ๐Ÿ”„ Retrying {item}...") + time.sleep(0.2) + + print("\n๐Ÿ“‹ Batch Results:") + for item, result in results.items(): + status = "โœ…" if not result.startswith("FAILED") else "โŒ" + print(f" {status} {item}: {result}") + + print("โœ… Batch retry completed\n") + + +if __name__ == "__main__": + print("๐Ÿš€ AgentOps Retry Logic Examples") + print("=" * 40) + + try: + # Set random seed for reproducible examples + random.seed(42) + + # Run different retry examples + simple_retry_example() + exponential_backoff_retry() + conditional_retry_example() + batch_retry_example() + + print("=" * 40) + print("โœ… All retry logic examples completed!") + print("\n๐Ÿ’ก Key Benefits:") + print(" โ€ข Each retry attempt gets its own trace") + print(" โ€ข Easy to debug which attempt succeeded/failed") + print(" โ€ข Clear visibility into retry patterns") + print(" โ€ข Automatic cleanup even if all retries fail") + print(" โ€ข Supports different retry strategies") + + except Exception as e: + print(f"โŒ Error running examples: {e}") + import traceback + + traceback.print_exc() diff --git a/examples/context_manager/README.md b/examples/context_manager/README.md new file mode 100644 index 000000000..8c2053b75 --- /dev/null +++ b/examples/context_manager/README.md @@ -0,0 +1,101 @@ +# AgentOps Context Manager Examples + +This directory contains comprehensive examples demonstrating the new context manager functionality for AgentOps. Each example focuses on a specific use case or pattern and is completely standalone. + +## ๐Ÿ“ Example Files + +### Core Examples +- **`01_basic_usage.py`** - Simple context manager usage and comparison with traditional approach +- **`02_exception_handling.py`** - How exceptions are handled and traces are cleaned up automatically +- **`03_backward_compatibility.py`** - Demonstrates that existing code continues to work unchanged +- **`04_retry_logic.py`** - Implementing retry patterns with individual traces per attempt + +## ๐Ÿš€ Quick Start + +1. **Set your API key:** + ```bash + export AGENTOPS_API_KEY="your-api-key-here" + ``` + Or create a `.env` file with: + ``` + AGENTOPS_API_KEY=your-api-key-here + ``` + +2. **Run any example:** + ```bash + python examples/context_manager/01_basic_usage.py + python examples/context_manager/02_exception_handling.py + python examples/context_manager/03_backward_compatibility.py + python examples/context_manager/04_retry_logic.py + ``` + +## ๐ŸŽฏ Key Benefits Demonstrated + +- **Automatic Lifecycle Management**: Traces are automatically started and ended +- **Exception Safety**: Guaranteed cleanup even when errors occur +- **Clear Scope Definition**: Easy to see what operations belong to which trace +- **Backward Compatibility**: Existing code continues to work unchanged +- **Flexible Patterns**: Supports retry logic, error recovery, and complex workflows + +## ๐Ÿ“– Learning Path + +1. **Start with `01_basic_usage.py`** to understand the fundamentals + - Compare traditional vs context manager approaches + - See automatic session lifecycle management + - Learn basic patterns + +2. **Review `02_exception_handling.py`** to see error handling + - Understand how exceptions are handled + - See guaranteed cleanup behavior + - Learn error state recording + +3. **Check `03_backward_compatibility.py`** for migration confidence + - See that existing code works unchanged + - Learn about mixed usage patterns + - Understand migration strategies + +4. **Explore `04_retry_logic.py`** for advanced patterns + - Implement retry logic with individual traces + - Handle different error types + - Use exponential backoff and conditional retries + +## ๐Ÿ’ก Example Features + +- **All examples are completely standalone** - no shared dependencies +- Each example includes its own agent class definition +- Examples use environment variables for API keys +- All examples include detailed comments explaining the patterns +- Check the console output to see trace lifecycle events +- Each example demonstrates different aspects of the context manager + +## ๐Ÿ”ง Context Manager Benefits + +### Traditional Approach +```python +session = agentops.init(api_key=API_KEY, auto_start_session=True) +# ... do work ... +agentops.end_session() # Must remember to call this! +``` + +### Context Manager Approach +```python +with agentops.init(api_key=API_KEY, trace_name="my_trace") as session: + # ... do work ... + # Session automatically ended, even if exceptions occur! +``` + +## ๐Ÿ›ก๏ธ Error Handling + +The context manager guarantees: +- **Traces are ALWAYS cleaned up**, even with exceptions +- **Exceptions are NEVER suppressed** +- **Error state is properly recorded** for debugging +- **All data recorded before errors is preserved** + +## ๐Ÿ”„ Migration Strategy + +You can adopt context managers gradually: +- **No breaking changes** - existing code continues to work +- **Mix old and new patterns** as needed +- **Adopt incrementally** - no pressure to refactor everything at once +- **Get automatic cleanup benefits** immediately for new code diff --git a/tests/unit/test_context_manager.py b/tests/unit/test_context_manager.py new file mode 100644 index 000000000..6e8cf05a7 --- /dev/null +++ b/tests/unit/test_context_manager.py @@ -0,0 +1,341 @@ +""" +Unit tests for the AgentOps context manager functionality. +""" + +import pytest +from unittest.mock import Mock, patch, MagicMock +from agentops.context_manager import AgentOpsContextManager, InitializationProxy +from agentops.legacy import Session +from agentops.sdk.core import TraceContext + + +class TestAgentOpsContextManager: + """Test the AgentOpsContextManager class.""" + + def setup_method(self): + """Set up test fixtures.""" + self.mock_client = Mock() + self.init_kwargs = { + "api_key": "test-key", + "trace_name": "test_trace", + "auto_start_session": False, + "default_tags": ["test"], + } + self.context_manager = AgentOpsContextManager(self.mock_client, self.init_kwargs) + + @patch("agentops.context_manager.TracingCore") + def test_enter_with_auto_start_session_true(self, mock_tracing_core): + """Test __enter__ when auto_start_session=True.""" + # Setup + mock_session = Mock() + mock_session.trace_context = Mock() # Add trace_context attribute + self.mock_client.init.return_value = mock_session + self.init_kwargs["auto_start_session"] = True + + # Execute + result = self.context_manager.__enter__() + + # Verify + assert result == mock_session + assert self.context_manager.managed_session == mock_session + self.mock_client.init.assert_called_once_with(**self.init_kwargs) + + @patch("agentops.context_manager.TracingCore") + @patch("agentops.context_manager.Session") + def test_enter_with_auto_start_session_false(self, mock_session_class, mock_tracing_core_class): + """Test __enter__ when auto_start_session=False.""" + # Setup + mock_tracing_core = Mock() + mock_tracing_core_class.get_instance.return_value = mock_tracing_core + mock_tracing_core.initialized = True + + mock_trace_context = Mock(spec=TraceContext) + mock_tracing_core.start_trace.return_value = mock_trace_context + + mock_session = Mock() + mock_session_class.return_value = mock_session + + self.mock_client.init.return_value = None + + # Execute + result = self.context_manager.__enter__() + + # Verify + assert result == mock_session + assert self.context_manager.managed_session == mock_session + assert self.context_manager._created_trace is True + mock_tracing_core.start_trace.assert_called_once_with(trace_name="test_trace", tags=["test"]) + + @patch("agentops.context_manager.TracingCore") + def test_enter_tracing_core_not_initialized(self, mock_tracing_core_class): + """Test __enter__ when TracingCore is not initialized.""" + # Setup + mock_tracing_core = Mock() + mock_tracing_core_class.get_instance.return_value = mock_tracing_core + mock_tracing_core.initialized = False + + self.mock_client.init.return_value = None + + # Execute + result = self.context_manager.__enter__() + + # Verify + assert result is None + + @patch("agentops.context_manager.TracingCore") + def test_exit_success(self, mock_tracing_core_class): + """Test __exit__ with successful completion.""" + # Setup + mock_tracing_core = Mock() + mock_tracing_core_class.get_instance.return_value = mock_tracing_core + mock_tracing_core.initialized = True + + mock_trace_context = Mock(spec=TraceContext) + self.context_manager.trace_context = mock_trace_context + self.context_manager._created_trace = True + + # Execute + result = self.context_manager.__exit__(None, None, None) + + # Verify + assert result is False # Don't suppress exceptions + mock_tracing_core.end_trace.assert_called_once_with(mock_trace_context, "Success") + + @patch("agentops.context_manager.TracingCore") + def test_exit_with_exception(self, mock_tracing_core_class): + """Test __exit__ when an exception occurred.""" + # Setup + mock_tracing_core = Mock() + mock_tracing_core_class.get_instance.return_value = mock_tracing_core + mock_tracing_core.initialized = True + + mock_trace_context = Mock(spec=TraceContext) + self.context_manager.trace_context = mock_trace_context + self.context_manager._created_trace = True + + # Execute + result = self.context_manager.__exit__(ValueError, ValueError("test"), None) + + # Verify + assert result is False # Don't suppress exceptions + mock_tracing_core.end_trace.assert_called_once_with(mock_trace_context, "Error") + + @patch("agentops.context_manager.TracingCore") + def test_exit_auto_started_session(self, mock_tracing_core_class): + """Test __exit__ with auto-started session (should not end it).""" + # Setup + mock_tracing_core = Mock() + mock_tracing_core_class.get_instance.return_value = mock_tracing_core + mock_tracing_core.initialized = True + + mock_session = Mock(spec=Session) + mock_session.trace_context = Mock() + self.context_manager.managed_session = mock_session + self.context_manager._created_trace = False # Not created by context manager + + # Execute + result = self.context_manager.__exit__(None, None, None) + + # Verify + assert result is False + mock_tracing_core.end_trace.assert_not_called() # Should not end auto-started session + + def test_getattr_delegation(self): + """Test that attribute access is delegated to the managed session.""" + # Setup + mock_session = Mock() + mock_session.some_method = Mock(return_value="test_result") + self.context_manager.managed_session = mock_session + + # Execute + result = self.context_manager.some_method() + + # Verify + assert result == "test_result" + mock_session.some_method.assert_called_once() + + def test_getattr_no_session(self): + """Test that AttributeError is raised when no session is available.""" + # Setup + self.context_manager.managed_session = None + + # Execute & Verify + with pytest.raises(AttributeError): + self.context_manager.some_method() + + +class TestInitializationProxy: + """Test the InitializationProxy class.""" + + def setup_method(self): + """Set up test fixtures.""" + self.mock_client = Mock() + self.init_kwargs = {"api_key": "test-key"} + self.proxy = InitializationProxy(self.mock_client, self.init_kwargs) + + def test_ensure_initialized(self): + """Test that _ensure_initialized calls client.init.""" + # Setup + mock_result = Mock() + self.mock_client.init.return_value = mock_result + + # Create a new proxy that hasn't been initialized yet + proxy = InitializationProxy(self.mock_client, self.init_kwargs) + + # Verify it was initialized during construction + assert proxy._initialized is True + assert proxy._result == mock_result + # Should be called once during construction + self.mock_client.init.assert_called_with(**self.init_kwargs) + + @patch("agentops.context_manager.AgentOpsContextManager") + def test_enter_delegates_to_context_manager(self, mock_context_manager_class): + """Test that __enter__ creates and delegates to AgentOpsContextManager.""" + # Setup + mock_context_manager = MagicMock() + mock_context_manager_class.return_value = mock_context_manager + mock_context_manager.__enter__.return_value = "test_session" + + # Execute + result = self.proxy.__enter__() + + # Verify + assert result == "test_session" + mock_context_manager_class.assert_called_once_with(self.mock_client, self.init_kwargs) + mock_context_manager.__enter__.assert_called_once() + + def test_exit_delegates_to_context_manager(self): + """Test that __exit__ delegates to the stored context manager.""" + # Setup + mock_context_manager = MagicMock() + mock_context_manager.__exit__.return_value = False + self.proxy._ctx_manager = mock_context_manager + + # Execute + result = self.proxy.__exit__(None, None, None) + + # Verify + assert result is False + mock_context_manager.__exit__.assert_called_once_with(None, None, None) + + def test_getattr_triggers_initialization(self): + """Test that attribute access works with initialized proxy.""" + # Setup + mock_result = Mock() + mock_result.some_attr = "test_value" + self.mock_client.init.return_value = mock_result + + # Create a new proxy (which will initialize immediately) + proxy = InitializationProxy(self.mock_client, self.init_kwargs) + + # Execute + result = proxy.some_attr + + # Verify + assert result == "test_value" + assert proxy._initialized is True + self.mock_client.init.assert_called_with(**self.init_kwargs) + + def test_bool_evaluation(self): + """Test boolean evaluation of the proxy.""" + # Setup + mock_result = Mock() + self.mock_client.init.return_value = mock_result + + # Execute + result = bool(self.proxy) + + # Verify + assert result is True # Mock objects are truthy + assert self.proxy._initialized is True + + def test_repr(self): + """Test string representation of the proxy.""" + # Setup + mock_result = Mock() + self.mock_client.init.return_value = mock_result + + # Create a new proxy (which initializes immediately) + proxy = InitializationProxy(self.mock_client, self.init_kwargs) + + # Test representation after initialization + repr_result = repr(proxy) + assert "InitializationProxy" in repr_result # Should clearly identify as proxy + assert "Mock" in repr_result # Should show the mock result + + def test_no_class_override(self): + """Test that __class__ is not overridden and isinstance works correctly.""" + # Setup + mock_result = Mock() + self.mock_client.init.return_value = mock_result + proxy = InitializationProxy(self.mock_client, self.init_kwargs) + + # Verify that isinstance works correctly + assert isinstance(proxy, InitializationProxy) + assert not isinstance(proxy, Session) # Should not pretend to be a Session + assert type(proxy).__name__ == "InitializationProxy" + assert proxy.__class__ == InitializationProxy + + def test_get_wrapped_result(self): + """Test the get_wrapped_result method for explicit access to wrapped object.""" + # Setup + mock_result = Mock() + self.mock_client.init.return_value = mock_result + proxy = InitializationProxy(self.mock_client, self.init_kwargs) + + # Execute + wrapped_result = proxy.get_wrapped_result() + + # Verify + assert wrapped_result == mock_result + assert proxy._initialized is True + + def test_get_proxy_type(self): + """Test the get_proxy_type method for debugging purposes.""" + proxy = InitializationProxy(self.mock_client, self.init_kwargs) + assert proxy.get_proxy_type() == "InitializationProxy" + + def test_is_context_manager(self): + """Test the is_context_manager method.""" + proxy = InitializationProxy(self.mock_client, self.init_kwargs) + assert proxy.is_context_manager() is True + + +class TestIntegration: + """Integration tests for the context manager functionality.""" + + @patch("agentops.context_manager.TracingCore") + @patch("agentops.context_manager.Session") + def test_full_context_manager_flow(self, mock_session_class, mock_tracing_core_class): + """Test the complete flow of using the context manager.""" + # Setup + mock_client = Mock() + mock_client.init.return_value = None + + mock_tracing_core = Mock() + mock_tracing_core_class.get_instance.return_value = mock_tracing_core + mock_tracing_core.initialized = True + + mock_trace_context = Mock(spec=TraceContext) + mock_tracing_core.start_trace.return_value = mock_trace_context + + mock_session = Mock(spec=Session) + mock_session_class.return_value = mock_session + + init_kwargs = {"api_key": "test-key", "trace_name": "integration_test", "auto_start_session": False} + + # Execute + proxy = InitializationProxy(mock_client, init_kwargs) + + with proxy as session: + assert session == mock_session + # Simulate some work + session.record("test_event") + + # Verify + # init is called twice: once during proxy construction, once during context manager entry + assert mock_client.init.call_count == 2 + mock_client.init.assert_called_with(**init_kwargs) + mock_tracing_core.start_trace.assert_called_once() + mock_tracing_core.end_trace.assert_called_once_with(mock_trace_context, "Success") + mock_session.record.assert_called_once_with("test_event") diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index bf20aa764..6329dc328 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -92,6 +92,7 @@ def test_auto_start_session_true(mock_tracing_core, mock_api_client, mock_trace_ """Test initializing with auto_start_session=True""" import agentops from agentops.legacy import Session + from agentops.context_manager import InitializationProxy # Mock the start_trace method to return our mock trace context mock_tracing_core.start_trace.return_value = mock_trace_context @@ -101,15 +102,19 @@ def test_auto_start_session_true(mock_tracing_core, mock_api_client, mock_trace_ # Verify a trace was auto-started mock_tracing_core.start_trace.assert_called_once() - # init() should return a Session object when auto-starting a session - assert isinstance(result, Session) - assert result.trace_context == mock_trace_context + # init() now returns an InitializationProxy, not directly a Session + assert isinstance(result, InitializationProxy) + # But the wrapped result should be a Session when auto_start_session=True + wrapped_result = result.get_wrapped_result() + assert isinstance(wrapped_result, Session) + assert wrapped_result.trace_context == mock_trace_context def test_auto_start_session_default(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): """Test initializing with default auto_start_session behavior""" import agentops from agentops.legacy import Session + from agentops.context_manager import InitializationProxy # Mock the start_trace method to return our mock trace context mock_tracing_core.start_trace.return_value = mock_trace_context @@ -119,9 +124,12 @@ def test_auto_start_session_default(mock_tracing_core, mock_api_client, mock_tra # Verify that the client was initialized assert agentops._client.initialized - # Since auto_start_session defaults to True, init() should return a Session object - assert isinstance(result, Session) - assert result.trace_context == mock_trace_context + # init() now returns an InitializationProxy, not directly a Session + assert isinstance(result, InitializationProxy) + # But the wrapped result should be a Session when auto_start_session defaults to True + wrapped_result = result.get_wrapped_result() + assert isinstance(wrapped_result, Session) + assert wrapped_result.trace_context == mock_trace_context def test_start_trace_without_init(): diff --git a/tests/unit/test_session_legacy.py b/tests/unit/test_session_legacy.py index 509efa055..46ca9a8d4 100644 --- a/tests/unit/test_session_legacy.py +++ b/tests/unit/test_session_legacy.py @@ -1,11 +1,16 @@ def test_session_auto_start(instrumentation): import agentops from agentops.legacy import Session + from agentops.context_manager import InitializationProxy # Pass a dummy API key for the test session = agentops.init(api_key="test-api-key", auto_start_session=True) - assert isinstance(session, Session) + # init() now returns an InitializationProxy, not directly a Session + assert isinstance(session, InitializationProxy) + # But the wrapped result should be a Session when auto_start_session=True + wrapped_result = session.get_wrapped_result() + assert isinstance(wrapped_result, Session) def test_crewai_backwards_compatibility(instrumentation):