diff --git a/agentops/__init__.py b/agentops/__init__.py index c0e0f3c95..fc4ceb273 100755 --- a/agentops/__init__.py +++ b/agentops/__init__.py @@ -12,6 +12,17 @@ LLMEvent, ) # type: ignore +# Import all required modules at the top +from opentelemetry.trace import get_current_span +from agentops.semconv import ( + AgentAttributes, + ToolAttributes, + WorkflowAttributes, + CoreAttributes, + SpanKind, + SpanAttributes, +) +import json from typing import List, Optional, Union, Dict, Any from agentops.client import Client from agentops.sdk.core import TraceContext, tracer @@ -255,6 +266,181 @@ def end_trace( tracer.end_trace(trace_context=trace_context, end_state=end_state) +def update_trace_metadata(metadata: Dict[str, Any], prefix: str = "trace.metadata") -> bool: + """ + Update metadata on the current running trace. + + Args: + metadata: Dictionary of key-value pairs to set as trace metadata. + Values must be strings, numbers, booleans, or lists of these types. + Lists are converted to JSON string representation. + Keys can be either custom keys or semantic convention aliases. + prefix: Prefix for metadata attributes (default: "trace.metadata"). + Ignored for semantic convention attributes. + + Returns: + bool: True if metadata was successfully updated, False otherwise. + + """ + if not tracer.initialized: + logger.warning("AgentOps SDK not initialized. Cannot update trace metadata.") + return False + + # Build semantic convention mappings dynamically + def build_semconv_mappings(): + """Build mappings from user-friendly keys to semantic convention attributes.""" + mappings = {} + + # Helper function to extract attribute name from semantic convention + def extract_key_from_attr(attr_value: str) -> str: + parts = attr_value.split(".") + if len(parts) >= 2: + # Handle special cases + if parts[0] == "error": + # error.type -> error_type + return "_".join(parts) + else: + # Default: entity.attribute -> entity_attribute + return "_".join(parts) + return attr_value + + # Process each semantic convention class + for cls in [AgentAttributes, ToolAttributes, WorkflowAttributes, CoreAttributes, SpanAttributes]: + for attr_name, attr_value in cls.__dict__.items(): + if not attr_name.startswith("_") and isinstance(attr_value, str): + # Skip gen_ai attributes + if attr_value.startswith("gen_ai."): + continue + + # Generate user-friendly key + user_key = extract_key_from_attr(attr_value) + mappings[user_key] = attr_value + + # Add some additional convenience mappings + if attr_value == CoreAttributes.TAGS: + mappings["tags"] = attr_value + + return mappings + + # Build mappings if using semantic conventions + SEMCONV_MAPPINGS = build_semconv_mappings() + + # Collect all valid semantic convention attributes + VALID_SEMCONV_ATTRS = set() + for cls in [AgentAttributes, ToolAttributes, WorkflowAttributes, CoreAttributes, SpanAttributes]: + for key, value in cls.__dict__.items(): + if not key.startswith("_") and isinstance(value, str): + # Include all attributes except gen_ai ones + if not value.startswith("gen_ai."): + VALID_SEMCONV_ATTRS.add(value) + + # Find the current trace span + span = None + + # Get the current span from OpenTelemetry context + current_span = get_current_span() + + # Check if the current span is valid and recording + if current_span and hasattr(current_span, "is_recording") and current_span.is_recording(): + # Check if this is a trace/session span or a child span + span_name = getattr(current_span, "name", "") + + # If it's a session/trace span, use it directly + if span_name.endswith(f".{SpanKind.SESSION}"): + span = current_span + else: + # It's a child span, try to find the root trace span + # Get all active traces + active_traces = tracer.get_active_traces() + if active_traces: + # Find the trace that contains the current span + current_trace_id = current_span.get_span_context().trace_id + + for trace_id_str, trace_ctx in active_traces.items(): + try: + # Convert hex string back to int for comparison + trace_id = int(trace_id_str, 16) + if trace_id == current_trace_id: + span = trace_ctx.span + break + except (ValueError, AttributeError): + continue + + # If we couldn't find the parent trace, use the current span + if not span: + span = current_span + else: + # No active traces, use the current span + span = current_span + + # If no current span or it's not recording, check active traces + if not span: + active_traces = tracer.get_active_traces() + if active_traces: + # Get the most recently created trace (last in the dict) + trace_context = list(active_traces.values())[-1] + span = trace_context.span + logger.debug("Using most recent active trace for metadata update") + else: + logger.warning("No active trace found. Cannot update metadata.") + return False + + # Ensure the span is recording before updating + if not span or (hasattr(span, "is_recording") and not span.is_recording()): + logger.warning("Span is not recording. Cannot update metadata.") + return False + + # Update the span attributes with the metadata + try: + updated_count = 0 + for key, value in metadata.items(): + # Validate the value type + if value is None: + continue + + # Convert lists to JSON string representation for OpenTelemetry compatibility + if isinstance(value, list): + # Ensure all list items are valid types + if all(isinstance(item, (str, int, float, bool)) for item in value): + value = json.dumps(value) + else: + logger.warning(f"Skipping metadata key '{key}': list contains invalid types") + continue + elif not isinstance(value, (str, int, float, bool)): + logger.warning(f"Skipping metadata key '{key}': value type {type(value)} not supported") + continue + + # Determine the attribute key + attribute_key = key + + # Check if key is already a valid semantic convention attribute + if key in VALID_SEMCONV_ATTRS: + # Key is already a valid semantic convention, use as-is + attribute_key = key + elif key in SEMCONV_MAPPINGS: + # It's a user-friendly key, map it to semantic convention + attribute_key = SEMCONV_MAPPINGS[key] + logger.debug(f"Mapped '{key}' to semantic convention '{attribute_key}'") + else: + # Not a semantic convention, use with prefix + attribute_key = f"{prefix}.{key}" + + # Set the attribute + span.set_attribute(attribute_key, value) + updated_count += 1 + + if updated_count > 0: + logger.debug(f"Successfully updated {updated_count} metadata attributes on trace") + return True + else: + logger.warning("No valid metadata attributes were updated") + return False + + except Exception as e: + logger.error(f"Error updating trace metadata: {e}") + return False + + __all__ = [ "init", "configure", @@ -262,6 +448,7 @@ def end_trace( "record", "start_trace", "end_trace", + "update_trace_metadata", "start_session", "end_session", "track_agent", diff --git a/docs/v2/quickstart.mdx b/docs/v2/quickstart.mdx index c31b11a3d..10d7adb7d 100644 --- a/docs/v2/quickstart.mdx +++ b/docs/v2/quickstart.mdx @@ -171,6 +171,23 @@ def my_workflow_decorated(task_to_perform): # raise ``` +### Updating Trace Metadata + +You can also update metadata on running traces to add context or track progress: + +```python +from agentops import update_trace_metadata + +# Update metadata during trace execution +update_trace_metadata({ + "operation_name": "AI Agent Processing", + "processing_stage": "data_validation", + "records_processed": 1500, + "user_id": "user_123", + "tags": ["validation", "production"] +}) +``` + ## Complete Example with Decorators Here's a consolidated example showcasing how these decorators can work together: diff --git a/docs/v2/usage/manual-trace-control.mdx b/docs/v2/usage/manual-trace-control.mdx index c3db00a03..43ca5a462 100644 --- a/docs/v2/usage/manual-trace-control.mdx +++ b/docs/v2/usage/manual-trace-control.mdx @@ -77,6 +77,176 @@ for i, item in enumerate(batch_items): agentops.end_trace(trace, "Error") ``` +## Updating Trace Metadata During Execution + +You can update metadata on running traces at any point during execution using the `update_trace_metadata` function. This is useful for adding context, tracking progress, or storing intermediate results. + +### Basic Metadata Updates + +```python +import agentops + +# Initialize AgentOps +agentops.init("your-api-key", auto_start_session=False) + +# Start a trace with initial tags +trace = agentops.start_trace("ai-agent-workflow", tags=["startup", "initialization"]) + +# Your AI agent code runs here... +process_user_request() + +# Update metadata with results +agentops.update_trace_metadata({ + "operation_name": "AI Agent Processing Complete", + "stage": "completed", + "response_quality": "high", + "tags": ["ai-agent", "completed", "success"] # Tags show current status +}) + +# End the trace +agentops.end_trace(trace, "Success") +``` + +### Semantic Convention Support + +The function automatically maps user-friendly keys to semantic conventions when possible: + +```python +# These keys will be mapped to semantic conventions +agentops.update_trace_metadata({ + "operation_name": "AI Agent Data Processing", + "tags": ["production", "batch-job", "gpt-4"], # Maps to core.tags + "agent_name": "DataProcessorAgent", # Maps to agent.name + "workflow_name": "Intelligent ETL Pipeline", # Maps to workflow.name +}) + +``` + +### Advanced Metadata with Custom Prefix + +You can specify a custom prefix for your metadata attributes: + +```python +# Use a custom prefix for business-specific metadata +agentops.update_trace_metadata({ + "customer_id": "CUST_456", + "order_value": 99.99, + "payment_method": "credit_card", + "agent_interaction": "customer_support" +}, prefix="business") + +# Results in: +# business.customer_id = "CUST_456" +# business.order_value = 99.99 +# business.payment_method = "credit_card" +``` + +### Real-World Example: Progress Tracking + +Here's how to use metadata updates to track progress through a complex workflow: + +```python +import agentops +from agentops.sdk.decorators import operation + +agentops.init(auto_start_session=False) + +@operation +def process_batch(batch_data): + # Simulate batch processing + return f"Processed {len(batch_data)} items" + +def run_etl_pipeline(data_batches): + """ETL pipeline with progress tracking via metadata""" + + trace = agentops.start_trace("etl-pipeline", tags=["data-processing"]) + + total_batches = len(data_batches) + processed_records = 0 + + # Initial metadata + agentops.update_trace_metadata({ + "operation_name": "ETL Pipeline Execution", + "pipeline_stage": "starting", + "total_batches": total_batches, + "processed_batches": 0, + "processed_records": 0, + "estimated_completion": "calculating...", + "tags": ["etl", "data-processing", "async-operation"] + }) + + try: + for i, batch in enumerate(data_batches): + # Update progress + agentops.update_trace_metadata({ + "pipeline_stage": "processing", + "current_batch": i + 1, + "processed_batches": i, + "progress_percentage": round((i / total_batches) * 100, 2) + }) + + # Process the batch + result = process_batch(batch) + processed_records += len(batch) + + # Update running totals + agentops.update_trace_metadata({ + "processed_records": processed_records, + "last_batch_result": result + }) + + # Final metadata update + agentops.update_trace_metadata({ + "operation_name": "ETL Pipeline Completed", + "pipeline_stage": "completed", + "processed_batches": total_batches, + "progress_percentage": 100.0, + "completion_status": "success", + "total_execution_time": "calculated_automatically", + "tags": ["etl", "completed", "success"] + }) + + agentops.end_trace(trace, "Success") + + except Exception as e: + # Error metadata + agentops.update_trace_metadata({ + "operation_name": "ETL Pipeline Failed", + "pipeline_stage": "failed", + "error_message": str(e), + "completion_status": "error", + "failed_at_batch": i + 1 if 'i' in locals() else 0, + "tags": ["etl", "failed", "error"] + }) + + agentops.end_trace(trace, "Error") + raise + +# Example usage +data_batches = [ + ["record1", "record2", "record3"], + ["record4", "record5"], + ["record6", "record7", "record8", "record9"] +] + +run_etl_pipeline(data_batches) +``` + +### Supported Data Types + +The `update_trace_metadata` function supports various data types: + +```python +agentops.update_trace_metadata({ + "operation_name": "Multi-type Data Example", + "successful_operation": True, + "tags": ["example", "demo", "multi-agent"], + "processing_steps": ["validation", "transformation", "output"] +}) + +# Note: Lists are automatically converted to JSON strings for OpenTelemetry compatibility +``` + ## Integration with Decorators Manual trace control works seamlessly with AgentOps decorators: diff --git a/docs/v2/usage/sdk-reference.mdx b/docs/v2/usage/sdk-reference.mdx index c6bc046bb..b8b0bc146 100644 --- a/docs/v2/usage/sdk-reference.mdx +++ b/docs/v2/usage/sdk-reference.mdx @@ -157,6 +157,51 @@ agentops.end_trace(trace, "Success") agentops.end_trace(end_state="Emergency_Shutdown") ``` +### `update_trace_metadata()` + +Updates metadata on the currently running trace. This is useful for adding context, tracking progress, or storing intermediate results during trace execution. + +**Parameters**: + +- `metadata` (Dict[str, Any]): Dictionary of key-value pairs to set as trace metadata. Values must be strings, numbers, booleans, or lists of these types. Lists are automatically converted to JSON string representation. +- `prefix` (str, optional): Prefix for metadata attributes. Defaults to "trace.metadata". Ignored for semantic convention attributes. + +**Returns**: + +- `bool`: True if metadata was successfully updated, False otherwise. + +**Features**: + +- **Semantic Convention Support**: User-friendly keys like "tags", "agent_name", "workflow_name" are automatically mapped to OpenTelemetry semantic conventions. +- **Custom Attributes**: Non-semantic keys are prefixed with the specified prefix (default: "trace.metadata"). +- **Type Safety**: Validates input types and converts lists to JSON strings for OpenTelemetry compatibility. +- **Error Handling**: Returns boolean success indicator and logs warnings for invalid data. + +**Example**: + +```python +import agentops +from agentops import update_trace_metadata + +# Initialize and start trace with initial tags +agentops.init(auto_start_session=False) +trace = agentops.start_trace("ai-workflow", tags=["startup", "initialization"]) + +# Your code here... + +# Update metadata mid-run with new tags and operation info +update_trace_metadata({ + "operation_name": "OpenAI GPT-4o-mini", + "tags": ["ai-agent", "processing", "gpt-4"], # Updates tags + "status": "processing" +}) + +# End the trace +agentops.end_trace(trace, "Success") +``` + +For detailed examples and use cases, see [Manual Trace Control](/v2/usage/manual-trace-control#updating-trace-metadata-during-execution). + ## Decorators for Detailed Instrumentation For more granular control, AgentOps provides decorators that explicitly track different components of your application. **The `@trace` decorator is the recommended approach for creating custom traces**, especially in multi-threaded environments. These decorators are imported from `agentops.sdk.decorators`.