From 2cff4b8996efc4622d32d0efdcd9a8f776a6efbf Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Thu, 1 May 2025 02:41:30 +0530 Subject: [PATCH 1/6] Add IBM watsonx.ai instrumentation --- agentops/instrumentation/__init__.py | 5 + .../ibm_watsonx_ai/__init__.py | 38 +++ .../ibm_watsonx_ai/attributes/__init__.py | 12 + .../ibm_watsonx_ai/attributes/attributes.py | 241 ++++++++++++++++ .../ibm_watsonx_ai/attributes/common.py | 66 +++++ .../ibm_watsonx_ai/instrumentor.py | 268 ++++++++++++++++++ 6 files changed, 630 insertions(+) create mode 100644 agentops/instrumentation/ibm_watsonx_ai/__init__.py create mode 100644 agentops/instrumentation/ibm_watsonx_ai/attributes/__init__.py create mode 100644 agentops/instrumentation/ibm_watsonx_ai/attributes/attributes.py create mode 100644 agentops/instrumentation/ibm_watsonx_ai/attributes/common.py create mode 100644 agentops/instrumentation/ibm_watsonx_ai/instrumentor.py diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index 69f1c17c0..4a041c328 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -77,6 +77,11 @@ def get_instance(self) -> BaseInstrumentor: class_name="GoogleGenerativeAIInstrumentor", provider_import_name="google.genai", ), + InstrumentorLoader( + module_name="agentops.instrumentation.ibm_watsonx_ai", + class_name="IBMWatsonXInstrumentor", + provider_import_name="ibm_watsonx_ai", + ), ] diff --git a/agentops/instrumentation/ibm_watsonx_ai/__init__.py b/agentops/instrumentation/ibm_watsonx_ai/__init__.py new file mode 100644 index 000000000..b80a18170 --- /dev/null +++ b/agentops/instrumentation/ibm_watsonx_ai/__init__.py @@ -0,0 +1,38 @@ +"""IBM watsonx.ai API instrumentation. + +This module provides instrumentation for the IBM watsonx.ai API, +including text generation, embeddings, and model management. +""" + +import logging +from typing import Collection + +def get_version() -> str: + """Get the version of the IBM watsonx.ai SDK, or 'unknown' if not found + + Attempts to retrieve the installed version of the IBM watsonx.ai SDK using importlib.metadata. + Falls back to 'unknown' if the version cannot be determined. + + Returns: + The version string of the IBM watsonx.ai SDK or 'unknown' + """ + try: + from importlib.metadata import version + return version("ibm-watson-machine-learning") + except ImportError: + logger.debug("Could not find IBM watsonx.ai SDK version") + return "unknown" + +LIBRARY_NAME = "ibm-watsonx" +LIBRARY_VERSION: str = get_version() + +logger = logging.getLogger(__name__) + +# Import after defining constants to avoid circular imports +from agentops.instrumentation.ibm_watsonx_ai.instrumentor import IBMWatsonXInstrumentor # noqa: E402 + +__all__ = [ + "LIBRARY_NAME", + "LIBRARY_VERSION", + "IBMWatsonXInstrumentor", +] \ No newline at end of file diff --git a/agentops/instrumentation/ibm_watsonx_ai/attributes/__init__.py b/agentops/instrumentation/ibm_watsonx_ai/attributes/__init__.py new file mode 100644 index 000000000..263548dd0 --- /dev/null +++ b/agentops/instrumentation/ibm_watsonx_ai/attributes/__init__.py @@ -0,0 +1,12 @@ +"""Attributes for IBM watsonx.ai instrumentation. + +This module provides attribute extraction functions for IBM watsonx.ai operations. +""" + +from agentops.instrumentation.ibm_watsonx_ai.attributes.attributes import ( + get_generate_attributes, +) + +__all__ = [ + "get_generate_attributes", +] \ No newline at end of file diff --git a/agentops/instrumentation/ibm_watsonx_ai/attributes/attributes.py b/agentops/instrumentation/ibm_watsonx_ai/attributes/attributes.py new file mode 100644 index 000000000..f306d1614 --- /dev/null +++ b/agentops/instrumentation/ibm_watsonx_ai/attributes/attributes.py @@ -0,0 +1,241 @@ +"""Attributes for IBM watsonx.ai model instrumentation. + +This module provides attribute extraction functions for IBM watsonx.ai model operations, +focusing on token usage recording. +""" +from typing import Any, Dict, Optional, Tuple +from agentops.instrumentation.common.attributes import AttributeMap +from agentops.semconv import SpanAttributes, MessageAttributes +from agentops.instrumentation.ibm_watsonx_ai.attributes.common import extract_params_attributes + +def get_generate_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: + """Extract token usage attributes from generate method calls. + + Args: + args: Positional arguments passed to the method + kwargs: Keyword arguments passed to the method + return_value: Return value from the method + + Returns: + Dictionary of token usage attributes to set on the span + """ + attributes = {} + + # Extract prompt from args or kwargs + prompt = None + if args and len(args) > 0: + prompt = args[0] + elif kwargs and 'prompt' in kwargs: + prompt = kwargs['prompt'] + + if prompt: + attributes[MessageAttributes.PROMPT_ROLE.format(i=0)] = "user" + attributes[MessageAttributes.PROMPT_CONTENT.format(i=0)] = prompt + attributes[MessageAttributes.PROMPT_TYPE.format(i=0)] = "text" + + # Extract parameters from args or kwargs + params = None + if args and len(args) > 1: + params = args[1] + elif kwargs and 'params' in kwargs: + params = kwargs['params'] + + if params: + attributes.update(extract_params_attributes(params)) + + # Extract response information + if return_value: + if isinstance(return_value, dict): + # Extract model information + if 'model_id' in return_value: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = return_value['model_id'] + + # Handle results + if 'results' in return_value: + for idx, result in enumerate(return_value['results']): + # Extract completion + if 'generated_text' in result: + attributes[MessageAttributes.COMPLETION_CONTENT.format(i=idx)] = result['generated_text'] + attributes[MessageAttributes.COMPLETION_ROLE.format(i=idx)] = "assistant" + attributes[MessageAttributes.COMPLETION_TYPE.format(i=idx)] = "text" + + # Extract token usage + if 'input_token_count' in result: + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = result['input_token_count'] + if 'generated_token_count' in result: + attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = result['generated_token_count'] + if 'input_token_count' in result and 'generated_token_count' in result: + attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = result['input_token_count'] + result['generated_token_count'] + + if 'stop_reason' in result: + attributes[SpanAttributes.LLM_RESPONSE_STOP_REASON] = result['stop_reason'] + + return attributes + +def get_tokenize_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: + """Extract attributes from tokenize method calls. + + Args: + args: Positional arguments passed to the method + kwargs: Keyword arguments passed to the method + return_value: Return value from the method + + Returns: + Dictionary of attributes to set on the span + """ + attributes = {} + + # Extract input from args or kwargs + prompt = None + if args and len(args) > 0: + prompt = args[0] + elif kwargs and "prompt" in kwargs: + prompt = kwargs["prompt"] + + if prompt: + attributes[MessageAttributes.PROMPT_ROLE.format(i=0)] = "user" + attributes[MessageAttributes.PROMPT_CONTENT.format(i=0)] = prompt + attributes[MessageAttributes.PROMPT_TYPE.format(i=0)] = "text" + + # Extract response information + if return_value and isinstance(return_value, dict): + if "model_id" in return_value: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = return_value["model_id"] + if "result" in return_value: + attributes["ibm.watsonx.tokenize.result"] = str(return_value["result"]) + if "token_count" in return_value["result"]: + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = return_value["result"]["token_count"] + + return attributes + +def get_model_details_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: + """Extract attributes from get_details method calls. + + Args: + args: Positional arguments passed to the method + kwargs: Keyword arguments passed to the method + return_value: Return value from the method + + Returns: + Dictionary of attributes to set on the span + """ + if not isinstance(return_value, dict): + return {} + + # Basic model information + attributes = { + f"ibm.watsonx.model.{key}": value + for key, value in return_value.items() + if key in ["model_id", "label", "provider", "source", "short_description", "long_description", + "number_params", "input_tier", "output_tier"] + } + + # Model functions + if "functions" in return_value: + attributes["ibm.watsonx.model.functions"] = str([func["id"] for func in return_value["functions"]]) + + # Model tasks + if "tasks" in return_value: + task_info = [ + {k: v for k, v in task.items() if k in ["id", "ratings", "tags"]} + for task in return_value["tasks"] + ] + attributes["ibm.watsonx.model.tasks"] = str(task_info) + + # Model limits + if "model_limits" in return_value: + limits = return_value["model_limits"] + attributes.update({ + f"ibm.watsonx.model.{key}": value + for key, value in limits.items() + if key in ["max_sequence_length", "max_output_tokens", "training_data_max_records"] + }) + + # Service tier limits + if "limits" in return_value: + for tier, tier_limits in return_value["limits"].items(): + attributes.update({ + f"ibm.watsonx.model.limits.{tier}.{key}": value + for key, value in tier_limits.items() + if key in ["call_time", "max_output_tokens"] + }) + + # Model lifecycle + if "lifecycle" in return_value: + attributes.update({ + f"ibm.watsonx.model.lifecycle.{stage['id']}": stage["start_date"] + for stage in return_value["lifecycle"] + if "id" in stage and "start_date" in stage + }) + + # Training parameters + if "training_parameters" in return_value: + attributes.update({ + f"ibm.watsonx.model.training.{key}": str(value) if isinstance(value, dict) else value + for key, value in return_value["training_parameters"].items() + }) + + return attributes + +def get_generate_text_stream_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: + """Extract token usage attributes from generate_text_stream method calls. + + Args: + args: Positional arguments passed to the method + kwargs: Keyword arguments passed to the method + return_value: Return value from the method + + Returns: + Dictionary of token usage attributes to set on the span + """ + attributes = {} + + # Extract prompt from args or kwargs + prompt = None + if args and len(args) > 0: + prompt = args[0] + elif kwargs and 'prompt' in kwargs: + prompt = kwargs['prompt'] + + if prompt: + attributes[MessageAttributes.PROMPT_ROLE.format(i=0)] = "user" + attributes[MessageAttributes.PROMPT_CONTENT.format(i=0)] = prompt + attributes[MessageAttributes.PROMPT_TYPE.format(i=0)] = "text" + + # Extract parameters from args or kwargs + params = None + if args and len(args) > 1: + params = args[1] + elif kwargs and 'params' in kwargs: + params = kwargs['params'] + + if params: + attributes.update(extract_params_attributes(params)) + + # For streaming responses, we'll update the attributes as we receive chunks + if return_value and isinstance(return_value, dict): + # Extract model information + if 'model_id' in return_value: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = return_value['model_id'] + + # Handle results + if 'results' in return_value: + for idx, result in enumerate(return_value['results']): + # Extract completion + if 'generated_text' in result: + attributes[MessageAttributes.COMPLETION_CONTENT.format(i=idx)] = result['generated_text'] + attributes[MessageAttributes.COMPLETION_ROLE.format(i=idx)] = "assistant" + attributes[MessageAttributes.COMPLETION_TYPE.format(i=idx)] = "text" + + # Extract token usage + if 'input_token_count' in result: + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = result['input_token_count'] + if 'generated_token_count' in result: + attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = result['generated_token_count'] + if 'input_token_count' in result and 'generated_token_count' in result: + attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = result['input_token_count'] + result['generated_token_count'] + + if 'stop_reason' in result: + attributes[SpanAttributes.LLM_RESPONSE_STOP_REASON] = result['stop_reason'] + + return attributes \ No newline at end of file diff --git a/agentops/instrumentation/ibm_watsonx_ai/attributes/common.py b/agentops/instrumentation/ibm_watsonx_ai/attributes/common.py new file mode 100644 index 000000000..9fa9cdf6e --- /dev/null +++ b/agentops/instrumentation/ibm_watsonx_ai/attributes/common.py @@ -0,0 +1,66 @@ +"""Common utilities and constants for IBM watsonx.ai attribute processing. + +This module contains shared constants, attribute mappings, and utility functions for processing +trace and span attributes in IBM watsonx.ai instrumentation. It provides the core functionality +for extracting and formatting attributes according to OpenTelemetry semantic conventions. +""" +from typing import Any, Dict +from agentops.instrumentation.common.attributes import AttributeMap +from agentops.semconv import SpanAttributes + +# Mapping of generation parameters to their OpenTelemetry attribute names +GENERATION_PARAM_ATTRIBUTES: AttributeMap = { + 'max_new_tokens': SpanAttributes.LLM_REQUEST_MAX_TOKENS, + 'min_new_tokens': 'ibm.watsonx.min_new_tokens', + 'temperature': SpanAttributes.LLM_REQUEST_TEMPERATURE, + 'top_p': SpanAttributes.LLM_REQUEST_TOP_P, + 'top_k': 'ibm.watsonx.top_k', + 'repetition_penalty': 'ibm.watsonx.repetition_penalty', + 'time_limit': 'ibm.watsonx.time_limit', + 'random_seed': 'ibm.watsonx.random_seed', + 'stop_sequences': 'ibm.watsonx.stop_sequences', + 'truncate_input_tokens': 'ibm.watsonx.truncate_input_tokens', + 'decoding_method': 'ibm.watsonx.decoding_method', +} + +# Mapping of guardrail parameters to their OpenTelemetry attribute names +GUARDRAIL_PARAM_ATTRIBUTES: AttributeMap = { + 'guardrails': 'ibm.watsonx.guardrails.enabled', + 'guardrails_hap_params': 'ibm.watsonx.guardrails.hap_params', + 'guardrails_pii_params': 'ibm.watsonx.guardrails.pii_params', +} + +def extract_params_attributes(params: Dict[str, Any]) -> AttributeMap: + """Extract generation parameters from a params dictionary. + + Args: + params: Dictionary of generation parameters + + Returns: + Dictionary of attributes to set on the span + """ + attributes = {} + + # Extract standard generation parameters + for param_name, attr_name in GENERATION_PARAM_ATTRIBUTES.items(): + if param_name in params: + value = params[param_name] + # Convert lists to strings for attributes + if isinstance(value, list): + value = str(value) + attributes[attr_name] = value + + # Extract guardrail parameters + for param_name, attr_name in GUARDRAIL_PARAM_ATTRIBUTES.items(): + if param_name in params: + value = params[param_name] + # Convert dicts to strings for attributes + if isinstance(value, dict): + value = str(value) + attributes[attr_name] = value + + # Extract concurrency limit + if 'concurrency_limit' in params: + attributes['ibm.watsonx.concurrency_limit'] = params['concurrency_limit'] + + return attributes \ No newline at end of file diff --git a/agentops/instrumentation/ibm_watsonx_ai/instrumentor.py b/agentops/instrumentation/ibm_watsonx_ai/instrumentor.py new file mode 100644 index 000000000..4a2d9ab07 --- /dev/null +++ b/agentops/instrumentation/ibm_watsonx_ai/instrumentor.py @@ -0,0 +1,268 @@ +"""IBM watsonx.ai Instrumentation for AgentOps + +This module provides instrumentation for the IBM watsonx.ai API, implementing OpenTelemetry +instrumentation for model requests and responses. + +We focus on instrumenting the following key endpoints: +- Model.generate - Text generation API +- Model.tokenize - Tokenization API +- Model.get_details - Model details API +""" +from typing import List, Optional, Collection +from opentelemetry.trace import get_tracer, SpanKind +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.metrics import get_meter +from wrapt import wrap_function_wrapper + +from agentops.logging import logger +from agentops.instrumentation.common.wrappers import WrapConfig, wrap, unwrap +from agentops.instrumentation.ibm_watsonx_ai import LIBRARY_NAME, LIBRARY_VERSION +from agentops.instrumentation.ibm_watsonx_ai.attributes.attributes import ( + get_generate_attributes, + get_tokenize_attributes, + get_model_details_attributes, + get_generate_text_stream_attributes, +) +from agentops.semconv import ( + SpanAttributes, + Meters, + LLMRequestTypeValues, + CoreAttributes, + MessageAttributes +) + +# Methods to wrap for instrumentation +WRAPPED_METHODS: List[WrapConfig] = [ + # Model-based API methods + WrapConfig( + trace_name="watsonx.generate", + package="ibm_watson_machine_learning.foundation_models", + class_name="Model", + method_name="generate", + handler=get_generate_attributes, + ), + WrapConfig( + trace_name="watsonx.generate_text_stream", + package="ibm_watson_machine_learning.foundation_models", + class_name="Model", + method_name="generate_text_stream", + handler=get_generate_text_stream_attributes, + ), + WrapConfig( + trace_name="watsonx.tokenize", + package="ibm_watson_machine_learning.foundation_models", + class_name="Model", + method_name="tokenize", + handler=get_tokenize_attributes, + ), + WrapConfig( + trace_name="watsonx.get_details", + package="ibm_watson_machine_learning.foundation_models", + class_name="Model", + method_name="get_details", + handler=get_model_details_attributes, + ), +] + +class TracedStream: + """A wrapper for IBM watsonx.ai's streaming response that adds telemetry. + + This class wraps the original stream to capture metrics about the streaming process, + including token counts, content, and errors. + """ + + def __init__(self, original_stream, span): + """Initialize with the original stream and span. + + Args: + original_stream: The IBM watsonx.ai stream to wrap + span: The OpenTelemetry span to record metrics on + """ + self.original_stream = original_stream + self.span = span + self.completion_content = "" + self.input_tokens = 0 + self.output_tokens = 0 + + def __iter__(self): + """Iterate through chunks, tracking tokens and content. + + Yields: + Chunks from the original stream + """ + try: + for chunk in self.original_stream: + try: + if isinstance(chunk, dict) and 'results' in chunk: + for result in chunk['results']: + if 'generated_text' in result: + self.completion_content += result['generated_text'] + + if 'input_token_count' in result: + self.input_tokens = result['input_token_count'] + + if 'generated_token_count' in result: + self.output_tokens = result['generated_token_count'] + + # Update span attributes with current token counts + self.span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, self.input_tokens) + self.span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, self.output_tokens) + self.span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, self.input_tokens + self.output_tokens) + + # Update completion content + if self.completion_content: + self.span.set_attribute(MessageAttributes.COMPLETION_TYPE.format(i=0), "text") + self.span.set_attribute(MessageAttributes.COMPLETION_ROLE.format(i=0), "assistant") + self.span.set_attribute(MessageAttributes.COMPLETION_CONTENT.format(i=0), self.completion_content) + except Exception as e: + logger.debug(f"Error processing stream chunk: {e}") + + yield chunk + finally: + # End the span when the stream is exhausted + if self.span.is_recording(): + self.span.end() + +def generate_text_stream_wrapper(wrapped, instance, args, kwargs): + """Wrapper for the Model.generate_text_stream method. + + This wrapper creates spans for tracking stream performance and injects + a stream wrapper to capture streaming events. + + Args: + wrapped: The original stream method + instance: The instance the method is bound to + args: Positional arguments to the method + kwargs: Keyword arguments to the method + + Returns: + A wrapped stream that captures telemetry data + """ + tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION) + span = tracer.start_span( + "watsonx.generate_text_stream", + kind=SpanKind.CLIENT, + attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value}, + ) + + # Extract prompt and parameters + prompt = None + if args and len(args) > 0: + prompt = args[0] + elif kwargs and 'prompt' in kwargs: + prompt = kwargs['prompt'] + + if prompt: + span.set_attribute(MessageAttributes.PROMPT_ROLE.format(i=0), "user") + span.set_attribute(MessageAttributes.PROMPT_CONTENT.format(i=0), prompt) + span.set_attribute(MessageAttributes.PROMPT_TYPE.format(i=0), "text") + + # Extract parameters from args or kwargs + params = None + if args and len(args) > 1: + params = args[1] + elif kwargs and 'params' in kwargs: + params = kwargs['params'] + + if params: + # Use common attribute extraction + from agentops.instrumentation.ibm_watsonx_ai.attributes.common import extract_params_attributes + span_attributes = extract_params_attributes(params) + for key, value in span_attributes.items(): + span.set_attribute(key, value) + + span.set_attribute(SpanAttributes.LLM_REQUEST_STREAMING, True) + + try: + stream = wrapped(*args, **kwargs) + return TracedStream(stream, span) + except Exception as e: + span.record_exception(e) + span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(e)) + span.set_attribute(CoreAttributes.ERROR_TYPE, e.__class__.__name__) + span.end() + raise + +class IBMWatsonXInstrumentor(BaseInstrumentor): + """An instrumentor for IBM watsonx.ai API. + + This class provides instrumentation for IBM's watsonx.ai API by wrapping key methods + in the client library and capturing telemetry data. It supports both synchronous and + asynchronous API calls. + + It captures metrics including token usage, operation duration, and exceptions. + """ + + def instrumentation_dependencies(self) -> Collection[str]: + """Return packages required for instrumentation. + + Returns: + A collection of package specifications required for this instrumentation. + """ + return ["ibm-watson-machine-learning >= 1.0.0"] + + def _instrument(self, **kwargs): + """Instrument the IBM watsonx.ai API. + + This method wraps the key methods in the IBM watsonx.ai client to capture + telemetry data for API calls. It sets up tracers, meters, and wraps the appropriate + methods for instrumentation. + + Args: + **kwargs: Configuration options for instrumentation. + """ + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION, tracer_provider) + + meter_provider = kwargs.get("meter_provider") + meter = get_meter(LIBRARY_NAME, LIBRARY_VERSION, meter_provider) + + tokens_histogram = meter.create_histogram( + name=Meters.LLM_TOKEN_USAGE, + unit="token", + description="Measures number of input and output tokens used with IBM watsonx.ai models", + ) + + duration_histogram = meter.create_histogram( + name=Meters.LLM_OPERATION_DURATION, + unit="s", + description="IBM watsonx.ai operation duration", + ) + + exception_counter = meter.create_counter( + name=Meters.LLM_COMPLETIONS_EXCEPTIONS, + unit="time", + description="Number of exceptions occurred during IBM watsonx.ai completions", + ) + + # Standard method wrapping approach for regular methods + for wrap_config in WRAPPED_METHODS: + try: + if wrap_config.method_name == "generate_text_stream": + wrap_function_wrapper( + wrap_config.package, + f"{wrap_config.class_name}.{wrap_config.method_name}", + generate_text_stream_wrapper, + ) + else: + wrap(wrap_config, tracer) + logger.debug(f"Wrapped {wrap_config.package}.{wrap_config.class_name}.{wrap_config.method_name}") + except (AttributeError, ModuleNotFoundError) as e: + logger.debug(f"Could not wrap {wrap_config.package}.{wrap_config.class_name}.{wrap_config.method_name}: {e}") + + def _uninstrument(self, **kwargs): + """Remove instrumentation from IBM watsonx.ai API. + + This method unwraps all methods that were wrapped during instrumentation, + restoring the original behavior of the IBM watsonx.ai API. + + Args: + **kwargs: Configuration options for uninstrumentation. + """ + # Unwrap standard methods + for wrap_config in WRAPPED_METHODS: + try: + unwrap(wrap_config) + logger.debug(f"Unwrapped {wrap_config.package}.{wrap_config.class_name}.{wrap_config.method_name}") + except Exception as e: + logger.debug(f"Failed to unwrap {wrap_config.package}.{wrap_config.class_name}.{wrap_config.method_name}: {e}") \ No newline at end of file From 77d0cc76a0248150029900d46516b13efb1d4169 Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Thu, 1 May 2025 07:41:38 +0530 Subject: [PATCH 2/6] Add IBMMachineLearningInstrumentor and extend watsonx.ai instrumentation with new methods --- agentops/instrumentation/__init__.py | 5 + .../ibm_machine_learning/__init__.py | 39 +++ .../attributes/__init__.py | 12 + .../attributes/attributes.py | 241 +++++++++++++++++ .../ibm_machine_learning/attributes/common.py | 66 +++++ .../ibm_machine_learning/instrumentor.py | 243 ++++++++++++++++++ .../ibm_watsonx_ai/instrumentor.py | 39 ++- 7 files changed, 636 insertions(+), 9 deletions(-) create mode 100644 agentops/instrumentation/ibm_machine_learning/__init__.py create mode 100644 agentops/instrumentation/ibm_machine_learning/attributes/__init__.py create mode 100644 agentops/instrumentation/ibm_machine_learning/attributes/attributes.py create mode 100644 agentops/instrumentation/ibm_machine_learning/attributes/common.py create mode 100644 agentops/instrumentation/ibm_machine_learning/instrumentor.py diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index 4a041c328..a7ab221eb 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -82,6 +82,11 @@ def get_instance(self) -> BaseInstrumentor: class_name="IBMWatsonXInstrumentor", provider_import_name="ibm_watsonx_ai", ), + InstrumentorLoader( + module_name="agentops.instrumentation.ibm_machine_learning", + class_name="IBMMachineLearningInstrumentor", + provider_import_name="ibm_watson_machine_learning", + ), ] diff --git a/agentops/instrumentation/ibm_machine_learning/__init__.py b/agentops/instrumentation/ibm_machine_learning/__init__.py new file mode 100644 index 000000000..0923660cc --- /dev/null +++ b/agentops/instrumentation/ibm_machine_learning/__init__.py @@ -0,0 +1,39 @@ +"""IBM Machine Learning API instrumentation (Deprecated). + +This module provides instrumentation for the IBM Machine Learning API (deprecated), +including text generation, embeddings, and model management. For the new WatsonX AI SDK, +use the watsonx_ai instrumentor instead. +""" + +import logging +from typing import Collection + +def get_version() -> str: + """Get the version of the IBM Machine Learning SDK, or 'unknown' if not found + + Attempts to retrieve the installed version of the IBM Machine Learning SDK using importlib.metadata. + Falls back to 'unknown' if the version cannot be determined. + + Returns: + The version string of the IBM Machine Learning SDK or 'unknown' + """ + try: + from importlib.metadata import version + return version("ibm-watson-machine-learning") + except ImportError: + logger.debug("Could not find IBM Machine Learning SDK version") + return "unknown" + +LIBRARY_NAME = "ibm-machine-learning" +LIBRARY_VERSION: str = get_version() + +logger = logging.getLogger(__name__) + +# Import after defining constants to avoid circular imports +from agentops.instrumentation.ibm_machine_learning.instrumentor import IBMMachineLearningInstrumentor # noqa: E402 + +__all__ = [ + "LIBRARY_NAME", + "LIBRARY_VERSION", + "IBMMachineLearningInstrumentor", +] \ No newline at end of file diff --git a/agentops/instrumentation/ibm_machine_learning/attributes/__init__.py b/agentops/instrumentation/ibm_machine_learning/attributes/__init__.py new file mode 100644 index 000000000..263548dd0 --- /dev/null +++ b/agentops/instrumentation/ibm_machine_learning/attributes/__init__.py @@ -0,0 +1,12 @@ +"""Attributes for IBM watsonx.ai instrumentation. + +This module provides attribute extraction functions for IBM watsonx.ai operations. +""" + +from agentops.instrumentation.ibm_watsonx_ai.attributes.attributes import ( + get_generate_attributes, +) + +__all__ = [ + "get_generate_attributes", +] \ No newline at end of file diff --git a/agentops/instrumentation/ibm_machine_learning/attributes/attributes.py b/agentops/instrumentation/ibm_machine_learning/attributes/attributes.py new file mode 100644 index 000000000..f306d1614 --- /dev/null +++ b/agentops/instrumentation/ibm_machine_learning/attributes/attributes.py @@ -0,0 +1,241 @@ +"""Attributes for IBM watsonx.ai model instrumentation. + +This module provides attribute extraction functions for IBM watsonx.ai model operations, +focusing on token usage recording. +""" +from typing import Any, Dict, Optional, Tuple +from agentops.instrumentation.common.attributes import AttributeMap +from agentops.semconv import SpanAttributes, MessageAttributes +from agentops.instrumentation.ibm_watsonx_ai.attributes.common import extract_params_attributes + +def get_generate_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: + """Extract token usage attributes from generate method calls. + + Args: + args: Positional arguments passed to the method + kwargs: Keyword arguments passed to the method + return_value: Return value from the method + + Returns: + Dictionary of token usage attributes to set on the span + """ + attributes = {} + + # Extract prompt from args or kwargs + prompt = None + if args and len(args) > 0: + prompt = args[0] + elif kwargs and 'prompt' in kwargs: + prompt = kwargs['prompt'] + + if prompt: + attributes[MessageAttributes.PROMPT_ROLE.format(i=0)] = "user" + attributes[MessageAttributes.PROMPT_CONTENT.format(i=0)] = prompt + attributes[MessageAttributes.PROMPT_TYPE.format(i=0)] = "text" + + # Extract parameters from args or kwargs + params = None + if args and len(args) > 1: + params = args[1] + elif kwargs and 'params' in kwargs: + params = kwargs['params'] + + if params: + attributes.update(extract_params_attributes(params)) + + # Extract response information + if return_value: + if isinstance(return_value, dict): + # Extract model information + if 'model_id' in return_value: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = return_value['model_id'] + + # Handle results + if 'results' in return_value: + for idx, result in enumerate(return_value['results']): + # Extract completion + if 'generated_text' in result: + attributes[MessageAttributes.COMPLETION_CONTENT.format(i=idx)] = result['generated_text'] + attributes[MessageAttributes.COMPLETION_ROLE.format(i=idx)] = "assistant" + attributes[MessageAttributes.COMPLETION_TYPE.format(i=idx)] = "text" + + # Extract token usage + if 'input_token_count' in result: + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = result['input_token_count'] + if 'generated_token_count' in result: + attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = result['generated_token_count'] + if 'input_token_count' in result and 'generated_token_count' in result: + attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = result['input_token_count'] + result['generated_token_count'] + + if 'stop_reason' in result: + attributes[SpanAttributes.LLM_RESPONSE_STOP_REASON] = result['stop_reason'] + + return attributes + +def get_tokenize_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: + """Extract attributes from tokenize method calls. + + Args: + args: Positional arguments passed to the method + kwargs: Keyword arguments passed to the method + return_value: Return value from the method + + Returns: + Dictionary of attributes to set on the span + """ + attributes = {} + + # Extract input from args or kwargs + prompt = None + if args and len(args) > 0: + prompt = args[0] + elif kwargs and "prompt" in kwargs: + prompt = kwargs["prompt"] + + if prompt: + attributes[MessageAttributes.PROMPT_ROLE.format(i=0)] = "user" + attributes[MessageAttributes.PROMPT_CONTENT.format(i=0)] = prompt + attributes[MessageAttributes.PROMPT_TYPE.format(i=0)] = "text" + + # Extract response information + if return_value and isinstance(return_value, dict): + if "model_id" in return_value: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = return_value["model_id"] + if "result" in return_value: + attributes["ibm.watsonx.tokenize.result"] = str(return_value["result"]) + if "token_count" in return_value["result"]: + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = return_value["result"]["token_count"] + + return attributes + +def get_model_details_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: + """Extract attributes from get_details method calls. + + Args: + args: Positional arguments passed to the method + kwargs: Keyword arguments passed to the method + return_value: Return value from the method + + Returns: + Dictionary of attributes to set on the span + """ + if not isinstance(return_value, dict): + return {} + + # Basic model information + attributes = { + f"ibm.watsonx.model.{key}": value + for key, value in return_value.items() + if key in ["model_id", "label", "provider", "source", "short_description", "long_description", + "number_params", "input_tier", "output_tier"] + } + + # Model functions + if "functions" in return_value: + attributes["ibm.watsonx.model.functions"] = str([func["id"] for func in return_value["functions"]]) + + # Model tasks + if "tasks" in return_value: + task_info = [ + {k: v for k, v in task.items() if k in ["id", "ratings", "tags"]} + for task in return_value["tasks"] + ] + attributes["ibm.watsonx.model.tasks"] = str(task_info) + + # Model limits + if "model_limits" in return_value: + limits = return_value["model_limits"] + attributes.update({ + f"ibm.watsonx.model.{key}": value + for key, value in limits.items() + if key in ["max_sequence_length", "max_output_tokens", "training_data_max_records"] + }) + + # Service tier limits + if "limits" in return_value: + for tier, tier_limits in return_value["limits"].items(): + attributes.update({ + f"ibm.watsonx.model.limits.{tier}.{key}": value + for key, value in tier_limits.items() + if key in ["call_time", "max_output_tokens"] + }) + + # Model lifecycle + if "lifecycle" in return_value: + attributes.update({ + f"ibm.watsonx.model.lifecycle.{stage['id']}": stage["start_date"] + for stage in return_value["lifecycle"] + if "id" in stage and "start_date" in stage + }) + + # Training parameters + if "training_parameters" in return_value: + attributes.update({ + f"ibm.watsonx.model.training.{key}": str(value) if isinstance(value, dict) else value + for key, value in return_value["training_parameters"].items() + }) + + return attributes + +def get_generate_text_stream_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: + """Extract token usage attributes from generate_text_stream method calls. + + Args: + args: Positional arguments passed to the method + kwargs: Keyword arguments passed to the method + return_value: Return value from the method + + Returns: + Dictionary of token usage attributes to set on the span + """ + attributes = {} + + # Extract prompt from args or kwargs + prompt = None + if args and len(args) > 0: + prompt = args[0] + elif kwargs and 'prompt' in kwargs: + prompt = kwargs['prompt'] + + if prompt: + attributes[MessageAttributes.PROMPT_ROLE.format(i=0)] = "user" + attributes[MessageAttributes.PROMPT_CONTENT.format(i=0)] = prompt + attributes[MessageAttributes.PROMPT_TYPE.format(i=0)] = "text" + + # Extract parameters from args or kwargs + params = None + if args and len(args) > 1: + params = args[1] + elif kwargs and 'params' in kwargs: + params = kwargs['params'] + + if params: + attributes.update(extract_params_attributes(params)) + + # For streaming responses, we'll update the attributes as we receive chunks + if return_value and isinstance(return_value, dict): + # Extract model information + if 'model_id' in return_value: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = return_value['model_id'] + + # Handle results + if 'results' in return_value: + for idx, result in enumerate(return_value['results']): + # Extract completion + if 'generated_text' in result: + attributes[MessageAttributes.COMPLETION_CONTENT.format(i=idx)] = result['generated_text'] + attributes[MessageAttributes.COMPLETION_ROLE.format(i=idx)] = "assistant" + attributes[MessageAttributes.COMPLETION_TYPE.format(i=idx)] = "text" + + # Extract token usage + if 'input_token_count' in result: + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = result['input_token_count'] + if 'generated_token_count' in result: + attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = result['generated_token_count'] + if 'input_token_count' in result and 'generated_token_count' in result: + attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = result['input_token_count'] + result['generated_token_count'] + + if 'stop_reason' in result: + attributes[SpanAttributes.LLM_RESPONSE_STOP_REASON] = result['stop_reason'] + + return attributes \ No newline at end of file diff --git a/agentops/instrumentation/ibm_machine_learning/attributes/common.py b/agentops/instrumentation/ibm_machine_learning/attributes/common.py new file mode 100644 index 000000000..9fa9cdf6e --- /dev/null +++ b/agentops/instrumentation/ibm_machine_learning/attributes/common.py @@ -0,0 +1,66 @@ +"""Common utilities and constants for IBM watsonx.ai attribute processing. + +This module contains shared constants, attribute mappings, and utility functions for processing +trace and span attributes in IBM watsonx.ai instrumentation. It provides the core functionality +for extracting and formatting attributes according to OpenTelemetry semantic conventions. +""" +from typing import Any, Dict +from agentops.instrumentation.common.attributes import AttributeMap +from agentops.semconv import SpanAttributes + +# Mapping of generation parameters to their OpenTelemetry attribute names +GENERATION_PARAM_ATTRIBUTES: AttributeMap = { + 'max_new_tokens': SpanAttributes.LLM_REQUEST_MAX_TOKENS, + 'min_new_tokens': 'ibm.watsonx.min_new_tokens', + 'temperature': SpanAttributes.LLM_REQUEST_TEMPERATURE, + 'top_p': SpanAttributes.LLM_REQUEST_TOP_P, + 'top_k': 'ibm.watsonx.top_k', + 'repetition_penalty': 'ibm.watsonx.repetition_penalty', + 'time_limit': 'ibm.watsonx.time_limit', + 'random_seed': 'ibm.watsonx.random_seed', + 'stop_sequences': 'ibm.watsonx.stop_sequences', + 'truncate_input_tokens': 'ibm.watsonx.truncate_input_tokens', + 'decoding_method': 'ibm.watsonx.decoding_method', +} + +# Mapping of guardrail parameters to their OpenTelemetry attribute names +GUARDRAIL_PARAM_ATTRIBUTES: AttributeMap = { + 'guardrails': 'ibm.watsonx.guardrails.enabled', + 'guardrails_hap_params': 'ibm.watsonx.guardrails.hap_params', + 'guardrails_pii_params': 'ibm.watsonx.guardrails.pii_params', +} + +def extract_params_attributes(params: Dict[str, Any]) -> AttributeMap: + """Extract generation parameters from a params dictionary. + + Args: + params: Dictionary of generation parameters + + Returns: + Dictionary of attributes to set on the span + """ + attributes = {} + + # Extract standard generation parameters + for param_name, attr_name in GENERATION_PARAM_ATTRIBUTES.items(): + if param_name in params: + value = params[param_name] + # Convert lists to strings for attributes + if isinstance(value, list): + value = str(value) + attributes[attr_name] = value + + # Extract guardrail parameters + for param_name, attr_name in GUARDRAIL_PARAM_ATTRIBUTES.items(): + if param_name in params: + value = params[param_name] + # Convert dicts to strings for attributes + if isinstance(value, dict): + value = str(value) + attributes[attr_name] = value + + # Extract concurrency limit + if 'concurrency_limit' in params: + attributes['ibm.watsonx.concurrency_limit'] = params['concurrency_limit'] + + return attributes \ No newline at end of file diff --git a/agentops/instrumentation/ibm_machine_learning/instrumentor.py b/agentops/instrumentation/ibm_machine_learning/instrumentor.py new file mode 100644 index 000000000..75967bc7b --- /dev/null +++ b/agentops/instrumentation/ibm_machine_learning/instrumentor.py @@ -0,0 +1,243 @@ +"""IBM Machine Learning Instrumentation for AgentOps + +This module provides instrumentation for the IBM Machine Learning API (deprecated), implementing OpenTelemetry +instrumentation for model requests and responses. This instrumentor is for the legacy IBM Machine Learning SDK. +For the new WatsonX AI SDK, use the watsonx_ai instrumentor instead. + +We focus on instrumenting the following key endpoints: +- Model.generate - Text generation API +- Model.tokenize - Tokenization API +- Model.get_details - Model details API +""" +from typing import List, Optional, Collection +from opentelemetry.trace import get_tracer, SpanKind +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.metrics import get_meter +from wrapt import wrap_function_wrapper + +from agentops.logging import logger +from agentops.instrumentation.common.wrappers import WrapConfig, wrap, unwrap +from agentops.instrumentation.ibm_machine_learning import LIBRARY_NAME, LIBRARY_VERSION +from agentops.instrumentation.ibm_machine_learning.attributes.attributes import ( + get_generate_attributes, + get_tokenize_attributes, + get_model_details_attributes, + get_generate_text_stream_attributes, +) +from agentops.semconv import ( + SpanAttributes, + Meters, + LLMRequestTypeValues, + CoreAttributes, + MessageAttributes +) + +# Methods to wrap for instrumentation +WRAPPED_METHODS: List[WrapConfig] = [ + # Model-based API methods + WrapConfig( + trace_name="ibm_ml.generate", + package="ibm_watson_machine_learning.foundation_models", + class_name="Model", + method_name="generate", + handler=get_generate_attributes, + ), + WrapConfig( + trace_name="ibm_ml.generate_text_stream", + package="ibm_watson_machine_learning.foundation_models", + class_name="Model", + method_name="generate_text_stream", + handler=get_generate_text_stream_attributes, + ), + WrapConfig( + trace_name="ibm_ml.tokenize", + package="ibm_watson_machine_learning.foundation_models", + class_name="Model", + method_name="tokenize", + handler=get_tokenize_attributes, + ), + WrapConfig( + trace_name="ibm_ml.get_details", + package="ibm_watson_machine_learning.foundation_models", + class_name="Model", + method_name="get_details", + handler=get_model_details_attributes, + ), +] + +class TracedStream: + """A wrapper for IBM Machine Learning's streaming response that adds telemetry. + + This class wraps the original stream to capture metrics about the streaming process, + including token counts, content, and errors. + """ + + def __init__(self, original_stream, span): + """Initialize with the original stream and span. + + Args: + original_stream: The IBM Machine Learning stream to wrap + span: The OpenTelemetry span to record metrics on + """ + self.original_stream = original_stream + self.span = span + self.completion_content = "" + self.input_tokens = 0 + self.output_tokens = 0 + + def __iter__(self): + """Iterate through chunks, tracking tokens and content. + + Yields: + Chunks from the original stream + """ + try: + for chunk in self.original_stream: + try: + if isinstance(chunk, dict) and 'results' in chunk: + for result in chunk['results']: + if 'generated_text' in result: + self.completion_content += result['generated_text'] + + if 'input_token_count' in result: + self.input_tokens = result['input_token_count'] + + if 'generated_token_count' in result: + self.output_tokens = result['generated_token_count'] + + # Update span attributes with current token counts + self.span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, self.input_tokens) + self.span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, self.output_tokens) + self.span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, self.input_tokens + self.output_tokens) + + # Update completion content + if self.completion_content: + self.span.set_attribute(MessageAttributes.COMPLETION_TYPE.format(i=0), "text") + self.span.set_attribute(MessageAttributes.COMPLETION_ROLE.format(i=0), "assistant") + self.span.set_attribute(MessageAttributes.COMPLETION_CONTENT.format(i=0), self.completion_content) + except Exception as e: + logger.debug(f"Error processing stream chunk: {e}") + + yield chunk + finally: + # End the span when the stream is exhausted + if self.span.is_recording(): + self.span.end() + +def generate_text_stream_wrapper(wrapped, instance, args, kwargs): + """Wrapper for the Model.generate_text_stream method. + + This wrapper creates spans for tracking stream performance and injects + a stream wrapper to capture streaming events. + + Args: + wrapped: The original stream method + instance: The instance the method is bound to + args: Positional arguments to the method + kwargs: Keyword arguments to the method + + Returns: + A wrapped stream that captures telemetry data + """ + tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION) + span = tracer.start_span( + "ibm_ml.generate_text_stream", + kind=SpanKind.CLIENT, + attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value}, + ) + + # Extract prompt and parameters + prompt = None + if args and len(args) > 0: + prompt = args[0] + elif kwargs and 'prompt' in kwargs: + prompt = kwargs['prompt'] + + if prompt: + span.set_attribute(MessageAttributes.PROMPT_ROLE.format(i=0), "user") + span.set_attribute(MessageAttributes.PROMPT_CONTENT.format(i=0), prompt) + span.set_attribute(MessageAttributes.PROMPT_TYPE.format(i=0), "text") + + # Extract parameters from args or kwargs + params = None + if args and len(args) > 1: + params = args[1] + elif kwargs and 'params' in kwargs: + params = kwargs['params'] + + if params: + # Use common attribute extraction + from agentops.instrumentation.ibm_machine_learning.attributes.common import extract_params_attributes + span_attributes = extract_params_attributes(params) + for key, value in span_attributes.items(): + span.set_attribute(key, value) + + span.set_attribute(SpanAttributes.LLM_REQUEST_STREAMING, True) + + try: + stream = wrapped(*args, **kwargs) + return TracedStream(stream, span) + except Exception as e: + span.record_exception(e) + span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(e)) + span.set_attribute(CoreAttributes.ERROR_TYPE, e.__class__.__name__) + span.end() + raise + +class IBMMachineLearningInstrumentor(BaseInstrumentor): + """An instrumentor for IBM Machine Learning API (deprecated). + + This class provides instrumentation for IBM's Machine Learning API by wrapping key methods + in the client library and capturing telemetry data. It supports both synchronous and + asynchronous API calls. This is for the legacy IBM Machine Learning SDK - for the new + WatsonX AI SDK, use the watsonx_ai instrumentor instead. + + It captures metrics including token usage, operation duration, and exceptions. + """ + + def instrumentation_dependencies(self) -> Collection[str]: + """Return packages required for instrumentation. + + Returns: + A collection of package specifications required for this instrumentation. + """ + return ["ibm-watson-machine-learning"] + + def _instrument(self, **kwargs): + """Instrument the IBM Machine Learning API. + + This method wraps key methods in the IBM Machine Learning API to add telemetry. + It sets up tracing for model operations and token usage tracking. + """ + # Wrap the generate_text_stream method separately since it needs custom handling + wrap_function_wrapper( + "ibm_watson_machine_learning.foundation_models", + "Model.generate_text_stream", + generate_text_stream_wrapper + ) + + # Wrap other methods using the standard wrapper + for method in WRAPPED_METHODS: + if method.method_name != "generate_text_stream": # Skip since we handled it above + wrap(method) + + logger.debug("Instrumented IBM Machine Learning API") + + def _uninstrument(self, **kwargs): + """Remove instrumentation from the IBM Machine Learning API. + + This method removes the telemetry wrappers from the API methods. + """ + unwrap( + "ibm_watson_machine_learning.foundation_models", + "Model.generate_text_stream" + ) + + for method in WRAPPED_METHODS: + if method.method_name != "generate_text_stream": + unwrap( + method.package, + f"{method.class_name}.{method.method_name}" + ) + + logger.debug("Uninstrumented IBM Machine Learning API") \ No newline at end of file diff --git a/agentops/instrumentation/ibm_watsonx_ai/instrumentor.py b/agentops/instrumentation/ibm_watsonx_ai/instrumentor.py index 4a2d9ab07..6899351e4 100644 --- a/agentops/instrumentation/ibm_watsonx_ai/instrumentor.py +++ b/agentops/instrumentation/ibm_watsonx_ai/instrumentor.py @@ -33,32 +33,53 @@ # Methods to wrap for instrumentation WRAPPED_METHODS: List[WrapConfig] = [ - # Model-based API methods + # Core generation methods WrapConfig( trace_name="watsonx.generate", - package="ibm_watson_machine_learning.foundation_models", - class_name="Model", + package="ibm_watsonx_ai.foundation_models.inference", + class_name="ModelInference", method_name="generate", handler=get_generate_attributes, ), + WrapConfig( + trace_name="watsonx.generate_text", + package="ibm_watsonx_ai.foundation_models.inference", + class_name="ModelInference", + method_name="generate_text", + handler=get_generate_attributes, + ), WrapConfig( trace_name="watsonx.generate_text_stream", - package="ibm_watson_machine_learning.foundation_models", - class_name="Model", + package="ibm_watsonx_ai.foundation_models.inference", + class_name="ModelInference", method_name="generate_text_stream", handler=get_generate_text_stream_attributes, ), + WrapConfig( + trace_name="watsonx.chat", + package="ibm_watsonx_ai.foundation_models.inference", + class_name="ModelInference", + method_name="chat", + handler=get_generate_attributes, + ), + WrapConfig( + trace_name="watsonx.chat_stream", + package="ibm_watsonx_ai.foundation_models.inference", + class_name="ModelInference", + method_name="chat_stream", + handler=get_generate_text_stream_attributes, + ), WrapConfig( trace_name="watsonx.tokenize", - package="ibm_watson_machine_learning.foundation_models", - class_name="Model", + package="ibm_watsonx_ai.foundation_models.inference", + class_name="ModelInference", method_name="tokenize", handler=get_tokenize_attributes, ), WrapConfig( trace_name="watsonx.get_details", - package="ibm_watson_machine_learning.foundation_models", - class_name="Model", + package="ibm_watsonx_ai.foundation_models.inference", + class_name="ModelInference", method_name="get_details", handler=get_model_details_attributes, ), From ecdc06da72d8dadd1a4422d1c2454159936722fa Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Thu, 1 May 2025 07:50:17 +0530 Subject: [PATCH 3/6] Remove deprecated IBM Machine Learning instrumentation and update IBMWatsonXInstrumentor to reflect changes in library dependencies. --- agentops/instrumentation/__init__.py | 7 +- .../ibm_machine_learning/__init__.py | 39 --- .../attributes/__init__.py | 12 - .../attributes/attributes.py | 241 ----------------- .../ibm_machine_learning/attributes/common.py | 66 ----- .../ibm_machine_learning/instrumentor.py | 243 ------------------ .../ibm_watsonx_ai/__init__.py | 5 +- .../ibm_watsonx_ai/instrumentor.py | 2 +- 8 files changed, 4 insertions(+), 611 deletions(-) delete mode 100644 agentops/instrumentation/ibm_machine_learning/__init__.py delete mode 100644 agentops/instrumentation/ibm_machine_learning/attributes/__init__.py delete mode 100644 agentops/instrumentation/ibm_machine_learning/attributes/attributes.py delete mode 100644 agentops/instrumentation/ibm_machine_learning/attributes/common.py delete mode 100644 agentops/instrumentation/ibm_machine_learning/instrumentor.py diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index a7ab221eb..53e03ae4f 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -81,12 +81,7 @@ def get_instance(self) -> BaseInstrumentor: module_name="agentops.instrumentation.ibm_watsonx_ai", class_name="IBMWatsonXInstrumentor", provider_import_name="ibm_watsonx_ai", - ), - InstrumentorLoader( - module_name="agentops.instrumentation.ibm_machine_learning", - class_name="IBMMachineLearningInstrumentor", - provider_import_name="ibm_watson_machine_learning", - ), + ) ] diff --git a/agentops/instrumentation/ibm_machine_learning/__init__.py b/agentops/instrumentation/ibm_machine_learning/__init__.py deleted file mode 100644 index 0923660cc..000000000 --- a/agentops/instrumentation/ibm_machine_learning/__init__.py +++ /dev/null @@ -1,39 +0,0 @@ -"""IBM Machine Learning API instrumentation (Deprecated). - -This module provides instrumentation for the IBM Machine Learning API (deprecated), -including text generation, embeddings, and model management. For the new WatsonX AI SDK, -use the watsonx_ai instrumentor instead. -""" - -import logging -from typing import Collection - -def get_version() -> str: - """Get the version of the IBM Machine Learning SDK, or 'unknown' if not found - - Attempts to retrieve the installed version of the IBM Machine Learning SDK using importlib.metadata. - Falls back to 'unknown' if the version cannot be determined. - - Returns: - The version string of the IBM Machine Learning SDK or 'unknown' - """ - try: - from importlib.metadata import version - return version("ibm-watson-machine-learning") - except ImportError: - logger.debug("Could not find IBM Machine Learning SDK version") - return "unknown" - -LIBRARY_NAME = "ibm-machine-learning" -LIBRARY_VERSION: str = get_version() - -logger = logging.getLogger(__name__) - -# Import after defining constants to avoid circular imports -from agentops.instrumentation.ibm_machine_learning.instrumentor import IBMMachineLearningInstrumentor # noqa: E402 - -__all__ = [ - "LIBRARY_NAME", - "LIBRARY_VERSION", - "IBMMachineLearningInstrumentor", -] \ No newline at end of file diff --git a/agentops/instrumentation/ibm_machine_learning/attributes/__init__.py b/agentops/instrumentation/ibm_machine_learning/attributes/__init__.py deleted file mode 100644 index 263548dd0..000000000 --- a/agentops/instrumentation/ibm_machine_learning/attributes/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -"""Attributes for IBM watsonx.ai instrumentation. - -This module provides attribute extraction functions for IBM watsonx.ai operations. -""" - -from agentops.instrumentation.ibm_watsonx_ai.attributes.attributes import ( - get_generate_attributes, -) - -__all__ = [ - "get_generate_attributes", -] \ No newline at end of file diff --git a/agentops/instrumentation/ibm_machine_learning/attributes/attributes.py b/agentops/instrumentation/ibm_machine_learning/attributes/attributes.py deleted file mode 100644 index f306d1614..000000000 --- a/agentops/instrumentation/ibm_machine_learning/attributes/attributes.py +++ /dev/null @@ -1,241 +0,0 @@ -"""Attributes for IBM watsonx.ai model instrumentation. - -This module provides attribute extraction functions for IBM watsonx.ai model operations, -focusing on token usage recording. -""" -from typing import Any, Dict, Optional, Tuple -from agentops.instrumentation.common.attributes import AttributeMap -from agentops.semconv import SpanAttributes, MessageAttributes -from agentops.instrumentation.ibm_watsonx_ai.attributes.common import extract_params_attributes - -def get_generate_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: - """Extract token usage attributes from generate method calls. - - Args: - args: Positional arguments passed to the method - kwargs: Keyword arguments passed to the method - return_value: Return value from the method - - Returns: - Dictionary of token usage attributes to set on the span - """ - attributes = {} - - # Extract prompt from args or kwargs - prompt = None - if args and len(args) > 0: - prompt = args[0] - elif kwargs and 'prompt' in kwargs: - prompt = kwargs['prompt'] - - if prompt: - attributes[MessageAttributes.PROMPT_ROLE.format(i=0)] = "user" - attributes[MessageAttributes.PROMPT_CONTENT.format(i=0)] = prompt - attributes[MessageAttributes.PROMPT_TYPE.format(i=0)] = "text" - - # Extract parameters from args or kwargs - params = None - if args and len(args) > 1: - params = args[1] - elif kwargs and 'params' in kwargs: - params = kwargs['params'] - - if params: - attributes.update(extract_params_attributes(params)) - - # Extract response information - if return_value: - if isinstance(return_value, dict): - # Extract model information - if 'model_id' in return_value: - attributes[SpanAttributes.LLM_REQUEST_MODEL] = return_value['model_id'] - - # Handle results - if 'results' in return_value: - for idx, result in enumerate(return_value['results']): - # Extract completion - if 'generated_text' in result: - attributes[MessageAttributes.COMPLETION_CONTENT.format(i=idx)] = result['generated_text'] - attributes[MessageAttributes.COMPLETION_ROLE.format(i=idx)] = "assistant" - attributes[MessageAttributes.COMPLETION_TYPE.format(i=idx)] = "text" - - # Extract token usage - if 'input_token_count' in result: - attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = result['input_token_count'] - if 'generated_token_count' in result: - attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = result['generated_token_count'] - if 'input_token_count' in result and 'generated_token_count' in result: - attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = result['input_token_count'] + result['generated_token_count'] - - if 'stop_reason' in result: - attributes[SpanAttributes.LLM_RESPONSE_STOP_REASON] = result['stop_reason'] - - return attributes - -def get_tokenize_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: - """Extract attributes from tokenize method calls. - - Args: - args: Positional arguments passed to the method - kwargs: Keyword arguments passed to the method - return_value: Return value from the method - - Returns: - Dictionary of attributes to set on the span - """ - attributes = {} - - # Extract input from args or kwargs - prompt = None - if args and len(args) > 0: - prompt = args[0] - elif kwargs and "prompt" in kwargs: - prompt = kwargs["prompt"] - - if prompt: - attributes[MessageAttributes.PROMPT_ROLE.format(i=0)] = "user" - attributes[MessageAttributes.PROMPT_CONTENT.format(i=0)] = prompt - attributes[MessageAttributes.PROMPT_TYPE.format(i=0)] = "text" - - # Extract response information - if return_value and isinstance(return_value, dict): - if "model_id" in return_value: - attributes[SpanAttributes.LLM_REQUEST_MODEL] = return_value["model_id"] - if "result" in return_value: - attributes["ibm.watsonx.tokenize.result"] = str(return_value["result"]) - if "token_count" in return_value["result"]: - attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = return_value["result"]["token_count"] - - return attributes - -def get_model_details_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: - """Extract attributes from get_details method calls. - - Args: - args: Positional arguments passed to the method - kwargs: Keyword arguments passed to the method - return_value: Return value from the method - - Returns: - Dictionary of attributes to set on the span - """ - if not isinstance(return_value, dict): - return {} - - # Basic model information - attributes = { - f"ibm.watsonx.model.{key}": value - for key, value in return_value.items() - if key in ["model_id", "label", "provider", "source", "short_description", "long_description", - "number_params", "input_tier", "output_tier"] - } - - # Model functions - if "functions" in return_value: - attributes["ibm.watsonx.model.functions"] = str([func["id"] for func in return_value["functions"]]) - - # Model tasks - if "tasks" in return_value: - task_info = [ - {k: v for k, v in task.items() if k in ["id", "ratings", "tags"]} - for task in return_value["tasks"] - ] - attributes["ibm.watsonx.model.tasks"] = str(task_info) - - # Model limits - if "model_limits" in return_value: - limits = return_value["model_limits"] - attributes.update({ - f"ibm.watsonx.model.{key}": value - for key, value in limits.items() - if key in ["max_sequence_length", "max_output_tokens", "training_data_max_records"] - }) - - # Service tier limits - if "limits" in return_value: - for tier, tier_limits in return_value["limits"].items(): - attributes.update({ - f"ibm.watsonx.model.limits.{tier}.{key}": value - for key, value in tier_limits.items() - if key in ["call_time", "max_output_tokens"] - }) - - # Model lifecycle - if "lifecycle" in return_value: - attributes.update({ - f"ibm.watsonx.model.lifecycle.{stage['id']}": stage["start_date"] - for stage in return_value["lifecycle"] - if "id" in stage and "start_date" in stage - }) - - # Training parameters - if "training_parameters" in return_value: - attributes.update({ - f"ibm.watsonx.model.training.{key}": str(value) if isinstance(value, dict) else value - for key, value in return_value["training_parameters"].items() - }) - - return attributes - -def get_generate_text_stream_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: - """Extract token usage attributes from generate_text_stream method calls. - - Args: - args: Positional arguments passed to the method - kwargs: Keyword arguments passed to the method - return_value: Return value from the method - - Returns: - Dictionary of token usage attributes to set on the span - """ - attributes = {} - - # Extract prompt from args or kwargs - prompt = None - if args and len(args) > 0: - prompt = args[0] - elif kwargs and 'prompt' in kwargs: - prompt = kwargs['prompt'] - - if prompt: - attributes[MessageAttributes.PROMPT_ROLE.format(i=0)] = "user" - attributes[MessageAttributes.PROMPT_CONTENT.format(i=0)] = prompt - attributes[MessageAttributes.PROMPT_TYPE.format(i=0)] = "text" - - # Extract parameters from args or kwargs - params = None - if args and len(args) > 1: - params = args[1] - elif kwargs and 'params' in kwargs: - params = kwargs['params'] - - if params: - attributes.update(extract_params_attributes(params)) - - # For streaming responses, we'll update the attributes as we receive chunks - if return_value and isinstance(return_value, dict): - # Extract model information - if 'model_id' in return_value: - attributes[SpanAttributes.LLM_REQUEST_MODEL] = return_value['model_id'] - - # Handle results - if 'results' in return_value: - for idx, result in enumerate(return_value['results']): - # Extract completion - if 'generated_text' in result: - attributes[MessageAttributes.COMPLETION_CONTENT.format(i=idx)] = result['generated_text'] - attributes[MessageAttributes.COMPLETION_ROLE.format(i=idx)] = "assistant" - attributes[MessageAttributes.COMPLETION_TYPE.format(i=idx)] = "text" - - # Extract token usage - if 'input_token_count' in result: - attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = result['input_token_count'] - if 'generated_token_count' in result: - attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = result['generated_token_count'] - if 'input_token_count' in result and 'generated_token_count' in result: - attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = result['input_token_count'] + result['generated_token_count'] - - if 'stop_reason' in result: - attributes[SpanAttributes.LLM_RESPONSE_STOP_REASON] = result['stop_reason'] - - return attributes \ No newline at end of file diff --git a/agentops/instrumentation/ibm_machine_learning/attributes/common.py b/agentops/instrumentation/ibm_machine_learning/attributes/common.py deleted file mode 100644 index 9fa9cdf6e..000000000 --- a/agentops/instrumentation/ibm_machine_learning/attributes/common.py +++ /dev/null @@ -1,66 +0,0 @@ -"""Common utilities and constants for IBM watsonx.ai attribute processing. - -This module contains shared constants, attribute mappings, and utility functions for processing -trace and span attributes in IBM watsonx.ai instrumentation. It provides the core functionality -for extracting and formatting attributes according to OpenTelemetry semantic conventions. -""" -from typing import Any, Dict -from agentops.instrumentation.common.attributes import AttributeMap -from agentops.semconv import SpanAttributes - -# Mapping of generation parameters to their OpenTelemetry attribute names -GENERATION_PARAM_ATTRIBUTES: AttributeMap = { - 'max_new_tokens': SpanAttributes.LLM_REQUEST_MAX_TOKENS, - 'min_new_tokens': 'ibm.watsonx.min_new_tokens', - 'temperature': SpanAttributes.LLM_REQUEST_TEMPERATURE, - 'top_p': SpanAttributes.LLM_REQUEST_TOP_P, - 'top_k': 'ibm.watsonx.top_k', - 'repetition_penalty': 'ibm.watsonx.repetition_penalty', - 'time_limit': 'ibm.watsonx.time_limit', - 'random_seed': 'ibm.watsonx.random_seed', - 'stop_sequences': 'ibm.watsonx.stop_sequences', - 'truncate_input_tokens': 'ibm.watsonx.truncate_input_tokens', - 'decoding_method': 'ibm.watsonx.decoding_method', -} - -# Mapping of guardrail parameters to their OpenTelemetry attribute names -GUARDRAIL_PARAM_ATTRIBUTES: AttributeMap = { - 'guardrails': 'ibm.watsonx.guardrails.enabled', - 'guardrails_hap_params': 'ibm.watsonx.guardrails.hap_params', - 'guardrails_pii_params': 'ibm.watsonx.guardrails.pii_params', -} - -def extract_params_attributes(params: Dict[str, Any]) -> AttributeMap: - """Extract generation parameters from a params dictionary. - - Args: - params: Dictionary of generation parameters - - Returns: - Dictionary of attributes to set on the span - """ - attributes = {} - - # Extract standard generation parameters - for param_name, attr_name in GENERATION_PARAM_ATTRIBUTES.items(): - if param_name in params: - value = params[param_name] - # Convert lists to strings for attributes - if isinstance(value, list): - value = str(value) - attributes[attr_name] = value - - # Extract guardrail parameters - for param_name, attr_name in GUARDRAIL_PARAM_ATTRIBUTES.items(): - if param_name in params: - value = params[param_name] - # Convert dicts to strings for attributes - if isinstance(value, dict): - value = str(value) - attributes[attr_name] = value - - # Extract concurrency limit - if 'concurrency_limit' in params: - attributes['ibm.watsonx.concurrency_limit'] = params['concurrency_limit'] - - return attributes \ No newline at end of file diff --git a/agentops/instrumentation/ibm_machine_learning/instrumentor.py b/agentops/instrumentation/ibm_machine_learning/instrumentor.py deleted file mode 100644 index 75967bc7b..000000000 --- a/agentops/instrumentation/ibm_machine_learning/instrumentor.py +++ /dev/null @@ -1,243 +0,0 @@ -"""IBM Machine Learning Instrumentation for AgentOps - -This module provides instrumentation for the IBM Machine Learning API (deprecated), implementing OpenTelemetry -instrumentation for model requests and responses. This instrumentor is for the legacy IBM Machine Learning SDK. -For the new WatsonX AI SDK, use the watsonx_ai instrumentor instead. - -We focus on instrumenting the following key endpoints: -- Model.generate - Text generation API -- Model.tokenize - Tokenization API -- Model.get_details - Model details API -""" -from typing import List, Optional, Collection -from opentelemetry.trace import get_tracer, SpanKind -from opentelemetry.instrumentation.instrumentor import BaseInstrumentor -from opentelemetry.metrics import get_meter -from wrapt import wrap_function_wrapper - -from agentops.logging import logger -from agentops.instrumentation.common.wrappers import WrapConfig, wrap, unwrap -from agentops.instrumentation.ibm_machine_learning import LIBRARY_NAME, LIBRARY_VERSION -from agentops.instrumentation.ibm_machine_learning.attributes.attributes import ( - get_generate_attributes, - get_tokenize_attributes, - get_model_details_attributes, - get_generate_text_stream_attributes, -) -from agentops.semconv import ( - SpanAttributes, - Meters, - LLMRequestTypeValues, - CoreAttributes, - MessageAttributes -) - -# Methods to wrap for instrumentation -WRAPPED_METHODS: List[WrapConfig] = [ - # Model-based API methods - WrapConfig( - trace_name="ibm_ml.generate", - package="ibm_watson_machine_learning.foundation_models", - class_name="Model", - method_name="generate", - handler=get_generate_attributes, - ), - WrapConfig( - trace_name="ibm_ml.generate_text_stream", - package="ibm_watson_machine_learning.foundation_models", - class_name="Model", - method_name="generate_text_stream", - handler=get_generate_text_stream_attributes, - ), - WrapConfig( - trace_name="ibm_ml.tokenize", - package="ibm_watson_machine_learning.foundation_models", - class_name="Model", - method_name="tokenize", - handler=get_tokenize_attributes, - ), - WrapConfig( - trace_name="ibm_ml.get_details", - package="ibm_watson_machine_learning.foundation_models", - class_name="Model", - method_name="get_details", - handler=get_model_details_attributes, - ), -] - -class TracedStream: - """A wrapper for IBM Machine Learning's streaming response that adds telemetry. - - This class wraps the original stream to capture metrics about the streaming process, - including token counts, content, and errors. - """ - - def __init__(self, original_stream, span): - """Initialize with the original stream and span. - - Args: - original_stream: The IBM Machine Learning stream to wrap - span: The OpenTelemetry span to record metrics on - """ - self.original_stream = original_stream - self.span = span - self.completion_content = "" - self.input_tokens = 0 - self.output_tokens = 0 - - def __iter__(self): - """Iterate through chunks, tracking tokens and content. - - Yields: - Chunks from the original stream - """ - try: - for chunk in self.original_stream: - try: - if isinstance(chunk, dict) and 'results' in chunk: - for result in chunk['results']: - if 'generated_text' in result: - self.completion_content += result['generated_text'] - - if 'input_token_count' in result: - self.input_tokens = result['input_token_count'] - - if 'generated_token_count' in result: - self.output_tokens = result['generated_token_count'] - - # Update span attributes with current token counts - self.span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, self.input_tokens) - self.span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, self.output_tokens) - self.span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, self.input_tokens + self.output_tokens) - - # Update completion content - if self.completion_content: - self.span.set_attribute(MessageAttributes.COMPLETION_TYPE.format(i=0), "text") - self.span.set_attribute(MessageAttributes.COMPLETION_ROLE.format(i=0), "assistant") - self.span.set_attribute(MessageAttributes.COMPLETION_CONTENT.format(i=0), self.completion_content) - except Exception as e: - logger.debug(f"Error processing stream chunk: {e}") - - yield chunk - finally: - # End the span when the stream is exhausted - if self.span.is_recording(): - self.span.end() - -def generate_text_stream_wrapper(wrapped, instance, args, kwargs): - """Wrapper for the Model.generate_text_stream method. - - This wrapper creates spans for tracking stream performance and injects - a stream wrapper to capture streaming events. - - Args: - wrapped: The original stream method - instance: The instance the method is bound to - args: Positional arguments to the method - kwargs: Keyword arguments to the method - - Returns: - A wrapped stream that captures telemetry data - """ - tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION) - span = tracer.start_span( - "ibm_ml.generate_text_stream", - kind=SpanKind.CLIENT, - attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value}, - ) - - # Extract prompt and parameters - prompt = None - if args and len(args) > 0: - prompt = args[0] - elif kwargs and 'prompt' in kwargs: - prompt = kwargs['prompt'] - - if prompt: - span.set_attribute(MessageAttributes.PROMPT_ROLE.format(i=0), "user") - span.set_attribute(MessageAttributes.PROMPT_CONTENT.format(i=0), prompt) - span.set_attribute(MessageAttributes.PROMPT_TYPE.format(i=0), "text") - - # Extract parameters from args or kwargs - params = None - if args and len(args) > 1: - params = args[1] - elif kwargs and 'params' in kwargs: - params = kwargs['params'] - - if params: - # Use common attribute extraction - from agentops.instrumentation.ibm_machine_learning.attributes.common import extract_params_attributes - span_attributes = extract_params_attributes(params) - for key, value in span_attributes.items(): - span.set_attribute(key, value) - - span.set_attribute(SpanAttributes.LLM_REQUEST_STREAMING, True) - - try: - stream = wrapped(*args, **kwargs) - return TracedStream(stream, span) - except Exception as e: - span.record_exception(e) - span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(e)) - span.set_attribute(CoreAttributes.ERROR_TYPE, e.__class__.__name__) - span.end() - raise - -class IBMMachineLearningInstrumentor(BaseInstrumentor): - """An instrumentor for IBM Machine Learning API (deprecated). - - This class provides instrumentation for IBM's Machine Learning API by wrapping key methods - in the client library and capturing telemetry data. It supports both synchronous and - asynchronous API calls. This is for the legacy IBM Machine Learning SDK - for the new - WatsonX AI SDK, use the watsonx_ai instrumentor instead. - - It captures metrics including token usage, operation duration, and exceptions. - """ - - def instrumentation_dependencies(self) -> Collection[str]: - """Return packages required for instrumentation. - - Returns: - A collection of package specifications required for this instrumentation. - """ - return ["ibm-watson-machine-learning"] - - def _instrument(self, **kwargs): - """Instrument the IBM Machine Learning API. - - This method wraps key methods in the IBM Machine Learning API to add telemetry. - It sets up tracing for model operations and token usage tracking. - """ - # Wrap the generate_text_stream method separately since it needs custom handling - wrap_function_wrapper( - "ibm_watson_machine_learning.foundation_models", - "Model.generate_text_stream", - generate_text_stream_wrapper - ) - - # Wrap other methods using the standard wrapper - for method in WRAPPED_METHODS: - if method.method_name != "generate_text_stream": # Skip since we handled it above - wrap(method) - - logger.debug("Instrumented IBM Machine Learning API") - - def _uninstrument(self, **kwargs): - """Remove instrumentation from the IBM Machine Learning API. - - This method removes the telemetry wrappers from the API methods. - """ - unwrap( - "ibm_watson_machine_learning.foundation_models", - "Model.generate_text_stream" - ) - - for method in WRAPPED_METHODS: - if method.method_name != "generate_text_stream": - unwrap( - method.package, - f"{method.class_name}.{method.method_name}" - ) - - logger.debug("Uninstrumented IBM Machine Learning API") \ No newline at end of file diff --git a/agentops/instrumentation/ibm_watsonx_ai/__init__.py b/agentops/instrumentation/ibm_watsonx_ai/__init__.py index b80a18170..34914182c 100644 --- a/agentops/instrumentation/ibm_watsonx_ai/__init__.py +++ b/agentops/instrumentation/ibm_watsonx_ai/__init__.py @@ -6,6 +6,7 @@ import logging from typing import Collection +logger = logging.getLogger(__name__) def get_version() -> str: """Get the version of the IBM watsonx.ai SDK, or 'unknown' if not found @@ -23,11 +24,9 @@ def get_version() -> str: logger.debug("Could not find IBM watsonx.ai SDK version") return "unknown" -LIBRARY_NAME = "ibm-watsonx" +LIBRARY_NAME = "ibm-watsonx-ai" LIBRARY_VERSION: str = get_version() -logger = logging.getLogger(__name__) - # Import after defining constants to avoid circular imports from agentops.instrumentation.ibm_watsonx_ai.instrumentor import IBMWatsonXInstrumentor # noqa: E402 diff --git a/agentops/instrumentation/ibm_watsonx_ai/instrumentor.py b/agentops/instrumentation/ibm_watsonx_ai/instrumentor.py index 6899351e4..dee5d46ce 100644 --- a/agentops/instrumentation/ibm_watsonx_ai/instrumentor.py +++ b/agentops/instrumentation/ibm_watsonx_ai/instrumentor.py @@ -220,7 +220,7 @@ def instrumentation_dependencies(self) -> Collection[str]: Returns: A collection of package specifications required for this instrumentation. """ - return ["ibm-watson-machine-learning >= 1.0.0"] + return ["ibm-watsonx-ai >= 1.3.11"] def _instrument(self, **kwargs): """Instrument the IBM watsonx.ai API. From c2f2c212e4281942432c7a5874f610471750f14f Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Thu, 1 May 2025 10:21:47 +0530 Subject: [PATCH 4/6] Refactor IBM WatsonX AI instrumentation. --- .../ibm_watsonx_ai/__init__.py | 29 +- .../ibm_watsonx_ai/attributes/__init__.py | 23 +- .../ibm_watsonx_ai/attributes/attributes.py | 215 +++++++------- .../ibm_watsonx_ai/attributes/common.py | 58 +++- .../ibm_watsonx_ai/instrumentor.py | 223 +++----------- .../ibm_watsonx_ai/stream_wrapper.py | 280 ++++++++++++++++++ 6 files changed, 509 insertions(+), 319 deletions(-) create mode 100644 agentops/instrumentation/ibm_watsonx_ai/stream_wrapper.py diff --git a/agentops/instrumentation/ibm_watsonx_ai/__init__.py b/agentops/instrumentation/ibm_watsonx_ai/__init__.py index 34914182c..a1ab5f1f4 100644 --- a/agentops/instrumentation/ibm_watsonx_ai/__init__.py +++ b/agentops/instrumentation/ibm_watsonx_ai/__init__.py @@ -1,37 +1,32 @@ -"""IBM watsonx.ai API instrumentation. +"""IBM WatsonX AI instrumentation for AgentOps. -This module provides instrumentation for the IBM watsonx.ai API, -including text generation, embeddings, and model management. +This package provides instrumentation for IBM's WatsonX AI foundation models, +capturing telemetry for model interactions including completions, chat, and streaming responses. """ import logging from typing import Collection + logger = logging.getLogger(__name__) def get_version() -> str: - """Get the version of the IBM watsonx.ai SDK, or 'unknown' if not found - - Attempts to retrieve the installed version of the IBM watsonx.ai SDK using importlib.metadata. - Falls back to 'unknown' if the version cannot be determined. - - Returns: - The version string of the IBM watsonx.ai SDK or 'unknown' - """ + """Get the version of the IBM watsonx.ai SDK, or 'unknown' if not found.""" try: from importlib.metadata import version - return version("ibm-watson-machine-learning") + return version("ibm-watsonx-ai") except ImportError: - logger.debug("Could not find IBM watsonx.ai SDK version") - return "unknown" + logger.debug("Could not find IBM WatsonX AI SDK version") + return "1.3.11" # Default to known supported version if not found -LIBRARY_NAME = "ibm-watsonx-ai" -LIBRARY_VERSION: str = get_version() +# Library identification for instrumentation +LIBRARY_NAME = "ibm_watsonx_ai" +LIBRARY_VERSION = get_version() # Import after defining constants to avoid circular imports from agentops.instrumentation.ibm_watsonx_ai.instrumentor import IBMWatsonXInstrumentor # noqa: E402 __all__ = [ "LIBRARY_NAME", - "LIBRARY_VERSION", + "LIBRARY_VERSION", "IBMWatsonXInstrumentor", ] \ No newline at end of file diff --git a/agentops/instrumentation/ibm_watsonx_ai/attributes/__init__.py b/agentops/instrumentation/ibm_watsonx_ai/attributes/__init__.py index 263548dd0..29a938bf3 100644 --- a/agentops/instrumentation/ibm_watsonx_ai/attributes/__init__.py +++ b/agentops/instrumentation/ibm_watsonx_ai/attributes/__init__.py @@ -1,12 +1,27 @@ -"""Attributes for IBM watsonx.ai instrumentation. - -This module provides attribute extraction functions for IBM watsonx.ai operations. -""" +"""Attribute extraction utilities for IBM watsonx.ai instrumentation.""" from agentops.instrumentation.ibm_watsonx_ai.attributes.attributes import ( get_generate_attributes, + get_chat_attributes, + get_tokenize_attributes, + get_model_details_attributes +) +from agentops.instrumentation.ibm_watsonx_ai.attributes.common import ( + extract_params_attributes, + convert_params_to_dict, + extract_prompt_from_args, + extract_messages_from_args, + extract_params_from_args ) __all__ = [ "get_generate_attributes", + "get_chat_attributes", + "get_tokenize_attributes", + "get_model_details_attributes", + "extract_params_attributes", + "convert_params_to_dict", + "extract_prompt_from_args", + "extract_messages_from_args", + "extract_params_from_args" ] \ No newline at end of file diff --git a/agentops/instrumentation/ibm_watsonx_ai/attributes/attributes.py b/agentops/instrumentation/ibm_watsonx_ai/attributes/attributes.py index f306d1614..c733dd939 100644 --- a/agentops/instrumentation/ibm_watsonx_ai/attributes/attributes.py +++ b/agentops/instrumentation/ibm_watsonx_ai/attributes/attributes.py @@ -1,47 +1,36 @@ """Attributes for IBM watsonx.ai model instrumentation. -This module provides attribute extraction functions for IBM watsonx.ai model operations, -focusing on token usage recording. +This module provides attribute extraction functions for IBM watsonx.ai model operations. """ from typing import Any, Dict, Optional, Tuple from agentops.instrumentation.common.attributes import AttributeMap from agentops.semconv import SpanAttributes, MessageAttributes -from agentops.instrumentation.ibm_watsonx_ai.attributes.common import extract_params_attributes +from agentops.instrumentation.ibm_watsonx_ai.attributes.common import ( + extract_params_attributes, + convert_params_to_dict, + extract_prompt_from_args, + extract_messages_from_args, + extract_params_from_args +) +from ibm_watsonx_ai.foundation_models.schema import TextGenParameters, TextChatParameters def get_generate_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: - """Extract token usage attributes from generate method calls. - - Args: - args: Positional arguments passed to the method - kwargs: Keyword arguments passed to the method - return_value: Return value from the method - - Returns: - Dictionary of token usage attributes to set on the span - """ + """Extract token usage attributes from generate method calls.""" attributes = {} - # Extract prompt from args or kwargs - prompt = None - if args and len(args) > 0: - prompt = args[0] - elif kwargs and 'prompt' in kwargs: - prompt = kwargs['prompt'] - + # Extract prompt using helper function + prompt = extract_prompt_from_args(args, kwargs) if prompt: attributes[MessageAttributes.PROMPT_ROLE.format(i=0)] = "user" attributes[MessageAttributes.PROMPT_CONTENT.format(i=0)] = prompt attributes[MessageAttributes.PROMPT_TYPE.format(i=0)] = "text" - # Extract parameters from args or kwargs - params = None - if args and len(args) > 1: - params = args[1] - elif kwargs and 'params' in kwargs: - params = kwargs['params'] - + # Extract parameters using helper functions + params = extract_params_from_args(args, kwargs) if params: - attributes.update(extract_params_attributes(params)) + params_dict = convert_params_to_dict(params) + if params_dict: + attributes.update(extract_params_attributes(params_dict)) # Extract response information if return_value: @@ -73,25 +62,11 @@ def get_generate_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] return attributes def get_tokenize_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: - """Extract attributes from tokenize method calls. - - Args: - args: Positional arguments passed to the method - kwargs: Keyword arguments passed to the method - return_value: Return value from the method - - Returns: - Dictionary of attributes to set on the span - """ + """Extract attributes from tokenize method calls.""" attributes = {} - # Extract input from args or kwargs - prompt = None - if args and len(args) > 0: - prompt = args[0] - elif kwargs and "prompt" in kwargs: - prompt = kwargs["prompt"] - + # Extract input from args or kwargs using helper function + prompt = extract_prompt_from_args(args, kwargs) if prompt: attributes[MessageAttributes.PROMPT_ROLE.format(i=0)] = "user" attributes[MessageAttributes.PROMPT_CONTENT.format(i=0)] = prompt @@ -109,16 +84,7 @@ def get_tokenize_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] return attributes def get_model_details_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: - """Extract attributes from get_details method calls. - - Args: - args: Positional arguments passed to the method - kwargs: Keyword arguments passed to the method - return_value: Return value from the method - - Returns: - Dictionary of attributes to set on the span - """ + """Extract attributes from get_details method calls.""" if not isinstance(return_value, dict): return {} @@ -177,65 +143,102 @@ def get_model_details_attributes(args: Optional[Tuple] = None, kwargs: Optional[ return attributes -def get_generate_text_stream_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: - """Extract token usage attributes from generate_text_stream method calls. - - Args: - args: Positional arguments passed to the method - kwargs: Keyword arguments passed to the method - return_value: Return value from the method - - Returns: - Dictionary of token usage attributes to set on the span - """ +def get_chat_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None) -> AttributeMap: + """Extract attributes from chat method calls.""" attributes = {} - # Extract prompt from args or kwargs - prompt = None - if args and len(args) > 0: - prompt = args[0] - elif kwargs and 'prompt' in kwargs: - prompt = kwargs['prompt'] - - if prompt: - attributes[MessageAttributes.PROMPT_ROLE.format(i=0)] = "user" - attributes[MessageAttributes.PROMPT_CONTENT.format(i=0)] = prompt - attributes[MessageAttributes.PROMPT_TYPE.format(i=0)] = "text" + # Extract messages using helper function + messages = extract_messages_from_args(args, kwargs) + if messages: + # Process each message in the conversation + for i, message in enumerate(messages): + if not isinstance(message, dict): + continue + + # Extract role and content + role = message.get('role', '') + content = message.get('content', []) + + # Handle content which can be a list of different types (text, image_url) + if isinstance(content, list): + # Combine all text content + text_content = [] + image_urls = [] + + for content_item in content: + if isinstance(content_item, dict): + if content_item.get('type') == 'text': + text_content.append(content_item.get('text', '')) + elif content_item.get('type') == 'image_url': + image_url = content_item.get('image_url', {}) + if isinstance(image_url, dict) and 'url' in image_url: + url = image_url['url'] + # Only store URLs that start with http, otherwise use placeholder + if url and isinstance(url, str) and url.startswith(('http://', 'https://')): + image_urls.append(url) + else: + image_urls.append("[IMAGE_PLACEHOLDER]") + + # Set text content if any + if text_content: + attributes[MessageAttributes.PROMPT_CONTENT.format(i=i)] = ' '.join(text_content) + attributes[MessageAttributes.PROMPT_TYPE.format(i=i)] = "text" + attributes[MessageAttributes.PROMPT_ROLE.format(i=i)] = role + + # Set image URLs if any + if image_urls: + attributes[f"ibm.watsonx.chat.message.{i}.images"] = str(image_urls) + else: + # Handle string content + attributes[MessageAttributes.PROMPT_CONTENT.format(i=i)] = str(content) + attributes[MessageAttributes.PROMPT_TYPE.format(i=i)] = "text" + attributes[MessageAttributes.PROMPT_ROLE.format(i=i)] = role - # Extract parameters from args or kwargs - params = None - if args and len(args) > 1: - params = args[1] - elif kwargs and 'params' in kwargs: - params = kwargs['params'] - + # Extract parameters using helper functions + params = extract_params_from_args(args, kwargs) if params: - attributes.update(extract_params_attributes(params)) - - # For streaming responses, we'll update the attributes as we receive chunks + params_dict = convert_params_to_dict(params) + if params_dict: + attributes.update(extract_params_attributes(params_dict)) + + # Extract response information if return_value and isinstance(return_value, dict): # Extract model information if 'model_id' in return_value: attributes[SpanAttributes.LLM_REQUEST_MODEL] = return_value['model_id'] + elif 'model' in return_value: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = return_value['model'] - # Handle results - if 'results' in return_value: - for idx, result in enumerate(return_value['results']): - # Extract completion - if 'generated_text' in result: - attributes[MessageAttributes.COMPLETION_CONTENT.format(i=idx)] = result['generated_text'] - attributes[MessageAttributes.COMPLETION_ROLE.format(i=idx)] = "assistant" - attributes[MessageAttributes.COMPLETION_TYPE.format(i=idx)] = "text" - - # Extract token usage - if 'input_token_count' in result: - attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = result['input_token_count'] - if 'generated_token_count' in result: - attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = result['generated_token_count'] - if 'input_token_count' in result and 'generated_token_count' in result: - attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = result['input_token_count'] + result['generated_token_count'] - - if 'stop_reason' in result: - attributes[SpanAttributes.LLM_RESPONSE_STOP_REASON] = result['stop_reason'] + # Extract completion from choices + if 'choices' in return_value: + for idx, choice in enumerate(return_value['choices']): + if isinstance(choice, dict) and 'message' in choice: + message = choice['message'] + if isinstance(message, dict): + if 'content' in message: + attributes[MessageAttributes.COMPLETION_CONTENT.format(i=idx)] = message['content'] + attributes[MessageAttributes.COMPLETION_ROLE.format(i=idx)] = message.get('role', 'assistant') + attributes[MessageAttributes.COMPLETION_TYPE.format(i=idx)] = "text" + if 'finish_reason' in choice: + attributes[SpanAttributes.LLM_RESPONSE_STOP_REASON] = choice['finish_reason'] + + # Extract token usage + if 'usage' in return_value: + usage = return_value['usage'] + if isinstance(usage, dict): + if 'prompt_tokens' in usage: + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = usage['prompt_tokens'] + if 'completion_tokens' in usage: + attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = usage['completion_tokens'] + if 'total_tokens' in usage: + attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = usage['total_tokens'] + + # Extract additional metadata + if 'id' in return_value: + attributes['ibm.watsonx.chat.id'] = return_value['id'] + if 'model_version' in return_value: + attributes['ibm.watsonx.model.version'] = return_value['model_version'] + if 'created_at' in return_value: + attributes['ibm.watsonx.chat.created_at'] = return_value['created_at'] return attributes \ No newline at end of file diff --git a/agentops/instrumentation/ibm_watsonx_ai/attributes/common.py b/agentops/instrumentation/ibm_watsonx_ai/attributes/common.py index 9fa9cdf6e..ea8269785 100644 --- a/agentops/instrumentation/ibm_watsonx_ai/attributes/common.py +++ b/agentops/instrumentation/ibm_watsonx_ai/attributes/common.py @@ -1,12 +1,13 @@ """Common utilities and constants for IBM watsonx.ai attribute processing. This module contains shared constants, attribute mappings, and utility functions for processing -trace and span attributes in IBM watsonx.ai instrumentation. It provides the core functionality -for extracting and formatting attributes according to OpenTelemetry semantic conventions. +trace and span attributes in IBM watsonx.ai instrumentation. """ -from typing import Any, Dict +from typing import Any, Dict, Optional, Tuple, List from agentops.instrumentation.common.attributes import AttributeMap -from agentops.semconv import SpanAttributes +from agentops.semconv import SpanAttributes, MessageAttributes +from agentops.logging import logger +from ibm_watsonx_ai.foundation_models.schema import TextGenParameters, TextChatParameters # Mapping of generation parameters to their OpenTelemetry attribute names GENERATION_PARAM_ATTRIBUTES: AttributeMap = { @@ -30,22 +31,52 @@ 'guardrails_pii_params': 'ibm.watsonx.guardrails.pii_params', } -def extract_params_attributes(params: Dict[str, Any]) -> AttributeMap: - """Extract generation parameters from a params dictionary. - - Args: - params: Dictionary of generation parameters +def extract_prompt_from_args(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None) -> Optional[str]: + """Extract prompt from method arguments.""" + if args and len(args) > 0: + return args[0] + elif kwargs and 'prompt' in kwargs: + return kwargs['prompt'] + return None + +def extract_messages_from_args(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None) -> Optional[List[Dict[str, Any]]]: + """Extract messages from method arguments.""" + if args and len(args) > 0: + return args[0] + elif kwargs and 'messages' in kwargs: + return kwargs['messages'] + return None + +def extract_params_from_args(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None) -> Optional[Any]: + """Extract parameters from method arguments.""" + if args and len(args) > 1: + return args[1] + elif kwargs and 'params' in kwargs: + return kwargs['params'] + return None + +def convert_params_to_dict(params: Any) -> Dict[str, Any]: + """Convert parameter objects to dictionaries.""" + if not params: + return {} - Returns: - Dictionary of attributes to set on the span - """ + if isinstance(params, (TextGenParameters, TextChatParameters)): + try: + return params.to_dict() + except Exception as e: + logger.debug(f"Could not convert params object to dict: {e}") + return {} + + return params if isinstance(params, dict) else {} + +def extract_params_attributes(params: Dict[str, Any]) -> AttributeMap: + """Extract generation parameters from a params dictionary.""" attributes = {} # Extract standard generation parameters for param_name, attr_name in GENERATION_PARAM_ATTRIBUTES.items(): if param_name in params: value = params[param_name] - # Convert lists to strings for attributes if isinstance(value, list): value = str(value) attributes[attr_name] = value @@ -54,7 +85,6 @@ def extract_params_attributes(params: Dict[str, Any]) -> AttributeMap: for param_name, attr_name in GUARDRAIL_PARAM_ATTRIBUTES.items(): if param_name in params: value = params[param_name] - # Convert dicts to strings for attributes if isinstance(value, dict): value = str(value) attributes[attr_name] = value diff --git a/agentops/instrumentation/ibm_watsonx_ai/instrumentor.py b/agentops/instrumentation/ibm_watsonx_ai/instrumentor.py index dee5d46ce..4885e2b87 100644 --- a/agentops/instrumentation/ibm_watsonx_ai/instrumentor.py +++ b/agentops/instrumentation/ibm_watsonx_ai/instrumentor.py @@ -3,13 +3,16 @@ This module provides instrumentation for the IBM watsonx.ai API, implementing OpenTelemetry instrumentation for model requests and responses. -We focus on instrumenting the following key endpoints: +Key endpoints instrumented: - Model.generate - Text generation API +- Model.generate_text_stream - Streaming text generation API +- Model.chat - Chat completion API +- Model.chat_stream - Streaming chat completion API - Model.tokenize - Tokenization API - Model.get_details - Model details API """ from typing import List, Optional, Collection -from opentelemetry.trace import get_tracer, SpanKind +from opentelemetry.trace import get_tracer from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.metrics import get_meter from wrapt import wrap_function_wrapper @@ -21,19 +24,16 @@ get_generate_attributes, get_tokenize_attributes, get_model_details_attributes, - get_generate_text_stream_attributes, + get_chat_attributes, ) -from agentops.semconv import ( - SpanAttributes, - Meters, - LLMRequestTypeValues, - CoreAttributes, - MessageAttributes +from agentops.instrumentation.ibm_watsonx_ai.stream_wrapper import ( + generate_text_stream_wrapper, + chat_stream_wrapper ) +from agentops.semconv import Meters # Methods to wrap for instrumentation WRAPPED_METHODS: List[WrapConfig] = [ - # Core generation methods WrapConfig( trace_name="watsonx.generate", package="ibm_watsonx_ai.foundation_models.inference", @@ -41,33 +41,26 @@ method_name="generate", handler=get_generate_attributes, ), - WrapConfig( - trace_name="watsonx.generate_text", - package="ibm_watsonx_ai.foundation_models.inference", - class_name="ModelInference", - method_name="generate_text", - handler=get_generate_attributes, - ), WrapConfig( trace_name="watsonx.generate_text_stream", package="ibm_watsonx_ai.foundation_models.inference", class_name="ModelInference", method_name="generate_text_stream", - handler=get_generate_text_stream_attributes, + handler=None, ), WrapConfig( trace_name="watsonx.chat", package="ibm_watsonx_ai.foundation_models.inference", class_name="ModelInference", method_name="chat", - handler=get_generate_attributes, + handler=get_chat_attributes, ), WrapConfig( trace_name="watsonx.chat_stream", package="ibm_watsonx_ai.foundation_models.inference", class_name="ModelInference", method_name="chat_stream", - handler=get_generate_text_stream_attributes, + handler=None, ), WrapConfig( trace_name="watsonx.tokenize", @@ -85,153 +78,15 @@ ), ] -class TracedStream: - """A wrapper for IBM watsonx.ai's streaming response that adds telemetry. - - This class wraps the original stream to capture metrics about the streaming process, - including token counts, content, and errors. - """ - - def __init__(self, original_stream, span): - """Initialize with the original stream and span. - - Args: - original_stream: The IBM watsonx.ai stream to wrap - span: The OpenTelemetry span to record metrics on - """ - self.original_stream = original_stream - self.span = span - self.completion_content = "" - self.input_tokens = 0 - self.output_tokens = 0 - - def __iter__(self): - """Iterate through chunks, tracking tokens and content. - - Yields: - Chunks from the original stream - """ - try: - for chunk in self.original_stream: - try: - if isinstance(chunk, dict) and 'results' in chunk: - for result in chunk['results']: - if 'generated_text' in result: - self.completion_content += result['generated_text'] - - if 'input_token_count' in result: - self.input_tokens = result['input_token_count'] - - if 'generated_token_count' in result: - self.output_tokens = result['generated_token_count'] - - # Update span attributes with current token counts - self.span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, self.input_tokens) - self.span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, self.output_tokens) - self.span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, self.input_tokens + self.output_tokens) - - # Update completion content - if self.completion_content: - self.span.set_attribute(MessageAttributes.COMPLETION_TYPE.format(i=0), "text") - self.span.set_attribute(MessageAttributes.COMPLETION_ROLE.format(i=0), "assistant") - self.span.set_attribute(MessageAttributes.COMPLETION_CONTENT.format(i=0), self.completion_content) - except Exception as e: - logger.debug(f"Error processing stream chunk: {e}") - - yield chunk - finally: - # End the span when the stream is exhausted - if self.span.is_recording(): - self.span.end() - -def generate_text_stream_wrapper(wrapped, instance, args, kwargs): - """Wrapper for the Model.generate_text_stream method. - - This wrapper creates spans for tracking stream performance and injects - a stream wrapper to capture streaming events. - - Args: - wrapped: The original stream method - instance: The instance the method is bound to - args: Positional arguments to the method - kwargs: Keyword arguments to the method - - Returns: - A wrapped stream that captures telemetry data - """ - tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION) - span = tracer.start_span( - "watsonx.generate_text_stream", - kind=SpanKind.CLIENT, - attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value}, - ) - - # Extract prompt and parameters - prompt = None - if args and len(args) > 0: - prompt = args[0] - elif kwargs and 'prompt' in kwargs: - prompt = kwargs['prompt'] - - if prompt: - span.set_attribute(MessageAttributes.PROMPT_ROLE.format(i=0), "user") - span.set_attribute(MessageAttributes.PROMPT_CONTENT.format(i=0), prompt) - span.set_attribute(MessageAttributes.PROMPT_TYPE.format(i=0), "text") - - # Extract parameters from args or kwargs - params = None - if args and len(args) > 1: - params = args[1] - elif kwargs and 'params' in kwargs: - params = kwargs['params'] - - if params: - # Use common attribute extraction - from agentops.instrumentation.ibm_watsonx_ai.attributes.common import extract_params_attributes - span_attributes = extract_params_attributes(params) - for key, value in span_attributes.items(): - span.set_attribute(key, value) - - span.set_attribute(SpanAttributes.LLM_REQUEST_STREAMING, True) - - try: - stream = wrapped(*args, **kwargs) - return TracedStream(stream, span) - except Exception as e: - span.record_exception(e) - span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(e)) - span.set_attribute(CoreAttributes.ERROR_TYPE, e.__class__.__name__) - span.end() - raise - class IBMWatsonXInstrumentor(BaseInstrumentor): - """An instrumentor for IBM watsonx.ai API. - - This class provides instrumentation for IBM's watsonx.ai API by wrapping key methods - in the client library and capturing telemetry data. It supports both synchronous and - asynchronous API calls. - - It captures metrics including token usage, operation duration, and exceptions. - """ + """An instrumentor for IBM watsonx.ai API.""" def instrumentation_dependencies(self) -> Collection[str]: - """Return packages required for instrumentation. - - Returns: - A collection of package specifications required for this instrumentation. - """ + """Return packages required for instrumentation.""" return ["ibm-watsonx-ai >= 1.3.11"] def _instrument(self, **kwargs): - """Instrument the IBM watsonx.ai API. - - This method wraps the key methods in the IBM watsonx.ai client to capture - telemetry data for API calls. It sets up tracers, meters, and wraps the appropriate - methods for instrumentation. - - Args: - **kwargs: Configuration options for instrumentation. - """ + """Instrument the IBM watsonx.ai API.""" tracer_provider = kwargs.get("tracer_provider") tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION, tracer_provider) @@ -259,27 +114,39 @@ def _instrument(self, **kwargs): # Standard method wrapping approach for regular methods for wrap_config in WRAPPED_METHODS: try: - if wrap_config.method_name == "generate_text_stream": - wrap_function_wrapper( - wrap_config.package, - f"{wrap_config.class_name}.{wrap_config.method_name}", - generate_text_stream_wrapper, - ) - else: - wrap(wrap_config, tracer) + # Skip stream methods handled by dedicated wrappers + if wrap_config.method_name in ["generate_text_stream", "chat_stream"]: + continue + wrap(wrap_config, tracer) logger.debug(f"Wrapped {wrap_config.package}.{wrap_config.class_name}.{wrap_config.method_name}") except (AttributeError, ModuleNotFoundError) as e: logger.debug(f"Could not wrap {wrap_config.package}.{wrap_config.class_name}.{wrap_config.method_name}: {e}") - - def _uninstrument(self, **kwargs): - """Remove instrumentation from IBM watsonx.ai API. - This method unwraps all methods that were wrapped during instrumentation, - restoring the original behavior of the IBM watsonx.ai API. - - Args: - **kwargs: Configuration options for uninstrumentation. - """ + # Dedicated wrappers for stream methods + try: + generate_text_stream_config = next(wc for wc in WRAPPED_METHODS if wc.method_name == "generate_text_stream") + wrap_function_wrapper( + generate_text_stream_config.package, + f"{generate_text_stream_config.class_name}.{generate_text_stream_config.method_name}", + generate_text_stream_wrapper, + ) + logger.debug(f"Wrapped {generate_text_stream_config.package}.{generate_text_stream_config.class_name}.{generate_text_stream_config.method_name} with dedicated wrapper") + except (StopIteration, AttributeError, ModuleNotFoundError) as e: + logger.debug(f"Could not wrap generate_text_stream with dedicated wrapper: {e}") + + try: + chat_stream_config = next(wc for wc in WRAPPED_METHODS if wc.method_name == "chat_stream") + wrap_function_wrapper( + chat_stream_config.package, + f"{chat_stream_config.class_name}.{chat_stream_config.method_name}", + chat_stream_wrapper, + ) + logger.debug(f"Wrapped {chat_stream_config.package}.{chat_stream_config.class_name}.{chat_stream_config.method_name} with dedicated wrapper") + except (StopIteration, AttributeError, ModuleNotFoundError) as e: + logger.debug(f"Could not wrap chat_stream with dedicated wrapper: {e}") + + def _uninstrument(self, **kwargs): + """Remove instrumentation from IBM watsonx.ai API.""" # Unwrap standard methods for wrap_config in WRAPPED_METHODS: try: diff --git a/agentops/instrumentation/ibm_watsonx_ai/stream_wrapper.py b/agentops/instrumentation/ibm_watsonx_ai/stream_wrapper.py new file mode 100644 index 000000000..382eeff48 --- /dev/null +++ b/agentops/instrumentation/ibm_watsonx_ai/stream_wrapper.py @@ -0,0 +1,280 @@ +"""Stream wrappers for IBM watsonx.ai. + +This module provides stream wrapper classes and functions for IBM watsonx.ai's streaming +responses, implementing telemetry tracking for streaming content. +""" +import json +from opentelemetry.trace import get_tracer, SpanKind +from agentops.logging import logger +from agentops.instrumentation.ibm_watsonx_ai import LIBRARY_NAME, LIBRARY_VERSION +from agentops.instrumentation.ibm_watsonx_ai.attributes.common import ( + extract_params_attributes, + convert_params_to_dict, + extract_prompt_from_args, + extract_messages_from_args, + extract_params_from_args +) +from agentops.semconv import ( + SpanAttributes, + LLMRequestTypeValues, + CoreAttributes, + MessageAttributes +) + +class TracedStream: + """A wrapper for IBM watsonx.ai's streaming response that adds telemetry.""" + + def __init__(self, original_stream, span): + """Initialize with the original stream and span.""" + self.original_stream = original_stream + self.span = span + self.completion_content = "" + self.input_tokens = 0 + self.output_tokens = 0 + self.model_id = None + + def __iter__(self): + """Iterate through chunks, tracking content and attempting to extract token data.""" + try: + for yielded_chunk in self.original_stream: + # Initialize data for this chunk + generated_text_chunk = "" + input_token_chunk = 0 + output_token_chunk = 0 + model_id_chunk = None + + try: + # Attempt to access internal frame local variable 'chunk' for full data + internal_chunk_data_str = getattr(self.original_stream, 'gi_frame', {}).f_locals.get('chunk') + + if isinstance(internal_chunk_data_str, str) and internal_chunk_data_str.startswith('data: '): + try: + # Remove 'data: ' prefix and parse JSON + json_payload_str = internal_chunk_data_str[len('data: '):] + json_payload = json.loads(json_payload_str) + + # Determine if it's generate_text_stream or chat_stream structure + if 'results' in json_payload: # Likely generate_text_stream + model_id_chunk = json_payload.get('model_id') + if isinstance(json_payload['results'], list): + for result in json_payload['results']: + if isinstance(result, dict): + # Use yielded_chunk for generated_text as internal one might be partial + if isinstance(yielded_chunk, str): + generated_text_chunk = yielded_chunk + # Use the first non-zero input token count found + if self.input_tokens == 0 and result.get('input_token_count', 0) > 0: + self.input_tokens = result.get('input_token_count', 0) + input_token_chunk = self.input_tokens + # Accumulate output tokens + self.output_tokens += result.get('generated_token_count', 0) + output_token_chunk = result.get('generated_token_count', 0) + + elif 'choices' in json_payload: # Likely chat_stream + # model_id might be at top level or within choices in other APIs, check top first + model_id_chunk = json_payload.get('model_id') or json_payload.get('model') + if isinstance(json_payload['choices'], list) and json_payload['choices']: + choice = json_payload['choices'][0] + if isinstance(choice, dict): + delta = choice.get('delta', {}) + if isinstance(delta, dict): + generated_text_chunk = delta.get('content', '') + + # Check for finish reason to potentially get final usage + finish_reason = choice.get('finish_reason') + if finish_reason == 'stop': + try: + final_response_data = getattr(self.original_stream, 'gi_frame', {}).f_locals.get('parsed_response') + if isinstance(final_response_data, dict) and 'usage' in final_response_data: + usage = final_response_data['usage'] + if isinstance(usage, dict): + # Update token counts with final values + self.input_tokens = usage.get('prompt_tokens', self.input_tokens) + self.output_tokens = usage.get('completion_tokens', self.output_tokens) + # Update span immediately with final counts + if self.input_tokens is not None: + self.span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, self.input_tokens) + if self.output_tokens is not None: + self.span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, self.output_tokens) + if self.input_tokens is not None and self.output_tokens is not None: + self.span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, self.input_tokens + self.output_tokens) + + except AttributeError as final_attr_err: + logger.debug(f"Could not access internal generator state for final response: {final_attr_err}") + except Exception as final_err: + logger.debug(f"Error accessing or processing final response data: {final_err}") + + except json.JSONDecodeError as json_err: + logger.debug(f"Failed to parse JSON from internal chunk data: {json_err}") + # Fallback to using the yielded chunk directly + if isinstance(yielded_chunk, dict): # chat_stream yields dicts + if 'choices' in yielded_chunk and yielded_chunk['choices']: + delta = yielded_chunk['choices'][0].get('delta', {}) + generated_text_chunk = delta.get('content', '') + elif isinstance(yielded_chunk, str): # generate_text_stream yields strings + generated_text_chunk = yielded_chunk + except Exception as parse_err: + logger.debug(f"Error processing internal chunk data: {parse_err}") + if isinstance(yielded_chunk, dict): # Fallback for chat + if 'choices' in yielded_chunk and yielded_chunk['choices']: + delta = yielded_chunk['choices'][0].get('delta', {}) + generated_text_chunk = delta.get('content', '') + elif isinstance(yielded_chunk, str): # Fallback for generate + generated_text_chunk = yielded_chunk + else: + # If internal data not found or not in expected format, use yielded chunk + if isinstance(yielded_chunk, dict): # chat_stream yields dicts + if 'choices' in yielded_chunk and yielded_chunk['choices']: + delta = yielded_chunk['choices'][0].get('delta', {}) + generated_text_chunk = delta.get('content', '') + elif isinstance(yielded_chunk, str): # generate_text_stream yields strings + generated_text_chunk = yielded_chunk + + except AttributeError as attr_err: + logger.debug(f"Could not access internal generator state (gi_frame.f_locals): {attr_err}") + if isinstance(yielded_chunk, dict): # Fallback for chat + if 'choices' in yielded_chunk and yielded_chunk['choices']: + delta = yielded_chunk['choices'][0].get('delta', {}) + generated_text_chunk = delta.get('content', '') + elif isinstance(yielded_chunk, str): # Fallback for generate + generated_text_chunk = yielded_chunk + except Exception as e: + logger.debug(f"Error accessing or processing internal generator state: {e}") + if isinstance(yielded_chunk, dict): # Fallback for chat + if 'choices' in yielded_chunk and yielded_chunk['choices']: + delta = yielded_chunk['choices'][0].get('delta', {}) + generated_text_chunk = delta.get('content', '') + elif isinstance(yielded_chunk, str): # Fallback for generate + generated_text_chunk = yielded_chunk + + # Accumulate completion content regardless of where it came from + self.completion_content += generated_text_chunk + + # Update span attributes within the loop if data is available + if model_id_chunk and not self.model_id: + self.model_id = model_id_chunk + self.span.set_attribute(SpanAttributes.LLM_REQUEST_MODEL, self.model_id) + + if self.input_tokens is not None: + self.span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, self.input_tokens) + if self.output_tokens is not None: + self.span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, self.output_tokens) + if self.input_tokens is not None and self.output_tokens is not None: + self.span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, self.input_tokens + self.output_tokens) + + # Yield the original chunk that the user expects + yield yielded_chunk + finally: + # Update final completion content attribute after stream finishes + if self.completion_content: + self.span.set_attribute(MessageAttributes.COMPLETION_TYPE.format(i=0), "text") + self.span.set_attribute(MessageAttributes.COMPLETION_ROLE.format(i=0), "assistant") + self.span.set_attribute(MessageAttributes.COMPLETION_CONTENT.format(i=0), self.completion_content) + + # Final update for token counts + if self.input_tokens is not None: + self.span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, self.input_tokens) + if self.output_tokens is not None: + self.span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, self.output_tokens) + if self.input_tokens is not None and self.output_tokens is not None: + self.span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, self.input_tokens + self.output_tokens) + + # End the span when the stream is exhausted + if self.span.is_recording(): + self.span.end() + +def generate_text_stream_wrapper(wrapped, instance, args, kwargs): + """Wrapper for the Model.generate_text_stream method.""" + tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION) + span = tracer.start_span( + "watsonx.generate_text_stream", + kind=SpanKind.CLIENT, + attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value}, + ) + + # Extract prompt using helper function + prompt = extract_prompt_from_args(args, kwargs) + if prompt: + span.set_attribute(MessageAttributes.PROMPT_ROLE.format(i=0), "user") + span.set_attribute(MessageAttributes.PROMPT_CONTENT.format(i=0), prompt) + span.set_attribute(MessageAttributes.PROMPT_TYPE.format(i=0), "text") + + # Extract parameters using helper function + params = extract_params_from_args(args, kwargs) + if params: + params_dict = convert_params_to_dict(params) + if params_dict: + try: + span_attributes = extract_params_attributes(params_dict) + for key, value in span_attributes.items(): + span.set_attribute(key, value) + except Exception as e: + logger.debug(f"Error extracting attributes from params dict: {e}") + + span.set_attribute(SpanAttributes.LLM_REQUEST_STREAMING, True) + + try: + stream = wrapped(*args, **kwargs) + return TracedStream(stream, span) + except Exception as e: + span.record_exception(e) + span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(e)) + span.set_attribute(CoreAttributes.ERROR_TYPE, e.__class__.__name__) + span.end() + raise + +def chat_stream_wrapper(wrapped, instance, args, kwargs): + """Wrapper for the Model.chat_stream method.""" + tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION) + span = tracer.start_span( + "watsonx.chat_stream", + kind=SpanKind.CLIENT, + attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value}, + ) + + # Extract messages using helper function + messages = extract_messages_from_args(args, kwargs) + if messages and isinstance(messages, list): + for i, message in enumerate(messages): + if isinstance(message, dict): + role = message.get('role') + content = message.get('content') + # Handle complex content (list of dicts) vs simple string + if isinstance(content, list): + text_content = [] + for item in content: + if isinstance(item, dict) and item.get('type') == 'text': + text_content.append(item.get('text', '')) + content_str = ' '.join(text_content) + else: + content_str = str(content) + + if role: + span.set_attribute(MessageAttributes.PROMPT_ROLE.format(i=i), role) + if content_str: + span.set_attribute(MessageAttributes.PROMPT_CONTENT.format(i=i), content_str) + span.set_attribute(MessageAttributes.PROMPT_TYPE.format(i=i), "text") + + # Extract parameters using helper function + params = extract_params_from_args(args, kwargs) + if params: + params_dict = convert_params_to_dict(params) + if params_dict: + try: + span_attributes = extract_params_attributes(params_dict) + for key, value in span_attributes.items(): + span.set_attribute(key, value) + except Exception as e: + logger.debug(f"Error extracting attributes from params dict: {e}") + + span.set_attribute(SpanAttributes.LLM_REQUEST_STREAMING, True) + + try: + stream = wrapped(*args, **kwargs) + return TracedStream(stream, span) + except Exception as e: + span.record_exception(e) + span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(e)) + span.set_attribute(CoreAttributes.ERROR_TYPE, e.__class__.__name__) + span.end() + raise \ No newline at end of file From e22f1d90a2bf7f1abb5eb0025bacbaeba28c9a77 Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Sat, 3 May 2025 03:30:46 +0530 Subject: [PATCH 5/6] Fix --- agentops/instrumentation/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index b627d6aa3..02c3ea45e 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -82,6 +82,7 @@ def get_instance(self) -> BaseInstrumentor: class_name="IBMWatsonXInstrumentor", provider_import_name="ibm_watsonx_ai", ), + InstrumentorLoader( module_name="agentops.instrumentation.ag2", class_name="AG2Instrumentor", provider_import_name="autogen", From 64a50eaa4b961a81a2751bbef4ee1f97a95375c5 Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Sat, 3 May 2025 05:01:45 +0530 Subject: [PATCH 6/6] Added Exmaples --- examples/watsonx_examples/README.md | 65 +++++ .../watsonx_examples/watsonx-streaming.ipynb | 250 +++++++++++++++++ .../watsonx_examples/watsonx-text-chat.ipynb | 195 +++++++++++++ .../watsonx-tokeniation-model.ipynb | 256 ++++++++++++++++++ 4 files changed, 766 insertions(+) create mode 100644 examples/watsonx_examples/README.md create mode 100644 examples/watsonx_examples/watsonx-streaming.ipynb create mode 100644 examples/watsonx_examples/watsonx-text-chat.ipynb create mode 100644 examples/watsonx_examples/watsonx-tokeniation-model.ipynb diff --git a/examples/watsonx_examples/README.md b/examples/watsonx_examples/README.md new file mode 100644 index 000000000..85749981f --- /dev/null +++ b/examples/watsonx_examples/README.md @@ -0,0 +1,65 @@ +# IBM Watson.x AI Examples with AgentOps + +This directory contains examples of using IBM Watson.x AI with AgentOps instrumentation for various natural language processing tasks. + +## Prerequisites + +- IBM Watson.x AI account with API key +- Python 3.10+ +- Install required dependencies: + ``` + pip install agentops ibm-watsonx-ai python-dotenv + ``` + +## Environment Setup + +Create a `.env` file in your project root with the following values: + +``` +WATSONX_URL=https://your-region.ml.cloud.ibm.com +WATSONX_API_KEY=your-api-key-here +WATSONX_PROJECT_ID=your-project-id-here +``` + +## Examples + +### 1. Basic Text Generation and Chat Completion + +File: `watsonx-example-text-and-chat.ipynb` + +This notebook demonstrates: +- Basic text generation with IBM Watson.x AI +- Chat completion with system and user messages +- Multiple examples of chat interactions + +### 2. Streaming Generation + +File: `watsonx-example-streaming.ipynb` + +This notebook demonstrates: +- Streaming text generation +- Streaming chat completion +- Processing streaming responses + +### 3. Tokenization and Model Details + +File: `watsonx-example-tokenization-model.ipynb` + +This notebook demonstrates: +- Tokenizing text with IBM Watson.x AI models +- Retrieving model details +- Comparing tokenization between different models + +## IBM Watson.x AI Models + +The examples use the following IBM Watson.x AI models: +- `google/flan-ul2`: A text generation model +- `meta-llama/llama-3-3-70b-instruct`: A chat completion model + +You can explore other available models through the IBM Watson.x platform. + +## AgentOps Integration + +These examples show how to use AgentOps to monitor and analyze your AI applications. AgentOps automatically instruments your IBM Watson.x AI calls to provide insights into performance, usage patterns, and model behavior. + +To learn more about AgentOps, visit [https://www.agentops.ai](https://www.agentops.ai) \ No newline at end of file diff --git a/examples/watsonx_examples/watsonx-streaming.ipynb b/examples/watsonx_examples/watsonx-streaming.ipynb new file mode 100644 index 000000000..8f6d2bf70 --- /dev/null +++ b/examples/watsonx_examples/watsonx-streaming.ipynb @@ -0,0 +1,250 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# IBM Watson.x AI Streaming with AgentOps\n", + "\n", + "This notebook demonstrates how to use IBM Watson.x AI for streaming text generation and streaming chat completion with AgentOps instrumentation." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup\n", + "\n", + "First, let's import the necessary libraries and initialize AgentOps:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import agentops\n", + "from ibm_watsonx_ai import Credentials\n", + "from ibm_watsonx_ai.foundation_models import ModelInference\n", + "from dotenv import load_dotenv\n", + "import os\n", + "\n", + "# Load environment variables\n", + "load_dotenv()\n", + "\n", + "# Initialize AgentOps\n", + "agentops.init()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Initialize IBM Watson.x AI Credentials\n", + "\n", + "To use IBM Watson.x AI, you need to set up your credentials and project ID." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "# Initialize credentials - replace with your own API key\n", + "# Best practice: Store API keys in environment variables\n", + "credentials = Credentials(\n", + " url=os.getenv(\"WATSONX_URL\", \"https://eu-de.ml.cloud.ibm.com\"),\n", + " api_key=os.getenv(\"WATSONX_API_KEY\", \"your-api-key-here\"),\n", + ")\n", + "\n", + "# Project ID for your IBM Watson.x project\n", + "project_id = os.getenv(\"WATSONX_PROJECT_ID\", \"your-project-id-here\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Initialize Models\n", + "\n", + "Let's initialize models for our streaming examples:" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "# Initialize text generation model\n", + "gen_model = ModelInference(\n", + " model_id=\"google/flan-ul2\",\n", + " credentials=credentials,\n", + " project_id=project_id\n", + ")\n", + "\n", + "# Initialize chat model\n", + "chat_model = ModelInference(\n", + " model_id=\"meta-llama/llama-3-3-70b-instruct\",\n", + " credentials=credentials,\n", + " project_id=project_id\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Streaming Text Generation\n", + "\n", + "Let's use IBM Watson.x AI to generate streaming text:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Streaming text generation\n", + "prompt = \"List 3 benefits of machine learning:\"\n", + "stream_response = gen_model.generate_text_stream(prompt)\n", + "\n", + "print(\"Streaming Response:\")\n", + "full_stream_response = \"\"\n", + "for chunk in stream_response:\n", + " if isinstance(chunk, str):\n", + " print(chunk, end=\"\", flush=True)\n", + " full_stream_response += chunk\n", + "print(\"\\n\\nComplete Response:\")\n", + "print(full_stream_response)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Streaming Chat Completion\n", + "\n", + "Now, let's try streaming chat completion:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Format messages for chat\n", + "chat_stream_messages = [\n", + " {\"role\": \"system\", \"content\": \"You are a concise assistant.\"},\n", + " {\"role\": \"user\", \"content\": \"Explain the concept of photosynthesis in one sentence.\"}\n", + "]\n", + "\n", + "# Get streaming chat response\n", + "chat_stream_response_gen = chat_model.chat_stream(messages=chat_stream_messages)\n", + "\n", + "print(\"Chat Stream Response:\")\n", + "full_chat_stream_response = \"\"\n", + "for chunk in chat_stream_response_gen:\n", + " try:\n", + " # Check structure based on SDK docstring example\n", + " if chunk and 'choices' in chunk and chunk['choices']:\n", + " delta = chunk['choices'][0].get('delta', {})\n", + " content_chunk = delta.get('content')\n", + " if content_chunk:\n", + " print(content_chunk, end=\"\", flush=True)\n", + " full_chat_stream_response += content_chunk\n", + " except Exception as e:\n", + " print(f\"Error processing chat stream chunk: {e}, Chunk: {chunk}\")\n", + "\n", + "print(\"\\n\\nComplete Chat Response:\")\n", + "print(full_chat_stream_response)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Another Streaming Chat Example\n", + "\n", + "Let's try another example with a more complex query:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# New chat messages for streaming\n", + "chat_stream_messages = [\n", + " {\"role\": \"system\", \"content\": \"You are a helpful assistant that provides step-by-step explanations.\"},\n", + " {\"role\": \"user\", \"content\": \"Explain how to make a simple chocolate cake.\"}\n", + "]\n", + "\n", + "# Get streaming chat response\n", + "chat_stream_response_gen = chat_model.chat_stream(messages=chat_stream_messages)\n", + "\n", + "print(\"Chat Stream Response:\")\n", + "full_chat_stream_response = \"\"\n", + "for chunk in chat_stream_response_gen:\n", + " try:\n", + " if chunk and 'choices' in chunk and chunk['choices']:\n", + " delta = chunk['choices'][0].get('delta', {})\n", + " content_chunk = delta.get('content')\n", + " if content_chunk:\n", + " print(content_chunk, end=\"\", flush=True)\n", + " full_chat_stream_response += content_chunk\n", + " except Exception as e:\n", + " print(f\"Error processing chat stream chunk: {e}, Chunk: {chunk}\")\n", + "\n", + "print(\"\\n\\nComplete Chat Response:\")\n", + "print(full_chat_stream_response)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Clean Up\n", + "\n", + "Finally, let's close the persistent connection with the models:" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "# Close connections\n", + "gen_model.close_persistent_connection()\n", + "chat_model.close_persistent_connection()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.12" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/examples/watsonx_examples/watsonx-text-chat.ipynb b/examples/watsonx_examples/watsonx-text-chat.ipynb new file mode 100644 index 000000000..665dd5ec6 --- /dev/null +++ b/examples/watsonx_examples/watsonx-text-chat.ipynb @@ -0,0 +1,195 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# IBM Watson.x AI Text Generation and Chat with AgentOps\n", + "\n", + "This notebook demonstrates how to use IBM Watson.x AI for basic text generation and chat completion tasks with AgentOps instrumentation." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup\n", + "\n", + "First, let's import the necessary libraries and initialize AgentOps:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import agentops\n", + "from ibm_watsonx_ai import Credentials\n", + "from ibm_watsonx_ai.foundation_models import ModelInference\n", + "from dotenv import load_dotenv\n", + "import os\n", + "\n", + "# Load environment variables\n", + "load_dotenv()\n", + "\n", + "# Initialize AgentOps\n", + "agentops.init()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Initialize IBM Watson.x AI Credentials\n", + "\n", + "To use IBM Watson.x AI, you need to set up your credentials and project ID." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "# Initialize credentials - replace with your own API key\n", + "# Best practice: Store API keys in environment variables\n", + "credentials = Credentials(\n", + " url=os.getenv(\"WATSONX_URL\", \"https://eu-de.ml.cloud.ibm.com\"),\n", + " api_key=os.getenv(\"WATSONX_API_KEY\", \"your-api-key-here\"),\n", + ")\n", + "\n", + "# Project ID for your IBM Watson.x project\n", + "project_id = os.getenv(\"WATSONX_PROJECT_ID\", \"your-project-id-here\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Text Generation\n", + "\n", + "Let's use IBM Watson.x AI to generate text based on a prompt:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Initialize text generation model\n", + "gen_model = ModelInference(\n", + " model_id=\"google/flan-ul2\",\n", + " credentials=credentials,\n", + " project_id=project_id\n", + ")\n", + "\n", + "# Generate text with a prompt\n", + "prompt = \"Write a short poem about artificial intelligence:\"\n", + "response = gen_model.generate_text(prompt)\n", + "print(f\"Generated Text:\\n{response}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Chat Completion\n", + "\n", + "Now, let's use a different model for chat completion:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Initialize chat model\n", + "chat_model = ModelInference(\n", + " model_id=\"meta-llama/llama-3-3-70b-instruct\",\n", + " credentials=credentials,\n", + " project_id=project_id\n", + ")\n", + "\n", + "# Format messages for chat\n", + "messages = [\n", + " {\"role\": \"system\", \"content\": \"You are a helpful AI assistant.\"},\n", + " {\"role\": \"user\", \"content\": \"What are the three laws of robotics?\"}\n", + "]\n", + "\n", + "# Get chat response\n", + "chat_response = chat_model.chat(messages)\n", + "print(f\"Chat Response:\\n{chat_response['choices'][0]['message']['content']}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Another Chat Example\n", + "\n", + "Let's try a different type of query:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# New chat messages\n", + "messages = [\n", + " {\"role\": \"system\", \"content\": \"You are an expert in machine learning.\"},\n", + " {\"role\": \"user\", \"content\": \"Explain the difference between supervised and unsupervised learning in simple terms.\"}\n", + "]\n", + "\n", + "# Get chat response\n", + "chat_response = chat_model.chat(messages)\n", + "print(f\"Chat Response:\\n{chat_response['choices'][0]['message']['content']}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Clean Up\n", + "\n", + "Finally, let's close the persistent connection with the models:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "# Close connections\n", + "gen_model.close_persistent_connection()\n", + "chat_model.close_persistent_connection()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.12" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/examples/watsonx_examples/watsonx-tokeniation-model.ipynb b/examples/watsonx_examples/watsonx-tokeniation-model.ipynb new file mode 100644 index 000000000..d0d181af8 --- /dev/null +++ b/examples/watsonx_examples/watsonx-tokeniation-model.ipynb @@ -0,0 +1,256 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# IBM Watson.x AI Tokenization and Model Details with AgentOps\n", + "\n", + "This notebook demonstrates how to use IBM Watson.x AI for tokenization and retrieving model details with AgentOps instrumentation." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup\n", + "\n", + "First, let's import the necessary libraries and initialize AgentOps:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import agentops\n", + "from ibm_watsonx_ai import Credentials\n", + "from ibm_watsonx_ai.foundation_models import ModelInference\n", + "from dotenv import load_dotenv\n", + "import os\n", + "\n", + "# Load environment variables\n", + "load_dotenv()\n", + "\n", + "# Initialize AgentOps\n", + "agentops.init()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Initialize IBM Watson.x AI Credentials\n", + "\n", + "To use IBM Watson.x AI, you need to set up your credentials and project ID." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "# Initialize credentials - replace with your own API key\n", + "# Best practice: Store API keys in environment variables\n", + "credentials = Credentials(\n", + " url=os.getenv(\"WATSONX_URL\", \"https://eu-de.ml.cloud.ibm.com\"),\n", + " api_key=os.getenv(\"WATSONX_API_KEY\", \"your-api-key-here\"),\n", + ")\n", + "\n", + "# Project ID for your IBM Watson.x project\n", + "project_id = os.getenv(\"WATSONX_PROJECT_ID\", \"your-project-id-here\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Initialize Model\n", + "\n", + "Let's initialize a model to work with:" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "# Initialize model\n", + "model = ModelInference(\n", + " model_id=\"google/flan-ul2\",\n", + " credentials=credentials,\n", + " project_id=project_id\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Tokenization\n", + "\n", + "Let's use IBM Watson.x AI to tokenize text:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Example text to tokenize\n", + "text_to_tokenize = \"Hello, how are you today?\"\n", + "tokens = model.tokenize(text_to_tokenize)\n", + "print(f\"Tokenization Result:\\n{tokens}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Tokenizing Longer Text\n", + "\n", + "Let's try tokenizing a longer piece of text:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Longer text to tokenize\n", + "longer_text = \"\"\"Artificial intelligence (AI) is intelligence demonstrated by machines, \n", + "as opposed to intelligence displayed by humans or other animals. \n", + "Example tasks in which this is done include speech recognition, computer vision, \n", + "translation between languages, and decision-making.\"\"\"\n", + "\n", + "tokens = model.tokenize(longer_text)\n", + "print(f\"Tokens: {tokens}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Model Details\n", + "\n", + "Let's retrieve and display details about the model we're using:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get model details\n", + "model_details = model.get_details()\n", + "print(f\"Model Details:\\n{model_details}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Exploring a Different Model\n", + "\n", + "Let's initialize a different model and get its details:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Initialize another model\n", + "llama_model = ModelInference(\n", + " model_id=\"meta-llama/llama-3-3-70b-instruct\",\n", + " credentials=credentials,\n", + " project_id=project_id\n", + ")\n", + "\n", + "# Get details of the new model\n", + "llama_model_details = llama_model.get_details()\n", + "print(f\"Llama Model Details:\\n{llama_model_details}\")\n", + "\n", + "# Example text tokenization with the new model\n", + "example_text = \"Let's see how this model tokenizes text.\"\n", + "llama_tokens = llama_model.tokenize(example_text)\n", + "print(f\"\\nTokenization with Llama model:\\n{llama_tokens}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Comparing Tokenization Between Models\n", + "\n", + "Let's compare how different models tokenize the same text:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Text to compare tokenization\n", + "comparison_text = \"The quick brown fox jumps over the lazy dog.\"\n", + "\n", + "# Tokenize with first model\n", + "flan_tokens = model.tokenize(comparison_text)\n", + "print(f\"FLAN-UL2 tokens: {flan_tokens}\")\n", + "\n", + "# Tokenize with second model\n", + "llama_tokens = llama_model.tokenize(comparison_text)\n", + "print(f\"\\nLlama tokens: {llama_tokens}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Clean Up\n", + "\n", + "Finally, let's close the persistent connection with the models:" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "# Close connections\n", + "model.close_persistent_connection()\n", + "llama_model.close_persistent_connection()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.12" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}