diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/examples/manual/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/examples/manual/main.py index 959f9607a1..0b40e31084 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/examples/manual/main.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/examples/manual/main.py @@ -183,9 +183,9 @@ def llm_invocation_demo(llm: ChatOpenAI): print(f"LLM output: {getattr(result, 'content', result)}") _flush_evaluations() # flush after second invocation -def embedding_invocation_demo(): +def embedding_invocation_demo(embeddings : AzureOpenAIEmbeddings): """Demonstrate OpenAI embeddings with telemetry. - + Shows: - Single query embedding (embed_query) - Batch document embeddings (embed_documents) @@ -193,29 +193,18 @@ def embedding_invocation_demo(): """ print("\n--- Embedding Invocation Demo ---") - endpoint = "https://etser-mf7gfr7m-eastus2.cognitiveservices.azure.com/" - deployment = "text-embedding-3-large" - - # Initialize embeddings model - embeddings = AzureOpenAIEmbeddings( # or "2023-05-15" if that's your API version - model=deployment, - azure_endpoint=endpoint, - api_key=os.getenv("AZURE_OPENAI_API_KEY"), - openai_api_version="2024-12-01-preview", - ) - # Demo 1: Single query embedding print("\n1. Single Query Embedding:") query = "What is the capital of France?" print(f" Query: {query}") - + try: query_vector = embeddings.embed_query(query) print(f" ✓ Embedded query into {len(query_vector)} dimensions") print(f" First 5 values: {query_vector[:5]}") except Exception as e: print(f" ✗ Error: {e}") - + # Demo 2: Batch document embeddings print("\n2. 
Batch Document Embeddings:") documents = [ @@ -225,7 +214,7 @@ def embedding_invocation_demo(): "Madrid is the capital of Spain.", ] print(f" Documents: {len(documents)} texts") - + try: doc_vectors = embeddings.embed_documents(documents) print(f" ✓ Embedded {len(doc_vectors)} documents") @@ -233,7 +222,7 @@ def embedding_invocation_demo(): print(f" First document vector (first 5): {doc_vectors[0][:5]}") except Exception as e: print(f" ✗ Error: {e}") - + # Demo 3: Mixed content embeddings print("\n3. Mixed Content Embeddings:") mixed_texts = [ @@ -241,7 +230,7 @@ def embedding_invocation_demo(): "LangChain simplifies LLM applications", "Vector databases store embeddings", ] - + try: mixed_vectors = embeddings.embed_documents(mixed_texts) print(f" ✓ Embedded {len(mixed_vectors)} mixed content texts") @@ -249,10 +238,136 @@ def embedding_invocation_demo(): print(f" - Text {i+1}: {text[:40]}... → {len(mixed_vectors[i])}D vector") except Exception as e: print(f" ✗ Error: {e}") - + print("\n--- End Embedding Demo ---\n") _flush_evaluations() +def retrieval_invocation_demo(embeddings : AzureOpenAIEmbeddings): + """Demonstrate retrieval operations with telemetry. + + Shows: + - Document loading and splitting + - Vector store creation with embeddings + - Similarity search (retrieval) + - Retrieval with scores + - Telemetry capture for retrieval operations + """ + print("\n--- Retrieval Invocation Demo ---") + + try: + from langchain_community.vectorstores import FAISS + from langchain_core.documents import Document + from langchain_text_splitters import CharacterTextSplitter + except ImportError: # pragma: no cover - optional dependency + print("FAISS or text splitters not installed; skipping retrieval demo.") + return + + # Create sample documents + documents = [ + Document( + page_content="Paris is the capital and most populous city of France. 
It is known for the Eiffel Tower and the Louvre Museum.", + metadata={"source": "geography", "country": "France"} + ), + Document( + page_content="Berlin is the capital of Germany. It is famous for its history, culture, and the Brandenburg Gate.", + metadata={"source": "geography", "country": "Germany"} + ), + Document( + page_content="Rome is the capital of Italy. It is home to ancient ruins like the Colosseum and the Roman Forum.", + metadata={"source": "geography", "country": "Italy"} + ), + Document( + page_content="Madrid is the capital of Spain. It features world-class museums like the Prado and vibrant nightlife.", + metadata={"source": "geography", "country": "Spain"} + ), + Document( + page_content="OpenTelemetry is an observability framework for cloud-native software. It provides APIs and tools for collecting telemetry data.", + metadata={"source": "technology", "topic": "observability"} + ), + Document( + page_content="LangChain is a framework for developing applications powered by language models. It simplifies building LLM applications.", + metadata={"source": "technology", "topic": "ai"} + ), + ] + + print(f"\n1. Creating Vector Store:") + print(f" Documents: {len(documents)} texts") + + try: + # Create vector store from documents + vectorstore = FAISS.from_documents(documents, embeddings) + print(f" ✓ Vector store created with {len(documents)} documents") + except Exception as e: + print(f" ✗ Error creating vector store: {e}") + return + + # Convert vectorstore to retriever for proper callback invocation + retriever = vectorstore.as_retriever(search_kwargs={"k": 3}) + + # Demo 1: Basic retrieval using retriever + print("\n2. Basic Retrieval (Top 3):") + query = "What is the capital of France?" + print(f" Query: {query}") + + try: + results = retriever.invoke(query) + print(f" ✓ Retrieved {len(results)} documents") + for i, doc in enumerate(results, 1): + print(f" {i}. {doc.page_content[:60]}... 
[country: {doc.metadata.get('country', 'N/A')}]") + except Exception as e: + print(f" ✗ Error: {e}") + + # Demo 2: Retrieval with different query + print("\n3. Technology Query:") + query = "Tell me about observability and telemetry" + print(f" Query: {query}") + + try: + results = retriever.invoke(query) + print(f" ✓ Retrieved {len(results)} documents") + for i, doc in enumerate(results, 1): + print(f" {i}. {doc.page_content[:50]}...") + print(f" Metadata: {doc.metadata}") + except Exception as e: + print(f" ✗ Error: {e}") + + # Demo 3: MMR retriever for diversity + print("\n4. MMR Retrieval (Diverse Results):") + mmr_retriever = vectorstore.as_retriever( + search_type="mmr", + search_kwargs={"k": 3, "fetch_k": 6} + ) + query = "capital cities" + print(f" Query: {query}") + + try: + mmr_results = mmr_retriever.invoke(query) + print(f" ✓ Retrieved {len(mmr_results)} diverse documents") + for i, doc in enumerate(mmr_results, 1): + print(f" {i}. {doc.page_content[:60]}... [country: {doc.metadata.get('country', 'N/A')}]") + except Exception as e: + print(f" ✗ Error: {e}") + + # Demo 4: Similarity score threshold retriever + print("\n5. Similarity Score Threshold:") + threshold_retriever = vectorstore.as_retriever( + search_type="similarity_score_threshold", + search_kwargs={"score_threshold": 0.5, "k": 5} + ) + query = "European capitals" + print(f" Query: {query}") + + try: + threshold_results = threshold_retriever.invoke(query) + print(f" ✓ Retrieved {len(threshold_results)} documents above threshold") + for i, doc in enumerate(threshold_results, 1): + print(f" {i}. {doc.page_content[:60]}...") + except Exception as e: + print(f" ✗ Error: {e}") + + print("\n--- End Retrieval Demo ---\n") + _flush_evaluations() + def simple_agent_demo(llm: ChatOpenAI): """Simple single-agent LangGraph demo (renamed from agent_demo). 
@@ -735,12 +850,27 @@ def main(): model_kwargs={"user": json.dumps(user_md)} if user_md else {}, # always supply dict ) + endpoint = "https://etser-mf7gfr7m-eastus2.cognitiveservices.azure.com/" + deployment = "text-embedding-3-large" + + # Initialize embeddings model + embeddings = AzureOpenAIEmbeddings( + model=deployment, + azure_endpoint=endpoint, + api_key=os.getenv("AZURE_OPENAI_API_KEY"), + openai_api_version="2024-12-01-preview", + ) + # LLM invocation demo (simple) - # llm_invocation_demo(llm) + #llm_invocation_demo(llm) # Embedding invocation demo - # TODO: fix api keys - # embedding_invocation_demo() + # TODO: CIRCUIT doesn't support embeddings yet + #embedding_invocation_demo(embeddings) + + # Retrieval invocation demo + # TODO: CIRCUIT doesn't support embeddings yet + #retrieval_invocation_demo(embeddings) # Determine which demo to run (env GENAI_DEMO_MODE=multi or arg 'multi') mode = os.getenv("GENAI_DEMO_MODE", "simple").lower() diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/src/opentelemetry/instrumentation/langchain/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/src/opentelemetry/instrumentation/langchain/__init__.py index e027b5b6c4..3c750b37c2 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/src/opentelemetry/instrumentation/langchain/__init__.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/src/opentelemetry/instrumentation/langchain/__init__.py @@ -38,13 +38,14 @@ { "module": "langchain_openai.embeddings", "class_name": "OpenAIEmbeddings", - "methods": ["embed_query", "embed_documents"], - }, - { - "module": "langchain_openai.embeddings", - "class_name": "AzureOpenAIEmbeddings", - "methods": ["embed_query", "embed_documents"], + "methods": ["embed_query"], }, + # comment for now as it causes duplicate spans/instrumentation + # { + # "module": "langchain_openai.embeddings", + # "class_name": "AzureOpenAIEmbeddings", + # 
"methods": ["embed_query", "embed_documents"], + # }, { "module": "langchain_huggingface.embeddings", "class_name": "HuggingFaceEmbeddings", diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/src/opentelemetry/instrumentation/langchain/callback_handler.py index c495a8f46a..306f95f4d0 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/src/opentelemetry/instrumentation/langchain/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -56,6 +56,7 @@ LLMInvocation as UtilLLMInvocation, OutputMessage as UtilOutputMessage, Task as UtilTask, + RetrievalInvocation as UtilRetrievalInvocation, Text as UtilText, Workflow as UtilWorkflow, ) @@ -63,6 +64,24 @@ from .utils import get_property_value +def _extract_class_name_from_serialized(serialized: Optional[dict[str, Any]]) -> str: + """Extract class name from serialized model information. 
+ + Args: + serialized: Serialized model information from LangChain callback + + Returns: + Class name string, or empty string if not found + """ + class_id = (serialized or {}).get("id", []) + if isinstance(class_id, list) and len(class_id) > 0: + return class_id[-1] + elif class_id: + return str(class_id) + else: + return "" + + def _sanitize_metadata_value(value: Any) -> Any: """Convert metadata values to OpenTelemetry-compatible types.""" if value is None: @@ -155,6 +174,7 @@ def __init__( self._telemetry_handler = handler self._entities: dict[UUID, GenAI] = {} self._llms: dict[UUID, UtilLLMInvocation] = {} + self._retrievals: dict[UUID, UtilRetrievalInvocation] = {} self._lock = Lock() self._payload_truncation_bytes = 8 * 1024 # Implicit parent entity stack (workflow/agent) for contexts where @@ -173,10 +193,14 @@ def _get_name_from_callback( return serialized["kwargs"]["name"] if kwargs.get("name"): return kwargs["name"] - if serialized.get("name"): + if serialized and serialized.get("name"): return serialized["name"] - if "id" in serialized: - return serialized["id"][-1] + if serialized and "id" in serialized: + id_value = serialized["id"] + if isinstance(id_value, list) and len(id_value) > 0: + return id_value[-1] + elif isinstance(id_value, str): + return id_value return "unknown" @@ -484,6 +508,69 @@ def _serialize_payload(self, payload: Any) -> Optional[str]: except Exception: # pragma: no cover - defensive return None + def _detect_vector_store(self, serialized: Optional[dict[str, Any]]) -> Optional[str]: + """Detect vector store type from serialized info.""" + class_name = _extract_class_name_from_serialized(serialized) + if not class_name: + return None + + class_lower = class_name.lower() + if "faiss" in class_lower: + return "faiss" + elif "chroma" in class_lower: + return "chroma" + elif "pinecone" in class_lower: + return "pinecone" + elif "qdrant" in class_lower: + return "qdrant" + elif "milvus" in class_lower: + return "milvus" + elif 
"weaviate" in class_lower: + return "weaviate" + elif "redis" in class_lower: + return "redis" + return class_name + + def _detect_db_system(self, serialized: Optional[dict[str, Any]]) -> Optional[str]: + """Detect database system type from serialized info.""" + class_name = _extract_class_name_from_serialized(serialized) + if not class_name: + return None + + class_lower = class_name.lower() + # Vector databases + if "faiss" in class_lower: + return "faiss" + elif "chroma" in class_lower or "chromadb" in class_lower: + return "chromadb" + elif "pinecone" in class_lower: + return "pinecone" + elif "qdrant" in class_lower: + return "qdrant" + elif "milvus" in class_lower: + return "milvus" + elif "weaviate" in class_lower: + return "weaviate" + # Traditional databases + elif "postgres" in class_lower or "postgresql" in class_lower: + return "postgresql" + elif "mysql" in class_lower: + return "mysql" + elif "mongodb" in class_lower or "mongo" in class_lower: + return "mongodb" + elif "redis" in class_lower: + return "redis" + elif "elasticsearch" in class_lower or "elastic" in class_lower: + return "elasticsearch" + elif "opensearch" in class_lower: + return "opensearch" + elif "cassandra" in class_lower: + return "cassandra" + elif "neo4j" in class_lower: + return "neo4j" + + return None + def _is_agent_run( self, serialized: Optional[dict[str, Any]], @@ -1350,6 +1437,98 @@ def on_agent_error( """Run when agent errors.""" self._handle_error(error, run_id, parent_run_id, **kwargs) + @dont_throw + def on_retriever_start( + self, + serialized: dict[str, Any], + query: str, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + tags: Optional[list[str]] = None, + metadata: Optional[dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + """Run when retriever starts.""" + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return + + metadata_attrs = self._sanitize_metadata_dict(metadata) + extras: dict[str, Any] = {} + if tags: + extras["tags"] = 
[str(tag) for tag in tags] + + search_kwargs = kwargs.get("search_kwargs", {}) + top_k = search_kwargs.get("k") or search_kwargs.get("top_k") + retriever_type = search_kwargs.get("search_type", "similarity") + vector_store = metadata_attrs.get("vector_store") or self._detect_vector_store(serialized) + + extras.update(metadata_attrs) + + # Add additional retrieval attributes + retriever_name = kwargs.get("name") + if retriever_name: + extras["gen_ai.system"] = retriever_name + + # Add embedding provider as request model if available + embedding_provider = metadata_attrs.get("ls_embedding_provider") + if embedding_provider: + extras["gen_ai.request.model"] = embedding_provider + + # Add database system if available + db_system = self._detect_db_system(serialized) + if db_system: + extras["gen_ai.langchain.db.system"] = db_system + + # Add query text if content capture is enabled + if should_send_prompts() and query: + extras["db.query.text"] = query + + retrieval_inv = UtilRetrievalInvocation( + query=query, + top_k=top_k, + retriever_type=retriever_type, + vector_store=vector_store, + framework="langchain", + search_kwargs=search_kwargs, + attributes=extras, + run_id=run_id, + parent_run_id=parent_run_id, + ) + + self._telemetry_handler.start_retrieval(retrieval_inv) + with self._lock: + self._retrievals[run_id] = retrieval_inv + + @dont_throw + def on_retriever_end( + self, + documents: list[Any], + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> None: + """Run when retriever ends.""" + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return + + with self._lock: + retrieval_inv = self._retrievals.pop(run_id, None) + + if retrieval_inv: + retrieval_inv.documents_retrieved = len(documents) + retrieval_inv.results = [ + { + "content": getattr(doc, "page_content", str(doc))[:500], + "metadata": getattr(doc, "metadata", {}), + "score": getattr(doc, "score", None), + } + for doc in documents[:10] + ] + + 
self._telemetry_handler.stop_retrieval(retrieval_inv) + @dont_throw def on_retriever_error( self, @@ -1360,6 +1539,16 @@ def on_retriever_error( **kwargs: Any, ) -> None: """Run when retriever errors.""" + with self._lock: + retrieval_inv = self._retrievals.pop(run_id, None) + if retrieval_inv: + try: + self._telemetry_handler.fail_retrieval( + retrieval_inv, + UtilError(message=str(error), type=type(error)), + ) + except Exception: + pass self._handle_error(error, run_id, parent_run_id, **kwargs) def _emit_chat_input_events(self, messages): diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/src/opentelemetry/instrumentation/langchain/semconv_ai.py b/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/src/opentelemetry/instrumentation/langchain/semconv_ai.py index d0c77edf4b..ac131593f5 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/src/opentelemetry/instrumentation/langchain/semconv_ai.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/src/opentelemetry/instrumentation/langchain/semconv_ai.py @@ -303,4 +303,5 @@ class SpanKindValues(Enum): TASK = "task" AGENT = "agent" TOOL = "tool" + RETRIEVAL = "retrieval" UNKNOWN = "unknown" diff --git a/util/opentelemetry-util-genai-dev/examples/retrievals_example.py b/util/opentelemetry-util-genai-dev/examples/retrievals_example.py new file mode 100644 index 0000000000..efdcb954b1 --- /dev/null +++ b/util/opentelemetry-util-genai-dev/examples/retrievals_example.py @@ -0,0 +1,436 @@ +#!/usr/bin/env python3 +""" +Example demonstrating OpenTelemetry GenAI telemetry for retrieval operations. + +This example shows: +1. Basic retrieval invocation lifecycle +2. Retrieval with vector search +3. Retrieval with text query +4. Retrieval with filters and search kwargs +5. Error handling for retrieval operations +6. Retrieval with agent context +7. 
Metrics and span emission for retrievals +""" + +import time + +from opentelemetry import _logs as logs +from opentelemetry import trace +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import ( + ConsoleLogExporter, + SimpleLogRecordProcessor, +) +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import ( + ConsoleMetricExporter, + PeriodicExportingMetricReader, +) +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleSpanProcessor, +) +from opentelemetry.util.genai.handler import get_telemetry_handler +from opentelemetry.util.genai.types import Error, RetrievalInvocation + + +def setup_telemetry(): + """Set up OpenTelemetry providers for tracing, metrics, and logging.""" + # Set up tracing + trace_provider = TracerProvider() + trace_provider.add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) + trace.set_tracer_provider(trace_provider) + + # Set up metrics + metric_reader = PeriodicExportingMetricReader( + ConsoleMetricExporter(), export_interval_millis=5000 + ) + meter_provider = MeterProvider(metric_readers=[metric_reader]) + + # Set up logging (for events) + logger_provider = LoggerProvider() + logger_provider.add_log_record_processor( + SimpleLogRecordProcessor(ConsoleLogExporter()) + ) + logs.set_logger_provider(logger_provider) + + return trace_provider, meter_provider, logger_provider + + +def example_basic_retrieval(): + """Example 1: Basic retrieval invocation with text query.""" + print("\n" + "=" * 60) + print("Example 1: Basic Retrieval Invocation") + print("=" * 60) + + handler = get_telemetry_handler() + + # Create retrieval invocation + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="What is OpenTelemetry?", + top_k=5, + retriever_type="vector_store", + vector_store="pinecone", + provider="pinecone", + ) + + # Start the retrieval operation + 
handler.start_retrieval(retrieval) + time.sleep(0.05) # Simulate API call + + # Simulate response - populate results + retrieval.documents_retrieved = 5 + retrieval.results = [ + {"id": "doc1", "score": 0.95, "content": "OpenTelemetry is..."}, + {"id": "doc2", "score": 0.89, "content": "OTEL provides..."}, + {"id": "doc3", "score": 0.85, "content": "Observability with..."}, + {"id": "doc4", "score": 0.82, "content": "Tracing and metrics..."}, + {"id": "doc5", "score": 0.78, "content": "Distributed tracing..."}, + ] + + # Finish the retrieval operation + handler.stop_retrieval(retrieval) + + print("✓ Completed retrieval for text query") + print(f" Query: {retrieval.query}") + print(f" Documents retrieved: {retrieval.documents_retrieved}") + print(f" Vector store: {retrieval.vector_store}") + + +def example_vector_search(): + """Example 2: Retrieval with vector search.""" + print("\n" + "=" * 60) + print("Example 2: Vector Search Retrieval") + print("=" * 60) + + handler = get_telemetry_handler() + + # Create retrieval with query vector + query_vector = [0.1, 0.2, 0.3, 0.4, 0.5] * 100 # 500-dim vector + + retrieval = RetrievalInvocation( + operation_name="retrieval", + query_vector=query_vector, + top_k=10, + retriever_type="vector_store", + vector_store="chroma", + provider="chroma", + framework="langchain", + ) + + # Start the retrieval operation + handler.start_retrieval(retrieval) + time.sleep(0.08) # Simulate API call + + # Simulate response + retrieval.documents_retrieved = 10 + retrieval.results = [ + {"id": f"doc{i}", "score": 0.95 - i * 0.05} + for i in range(10) + ] + + # Finish the retrieval operation + handler.stop_retrieval(retrieval) + + print("✓ Completed vector search retrieval") + print(f" Vector dimensions: {len(query_vector)}") + print(f" Documents retrieved: {retrieval.documents_retrieved}") + print(f" Framework: {retrieval.framework}") + + +def example_retrieval_with_filters(): + """Example 3: Retrieval with search filters and kwargs.""" + 
print("\n" + "=" * 60) + print("Example 3: Retrieval with Filters") + print("=" * 60) + + handler = get_telemetry_handler() + + # Create retrieval with filters + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="machine learning tutorials", + top_k=3, + retriever_type="hybrid_search", + vector_store="weaviate", + provider="weaviate", + search_filter={ + "category": "tutorial", + "difficulty": "beginner", + "language": "python", + }, + search_kwargs={ + "score_threshold": 0.7, + "fetch_k": 20, + "lambda_mult": 0.5, + }, + ) + + # Start the retrieval operation + handler.start_retrieval(retrieval) + time.sleep(0.06) # Simulate API call + + # Simulate response + retrieval.documents_retrieved = 3 + retrieval.results = [ + { + "id": "tut1", + "score": 0.92, + "content": "Intro to ML", + "metadata": {"category": "tutorial", "difficulty": "beginner"}, + }, + { + "id": "tut2", + "score": 0.88, + "content": "Python ML basics", + "metadata": {"category": "tutorial", "difficulty": "beginner"}, + }, + { + "id": "tut3", + "score": 0.85, + "content": "Getting started with ML", + "metadata": {"category": "tutorial", "difficulty": "beginner"}, + }, + ] + + # Finish the retrieval operation + handler.stop_retrieval(retrieval) + + print("✓ Completed retrieval with filters") + print(f" Query: {retrieval.query}") + print(f" Filters: {retrieval.search_filter}") + print(f" Search kwargs: {retrieval.search_kwargs}") + print(f" Documents retrieved: {retrieval.documents_retrieved}") + + +def example_retrieval_with_custom_attributes(): + """Example 4: Retrieval with custom attributes.""" + print("\n" + "=" * 60) + print("Example 4: Retrieval with Custom Attributes") + print("=" * 60) + + handler = get_telemetry_handler() + + # Create retrieval with custom attributes + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="customer support documentation", + top_k=5, + retriever_type="semantic_search", + vector_store="qdrant", + provider="qdrant", + 
attributes={ + "collection_name": "support_docs", + "user_id": "user-789", + "session_id": "session-456", + "search_type": "semantic", + }, + ) + + # Start the retrieval operation + handler.start_retrieval(retrieval) + time.sleep(0.07) # Simulate API call + + # Simulate response + retrieval.documents_retrieved = 5 + + # Finish the retrieval operation + handler.stop_retrieval(retrieval) + + print("✓ Completed retrieval with custom attributes") + print(f" Query: {retrieval.query}") + print(f" Custom attributes: {retrieval.attributes}") + + +def example_retrieval_with_agent_context(): + """Example 5: Retrieval within an agent context.""" + print("\n" + "=" * 60) + print("Example 5: Retrieval with Agent Context") + print("=" * 60) + + handler = get_telemetry_handler() + + # Create retrieval with agent context + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="latest product updates", + top_k=7, + retriever_type="vector_store", + vector_store="milvus", + provider="milvus", + framework="langchain", + agent_name="product_assistant", + agent_id="agent-123", + ) + + # Start the retrieval operation + handler.start_retrieval(retrieval) + time.sleep(0.05) # Simulate API call + + # Simulate response + retrieval.documents_retrieved = 7 + + # Finish the retrieval operation + handler.stop_retrieval(retrieval) + + print("✓ Completed retrieval with agent context") + print(f" Agent: {retrieval.agent_name} (ID: {retrieval.agent_id})") + print(f" Query: {retrieval.query}") + print(f" Documents retrieved: {retrieval.documents_retrieved}") + + +def example_retrieval_error(): + """Example 6: Handling retrieval errors.""" + print("\n" + "=" * 60) + print("Example 6: Retrieval Error Handling") + print("=" * 60) + + handler = get_telemetry_handler() + + # Create retrieval invocation + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="test query", + top_k=5, + retriever_type="vector_store", + vector_store="pinecone", + provider="pinecone", + ) + + 
# Start the retrieval operation + handler.start_retrieval(retrieval) + time.sleep(0.03) # Simulate API call + + # Simulate an error + error = Error( + message="Connection timeout to vector store", + type=TimeoutError, + ) + + # Fail the retrieval operation + handler.fail_retrieval(retrieval, error) + + print("✗ Retrieval failed with error") + print(f" Error: {error.message}") + print(f" Vector store: {retrieval.vector_store}") + + +def example_multiple_retrievals(): + """Example 7: Multiple sequential retrievals.""" + print("\n" + "=" * 60) + print("Example 7: Multiple Sequential Retrievals") + print("=" * 60) + + handler = get_telemetry_handler() + + queries = [ + "What is machine learning?", + "How does deep learning work?", + "Explain neural networks", + ] + + for idx, query_text in enumerate(queries, 1): + retrieval = RetrievalInvocation( + operation_name="retrieval", + query=query_text, + top_k=5, + retriever_type="vector_store", + vector_store="pinecone", + provider="pinecone", + attributes={"query_index": idx}, + ) + + handler.start_retrieval(retrieval) + time.sleep(0.04) # Simulate API call + + # Simulate response + retrieval.documents_retrieved = 5 + + handler.stop_retrieval(retrieval) + print(f" ✓ Completed retrieval {idx}/{len(queries)}") + + print(f"✓ Completed all {len(queries)} retrievals") + + +def example_hybrid_retrieval(): + """Example 8: Hybrid retrieval combining text and vector search.""" + print("\n" + "=" * 60) + print("Example 8: Hybrid Retrieval") + print("=" * 60) + + handler = get_telemetry_handler() + + # Create hybrid retrieval with both query and vector + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="artificial intelligence applications", + query_vector=[0.2] * 768, # 768-dim vector + top_k=8, + retriever_type="hybrid_search", + vector_store="elasticsearch", + provider="elasticsearch", + framework="langchain", + search_kwargs={ + "alpha": 0.5, # Balance between text and vector search + "boost_query": True, + 
}, + ) + + # Start the retrieval operation + handler.start_retrieval(retrieval) + time.sleep(0.09) # Simulate API call + + # Simulate response + retrieval.documents_retrieved = 8 + retrieval.results = [ + {"id": f"doc{i}", "score": 0.9 - i * 0.05, "type": "hybrid"} + for i in range(8) + ] + + # Finish the retrieval operation + handler.stop_retrieval(retrieval) + + print("✓ Completed hybrid retrieval") + print(f" Query: {retrieval.query}") + print(f" Vector dimensions: {len(retrieval.query_vector)}") + print(f" Retriever type: {retrieval.retriever_type}") + print(f" Documents retrieved: {retrieval.documents_retrieved}") + + +def main(): + """Run all retrieval examples.""" + print("\n" + "=" * 60) + print("OpenTelemetry GenAI Retrievals Examples") + print("=" * 60) + + # Set up telemetry + trace_provider, meter_provider, logger_provider = setup_telemetry() + + # Run examples + example_basic_retrieval() + example_vector_search() + example_retrieval_with_filters() + example_retrieval_with_custom_attributes() + example_retrieval_with_agent_context() + example_retrieval_error() + example_multiple_retrievals() + example_hybrid_retrieval() + + # Force flush to ensure all telemetry is exported + print("\n" + "=" * 60) + print("Flushing telemetry data...") + print("=" * 60) + trace_provider.force_flush() + meter_provider.force_flush() + logger_provider.force_flush() + + print("\n✓ All examples completed successfully!") + print("Check the console output above for spans, metrics, and events.\n") + + +if __name__ == "__main__": + main() diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/attributes.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/attributes.py index a6cefb6e78..3e98afa675 100644 --- a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/attributes.py +++ b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/attributes.py @@ -55,6 +55,13 @@ GEN_AI_EMBEDDINGS_INPUT_TEXTS = 
"gen_ai.embeddings.input.texts" GEN_AI_REQUEST_ENCODING_FORMATS = "gen_ai.request.encoding_formats" +# Retrieval attributes +GEN_AI_RETRIEVAL_QUERY = "gen_ai.retrieval.query" +GEN_AI_RETRIEVAL_TOP_K = "gen_ai.retrieval.top_k" +GEN_AI_RETRIEVAL_TYPE = "gen_ai.retrieval.type" +GEN_AI_RETRIEVAL_VECTOR_STORE = "gen_ai.retrieval.vector_store" +GEN_AI_RETRIEVAL_DOCUMENTS_RETRIEVED = "gen_ai.retrieval.documents_retrieved" + # Server attributes (from semantic conventions) SERVER_ADDRESS = "server.address" SERVER_PORT = "server.port" diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/metrics.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/metrics.py index 51a7b64c1f..2329393e68 100644 --- a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/metrics.py +++ b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/metrics.py @@ -15,6 +15,7 @@ EmbeddingInvocation, Error, LLMInvocation, + RetrievalInvocation, Task, Workflow, ) @@ -50,6 +51,9 @@ def __init__(self, meter: Optional[Meter] = None): self._task_duration_histogram: Histogram = ( instruments.task_duration_histogram ) + self._retrieval_duration_histogram: Histogram = ( + instruments.retrieval_duration_histogram + ) def on_start(self, obj: Any) -> None: # no-op for metrics return None @@ -64,6 +68,9 @@ def on_end(self, obj: Any) -> None: if isinstance(obj, Task): self._record_task_metrics(obj) return + if isinstance(obj, RetrievalInvocation): + self._record_retrieval_metrics(obj) + return if isinstance(obj, LLMInvocation): invocation = obj @@ -153,6 +160,9 @@ def on_error(self, error: Error, obj: Any) -> None: if isinstance(obj, Task): self._record_task_metrics(obj) return + if isinstance(obj, RetrievalInvocation): + self._record_retrieval_metrics(obj) + return # Handle existing types with agent context if isinstance(obj, LLMInvocation): @@ -234,6 +244,7 @@ def handles(self, obj: Any) -> bool: AgentInvocation, Task, 
EmbeddingInvocation, + RetrievalInvocation, ), ) @@ -316,3 +327,29 @@ def _record_task_metrics(self, task: Task) -> None: self._task_duration_histogram.record( duration, attributes=metric_attrs, context=context ) + + def _record_retrieval_metrics(self, retrieval: RetrievalInvocation) -> None: + """Record metrics for a retrieval operation.""" + if retrieval.end_time is None: + return + duration = retrieval.end_time - retrieval.start_time + metric_attrs = { + GenAI.GEN_AI_OPERATION_NAME: retrieval.operation_name, + } + if retrieval.retriever_type: + metric_attrs["gen_ai.retrieval.type"] = retrieval.retriever_type + if retrieval.vector_store: + metric_attrs["gen_ai.retrieval.vector_store"] = retrieval.vector_store + if retrieval.framework: + metric_attrs["gen_ai.framework"] = retrieval.framework + if retrieval.provider: + metric_attrs[GenAI.GEN_AI_PROVIDER_NAME] = retrieval.provider + # Add agent context if available + if retrieval.agent_name: + metric_attrs[GenAI.GEN_AI_AGENT_NAME] = retrieval.agent_name + if retrieval.agent_id: + metric_attrs[GenAI.GEN_AI_AGENT_ID] = retrieval.agent_id + + self._retrieval_duration_histogram.record( + duration, attributes=metric_attrs + ) diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/span.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/span.py index 4c4fbb3f09..cfd130bdba 100644 --- a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/span.py +++ b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/span.py @@ -26,6 +26,11 @@ GEN_AI_OUTPUT_MESSAGES, GEN_AI_PROVIDER_NAME, GEN_AI_REQUEST_ENCODING_FORMATS, + GEN_AI_RETRIEVAL_DOCUMENTS_RETRIEVED, + GEN_AI_RETRIEVAL_QUERY, + GEN_AI_RETRIEVAL_TOP_K, + GEN_AI_RETRIEVAL_TYPE, + GEN_AI_RETRIEVAL_VECTOR_STORE, GEN_AI_TASK_ASSIGNED_AGENT, GEN_AI_TASK_NAME, GEN_AI_TASK_OBJECTIVE, @@ -45,6 +50,7 @@ EmbeddingInvocation, Error, LLMInvocation, + RetrievalInvocation, Task, ToolCall, Workflow, 
# ---- Retrieval lifecycle ---------------------------------------------
def _start_retrieval(self, retrieval: RetrievalInvocation) -> None:
    """Open a CLIENT span for *retrieval* and set its initial attributes.

    The span is named after the operation, followed by the vector store
    when one is set. It is created with ``end_on_exit=False`` and is ended
    explicitly by ``_finish_retrieval`` / ``_error_retrieval``. The span
    and its context manager are stored on ``retrieval.span`` and
    ``retrieval.context_token``. The query text is only attached when
    ``self._capture_content`` is truthy.
    """
    operation = retrieval.operation_name
    store = retrieval.vector_store
    span_name = f"{operation} {store}" if store else f"{operation}"

    span_cm = self._tracer.start_as_current_span(
        span_name, kind=SpanKind.CLIENT, end_on_exit=False
    )
    span = span_cm.__enter__()
    retrieval.span = span  # type: ignore[assignment]
    retrieval.context_token = span_cm  # type: ignore[assignment]

    # Semantic-convention attributes first, then the invocation's custom ones.
    _apply_gen_ai_semconv_attributes(
        span, dict(retrieval.semantic_convention_attributes())
    )
    _apply_gen_ai_semconv_attributes(
        span, getattr(retrieval, "attributes", None)
    )

    # (attribute key, value, whether it should be written), in emit order.
    candidates = (
        (
            GEN_AI_RETRIEVAL_TYPE,
            retrieval.retriever_type,
            bool(retrieval.retriever_type),
        ),
        (
            GEN_AI_RETRIEVAL_VECTOR_STORE,
            retrieval.vector_store,
            bool(retrieval.vector_store),
        ),
        # top_k == 0 is still recorded; only None is skipped.
        (GEN_AI_RETRIEVAL_TOP_K, retrieval.top_k, retrieval.top_k is not None),
        ("gen_ai.framework", retrieval.framework, bool(retrieval.framework)),
        (GEN_AI_PROVIDER_NAME, retrieval.provider, bool(retrieval.provider)),
        (
            GEN_AI_RETRIEVAL_QUERY,
            retrieval.query,
            bool(retrieval.query and self._capture_content),
        ),
    )
    for key, value, wanted in candidates:
        if wanted:
            span.set_attribute(key, value)


def _finish_retrieval(self, retrieval: RetrievalInvocation) -> None:
    """End the retrieval span after a successful operation.

    Does nothing when no span was attached. Otherwise records the retrieved
    document count (when not ``None``), re-applies the semantic-convention
    attributes, leaves the span's context and ends the span.
    """
    span = retrieval.span
    if span is None:
        return

    doc_count = retrieval.documents_retrieved
    if doc_count is not None:
        span.set_attribute(GEN_AI_RETRIEVAL_DOCUMENTS_RETRIEVED, doc_count)

    _apply_gen_ai_semconv_attributes(
        span, retrieval.semantic_convention_attributes()
    )

    # Best effort: a failure while leaving the context must not keep the
    # span from being ended.
    leave_context = getattr(retrieval.context_token, "__exit__", None)
    if leave_context is not None:
        try:
            leave_context(None, None, None)  # type: ignore[misc]
        except Exception:
            pass
    span.end()


def _error_retrieval(
    self, error: Error, retrieval: RetrievalInvocation
) -> None:
    """End the retrieval span with an ERROR status.

    Does nothing when no span was attached. The status description is
    ``error.message``; the error type's qualified name is attached only
    while the span is recording. The semantic-convention attributes are
    re-applied, the span's context is left and the span is ended.
    """
    span = retrieval.span
    if span is None:
        return

    span.set_status(Status(StatusCode.ERROR, error.message))
    if span.is_recording():
        span.set_attribute(
            ErrorAttributes.ERROR_TYPE, error.type.__qualname__
        )

    _apply_gen_ai_semconv_attributes(
        span, retrieval.semantic_convention_attributes()
    )

    # Best effort: a failure while leaving the context must not keep the
    # span from being ended.
    leave_context = getattr(retrieval.context_token, "__exit__", None)
    if leave_context is not None:
        try:
            leave_context(None, None, None)  # type: ignore[misc]
        except Exception:
            pass
    span.end()
def start_retrieval(
    self, retrieval: RetrievalInvocation
) -> RetrievalInvocation:
    """Begin a retrieval operation.

    Refreshes the content-capture setting, stamps ``retrieval.start_time``
    with the current wall-clock time and notifies the emitter via
    ``on_start``. Returns the same ``retrieval`` object.
    """
    self._refresh_capture_content()
    retrieval.start_time = time.time()
    self._emitter.on_start(retrieval)
    return retrieval


def stop_retrieval(
    self, retrieval: RetrievalInvocation
) -> RetrievalInvocation:
    """Complete a retrieval operation successfully.

    Stamps ``retrieval.end_time``, notifies the emitter via ``on_end`` and
    runs the completion notification. When a meter provider is configured
    it is flushed; flush failures are ignored. Returns the same
    ``retrieval`` object.
    """
    retrieval.end_time = time.time()
    self._emitter.on_end(retrieval)
    self._notify_completion(retrieval)

    meter_provider = getattr(self, "_meter_provider", None)
    if meter_provider is not None:
        try:
            meter_provider.force_flush()
        except Exception:
            # Flushing is best effort and must not fail the caller.
            pass
    return retrieval


def fail_retrieval(
    self, retrieval: RetrievalInvocation, error: Error
) -> RetrievalInvocation:
    """Complete a retrieval operation that failed with *error*.

    Stamps ``retrieval.end_time``, notifies the emitter via ``on_error``
    and runs the completion notification. When a meter provider is
    configured it is flushed; flush failures are ignored. Returns the same
    ``retrieval`` object.
    """
    retrieval.end_time = time.time()
    self._emitter.on_error(error, retrieval)
    self._notify_completion(retrieval)

    meter_provider = getattr(self, "_meter_provider", None)
    if meter_provider is not None:
        try:
            meter_provider.force_flush()
        except Exception:
            # Flushing is best effort and must not fail the caller.
            pass
    return retrieval
@dataclass
class RetrievalInvocation(GenAI):
    """Data carried for one retrieval/search invocation.

    Extends ``GenAI`` with retrieval-specific fields. Every field has a
    default, so an instance can be created empty and filled in as the
    operation proceeds.
    """

    # Operation name; tagged as the GEN_AI_OPERATION_NAME semconv attribute.
    operation_name: str = field(
        metadata={"semconv": GenAIAttributes.GEN_AI_OPERATION_NAME},
        default="retrieval",
    )
    # Query text (presumably the user's search string; confirm with callers).
    query: Optional[str] = None
    # Query embedding, presumably for vector search; confirm with callers.
    query_vector: Optional[list[float]] = None
    # Requested number of results; None means "not specified".
    top_k: Optional[int] = None
    # Retriever kind and backing vector store, both free-form strings.
    retriever_type: Optional[str] = None
    vector_store: Optional[str] = None
    # Number of documents returned; None until known.
    documents_retrieved: Optional[int] = None
    # Optional filter and extra search arguments; schema not defined here.
    search_filter: Optional[dict[str, Any]] = None
    search_kwargs: dict[str, Any] = field(default_factory=_new_str_any_dict)
    # Result records; each instance gets its own list.
    results: list[dict[str, Any]] = field(default_factory=list)