diff --git a/util/opentelemetry-util-genai/CHANGELOG-loongsuite.md b/util/opentelemetry-util-genai/CHANGELOG-loongsuite.md
index 0929fe9e9..569b33e84 100644
--- a/util/opentelemetry-util-genai/CHANGELOG-loongsuite.md
+++ b/util/opentelemetry-util-genai/CHANGELOG-loongsuite.md
@@ -16,3 +16,5 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Add `gen_ai.usage.total_tokens` attribute for LLM, Agent, and Embedding operations. ([#108](https://github.com/alibaba/loongsuite-python-agent/pull/108))
 - Add `gen_ai.response.time_to_first_token` attribute for LLM operations. ([#113](https://github.com/alibaba/loongsuite-python-agent/pull/113))
+
+- Enhance the multimodal pre-upload pipeline with Data URI and local-path support, add multimodal data handling for `AgentInvocation`, introduce configurable pre-upload hooks and uploader entry points, add a graceful-shutdown processor for GenAI components, and improve multimodal metadata extraction and documentation. ([#119](https://github.com/alibaba/loongsuite-python-agent/pull/119))
\ No newline at end of file
diff --git a/util/opentelemetry-util-genai/README-loongsuite.rst b/util/opentelemetry-util-genai/README-loongsuite.rst
index e49ed4e7a..ba4caad5f 100644
--- a/util/opentelemetry-util-genai/README-loongsuite.rst
+++ b/util/opentelemetry-util-genai/README-loongsuite.rst
@@ -8,6 +8,7 @@ OpenTelemetry Util for GenAI - LoongSuite extensions
 
 The LoongSuite extensions provide the OpenTelemetry GenAI Util package with support for additional Generative AI operations, including:
 
+- **llm**: enhanced multimodal data handling; supports asynchronously uploading images, audio, video, and other multimodal content to a configured storage backend
 - **invoke_agent**: Agent invocation operations, with messages, tool definitions, and system instructions
 - **create_agent**: Agent creation operations
 - **embedding**: vector embedding generation operations
@@ -46,6 +47,82 @@ The LoongSuite extensions provide the OpenTelemetry GenAI Util package with additional Generative A
 - ``true``: enable event emission (when the content capturing mode is ``EVENT_ONLY`` or ``SPAN_AND_EVENT``)
 - ``false``: disable event emission (default)
 
+Multimodal upload controls
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Multimodal content (images/audio/video) is usually large; keeping it inline in spans/events burdens the trace pipeline and storage.
+The instrumentation therefore provides "strip and upload" for multimodal data: raw multimodal payloads are uploaded to external storage, and the messages keep a referenceable URI.
+
+Key components
+
+- ``PreUploader`` (pre-processor): responsible for "identify + rewrite"; it never writes to storage itself
+  - Identifies ``Base64Blob`` / ``Blob`` / ``Uri`` parts and produces a list of ``UploadItem`` objects
+  - Generates the target URI as ``{base_path}/{date}/{md5}.{ext}``
+  - Rewrites messages in place, replacing each processable multimodal part with a new ``Uri``
+- ``Uploader``: performs the actual upload
+  - Enqueues each received ``UploadItem`` for asynchronous upload (never blocks the business thread)
+  - Skips duplicates idempotently (identical content is uploaded only once); failures are only logged, never raised to business code
+- Fixed call order: ``pre_uploader.pre_upload(...)`` first, then ``uploader.upload(...)`` for each returned item
+- The two work as a pair: if either hook fails to load or returns ``None``, multimodal upload is disabled entirely (``uploader``/``pre-uploader`` are both ``None``)
+
+Required parameters
+
+- ``OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE``: controls which direction is processed (default ``none``)
+  - ``none``: no multimodal content is processed (the upload pipeline is fully disabled)
+  - ``input``: process multimodal content in request inputs only (user input)
+  - ``output``: process multimodal content in model outputs only
+  - ``both``: process both input and output
+  - Guidance: use ``input`` if you only care about the upstream direction, ``output`` for the downstream direction, and ``both`` for uniform full-link storage
+- ``OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_STORAGE_BASE_PATH``: root path of the upload target storage
+  - Ignored when ``UPLOAD_MODE=none``
+  - Required whenever ``UPLOAD_MODE`` is not ``none``; uploads cannot complete without it
+
+Supported storage protocols include:
+
+- ``file:///path/to/dir``: local filesystem
+- ``memory://``: in-memory filesystem
+- ``oss://bucket-name/prefix``: Alibaba Cloud OSS
+- ``sls://project/logstore``: Alibaba Cloud SLS
+- any other protocol supported by fsspec
+
+Optional parameters:
+
+- Hook selection (the defaults rarely need changing):
+  - ``OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOADER``: uploader hook name (default ``fs``)
+  - ``OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_PRE_UPLOADER``: pre-uploader hook name (default ``fs``)
+- Processing switches:
+  - ``OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_ENABLED``: whether to download external URI resources and re-upload them to the configured storage (``true`` / ``false``, default ``false``)
+  - ``OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_SSL_VERIFY``: whether to verify SSL certificates when downloading (``true`` / ``false``, default ``true``)
+  - ``OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_AUDIO_CONVERSION_ENABLED``: whether to enable audio transcoding (currently PCM16/L16/PCM to WAV; ``true`` / ``false``, default ``false``)
+  - ``OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_LOCAL_FILE_ENABLED``: whether reading and uploading local files is allowed (supports ``file://`` URIs, absolute paths, and relative paths; ``true`` / ``false``, default ``false``)
+  - ``OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_ALLOWED_ROOT_PATHS``: comma-separated list of local root directories that may be read (required when local file handling is enabled)
+
+``pyproject.toml`` entry point configuration (plugin extension mechanism)::
+
+    [project.entry-points.opentelemetry_genai_multimodal_uploader]
+    fs = "opentelemetry.util.genai._multimodal_upload.fs_uploader:fs_uploader_hook"
+
+    [project.entry-points.opentelemetry_genai_multimodal_pre_uploader]
+    fs = "opentelemetry.util.genai._multimodal_upload.pre_uploader:fs_pre_uploader_hook"
+
+Runtime example configuration::
+
+    export OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE=both
+    export OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_STORAGE_BASE_PATH=file:///var/log/genai/multimodal
+
+When multimodal upload is enabled, ``ExtendedTelemetryHandler`` registers an ``atexit`` callback on first initialization,
+and on process exit shuts down ``ExtendedTelemetryHandler`` / ``PreUploader`` / ``Uploader`` in that order.
+
+To shut down proactively during the application lifecycle (for example, from a service framework's shutdown hook)::
+
+    from opentelemetry.util.genai.extended_handler import ExtendedTelemetryHandler
+
+    ExtendedTelemetryHandler.shutdown()
+
+Dependencies:
+  The multimodal upload feature requires the ``fsspec`` and ``httpx`` packages (mandatory); the ``numpy`` and ``soundfile`` packages are optional and are used for PCM-to-WAV audio conversion, which additionally requires ``OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_AUDIO_CONVERSION_ENABLED=true``.
+  Install the mandatory dependencies with ``pip install opentelemetry-util-genai[multimodal_upload]``, and the audio-conversion dependencies with ``pip install opentelemetry-util-genai[audio_conversion]``.
+
 
 Example configuration
 ~~~~~~~~~~~~~~~~~~~~~
 
@@ -54,12 +131,59 @@ The LoongSuite extensions provide the OpenTelemetry GenAI Util package with additional Generative A
     export OTEL_SEMCONV_STABILITY_OPT_IN=gen_ai_latest_experimental
     export OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=SPAN_AND_EVENT
     export OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT=true
-
+    export OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE=both
+    export OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_STORAGE_BASE_PATH=file:///var/log/genai/multimodal
 
 Supported operations
 --------------------
 
-1. Agent invocation (invoke_agent)
+1. LLM invocation (llm)
+~~~~~~~~~~~~~~~~~~~~~~~
+
+Tracks chat-completion invocations of large language models (LLMs). The LoongSuite extensions enhance multimodal data handling, automatically uploading and managing images, audio, video, and other multimodal content.
+
+**Supported multimodal part types:**
+
+The ``parts`` field of a message supports the following types:
+
+- ``Text``: text content
+- ``Base64Blob``: Base64-encoded binary data (image, audio, video)
+- ``Blob``: raw binary data
+- ``Uri``: a URI referencing a remote resource (an http/https URL or an already-uploaded file path)
+
+Multimodal data processing flow:
+
+1. ``Base64Blob`` and ``Blob`` parts are automatically decoded and uploaded to the configured storage backend
+2. ``Uri`` parts that reference http/https URLs are downloaded and re-uploaded (when downloading is enabled)
+3. After upload, the original ``Base64Blob``/``Blob``/``Uri`` part is replaced with a ``Uri`` pointing at the new storage location
+4. When message content is serialized into spans/events, it contains the replacement ``Uri``
+
+**Enhanced attributes:**
+
+Message content (controlled by the content capturing mode):
+  - ``gen_ai.input.messages``: input messages (multimodal parts included, content after upload processing)
+  - ``gen_ai.output.messages``: output messages (multimodal parts included, content after upload processing)
+
+Multimodal metadata (LoongSuite extension attributes):
+  - ``gen_ai.input.multimodal_metadata``: multimodal metadata for the input messages, recording information about the processed multimodal content (JSON format)
+  - ``gen_ai.output.multimodal_metadata``: multimodal metadata for the output messages, recording information about the processed multimodal content (JSON format)
+
+**Multimodal metadata example:**
+
+When a message containing multimodal content is processed, metadata describing the processing is generated automatically::
+
+    # Example value of the gen_ai.input.multimodal_metadata attribute
+    [
+        {
+            "modality": "image",
+            "mime_type": "image/png",
+            "uri": "oss://bucket/20260107/abc123.png",  # path after upload
+            "type": "uri"  # part type
+        }
+    ]
+
+
+2. Agent invocation (invoke_agent)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 Tracks AI Agent invocation operations, supporting full message flows, tool definitions, and system instructions.
@@ -139,7 +263,7 @@ Token usage:
     invocation.output_tokens = 20
 
 
-2. Agent creation (create_agent)
+3. Agent creation (create_agent)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 Tracks AI Agent creation operations.
@@ -166,7 +290,7 @@ Token usage:
     invocation.request_model = "gpt-4"
 
 
-3. Vector embedding (embedding)
+4. Vector embedding (embedding)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 Tracks vector-embedding generation operations.
@@ -197,7 +321,7 @@ Token usage:
     invocation.input_tokens = 50
 
 
-4. Tool execution (execute_tool)
+5. Tool execution (execute_tool)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 Tracks tool or function execution operations.
@@ -226,7 +350,7 @@ Token usage:
     invocation.tool_call_result = result
 
 
-5. Document retrieval (retrieve)
+6. Document retrieval (retrieve)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 Tracks retrieval of documents from vector databases or search systems.
@@ -255,7 +379,7 @@ Token usage:
     ]
 
 
-6. Document reranking (rerank)
+7. Document reranking (rerank)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 Tracks document reranking operations, supporting both model-based and LLM-based rerankers.
@@ -312,7 +436,7 @@ Token usage:
     invocation.rerank_output_documents = [...]
 
 
-7. Memory operations (memory)
+8. Memory operations (memory)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 Tracks an AI Agent's memory operations, supporting create/update/delete, search, and history queries over memories.
diff --git a/util/opentelemetry-util-genai/pyproject.toml b/util/opentelemetry-util-genai/pyproject.toml
index da0ed9b22..2debf48cb 100644
--- a/util/opentelemetry-util-genai/pyproject.toml
+++ b/util/opentelemetry-util-genai/pyproject.toml
@@ -33,10 +33,17 @@ dependencies = [
 [project.entry-points.opentelemetry_genai_completion_hook]
 upload = "opentelemetry.util.genai._upload:upload_completion_hook"
 
+[project.entry-points.opentelemetry_genai_multimodal_uploader]
+fs = "opentelemetry.util.genai._multimodal_upload.fs_uploader:fs_uploader_hook"
+
+[project.entry-points.opentelemetry_genai_multimodal_pre_uploader]
+fs = "opentelemetry.util.genai._multimodal_upload.pre_uploader:fs_pre_uploader_hook"
+
 [project.optional-dependencies]
 test = ["pytest>=7.0.0"]
 upload = ["fsspec>=2025.9.0"]
-multimodal_upload = ["httpx", "fsspec>=2025.9.0", "numpy", "soundfile"]
+multimodal_upload = ["httpx", "fsspec>=2025.9.0"]
+audio_conversion = ["numpy", "soundfile"]
 
 [project.urls]
 Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/util/opentelemetry-util-genai"
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_processing.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_processing.py
index 051b5348c..d57edfadd 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_processing.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_processing.py
@@ -30,6 +30,7 @@ def __init__(self, ...):
 
 from __future__ import annotations
 
+import atexit
 import logging
 import os
 import queue
@@ -46,16 +47,21 @@ def __init__(self, ...):
     Literal,
     Optional,
     Tuple,
+    Union,
+    cast,
 )
 
 from opentelemetry import context as otel_context
+from opentelemetry._logs import Logger as OtelLogger
 from opentelemetry.trace import Span
 from opentelemetry.util.genai._extended_semconv import (
    gen_ai_extended_attributes as GenAIEx,
 )
-from opentelemetry.util.genai.extended_environment_variables import (
-    OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE,
+from opentelemetry.util.genai.extended_span_utils import (
+    _apply_invoke_agent_finish_attributes,
+    _maybe_emit_invoke_agent_event,
 )
+from opentelemetry.util.genai.extended_types import InvokeAgentInvocation
 from opentelemetry.util.genai.span_utils import (
     _apply_error_attributes,
     _apply_llm_finish_attributes,
@@ -64,6 +70,7 @@ def __init__(self, ...):
 from opentelemetry.util.genai.types import (
     Base64Blob,
     Blob,
+    ContentCapturingMode,
     Error,
     InputMessage,
     LLMInvocation,
@@ -72,6 +79,9 @@ def __init__(self, ...):
 )
 from opentelemetry.util.genai.utils import (
     gen_ai_json_dumps,
+    get_content_capturing_mode,
+    get_multimodal_upload_mode,
+    is_experimental_mode,
 )
 
 if TYPE_CHECKING:
@@ -85,13 +95,19 @@ def __init__(self, ...):
 # Async queue maximum length
 _MAX_ASYNC_QUEUE_SIZE = 1000
 
+# Invocation types that carry multimodal messages
+_MultimodalInvocation = Union[LLMInvocation, InvokeAgentInvocation]
+
+# Task method literals
+_TaskMethod = Literal["stop_llm", "fail_llm", "stop_agent", "fail_agent"]
+
 
 @dataclass
 class _MultimodalAsyncTask:
     """Async multimodal processing task"""
 
-    invocation: LLMInvocation
-    method: Literal["stop", "fail"]
+    invocation: _MultimodalInvocation
+    method: _TaskMethod
     error: Optional[Error] = None
     handler: Optional[Any] = None  # TelemetryHandler instance
@@ -115,33 +131,55 @@ class MultimodalProcessingMixin:
     _async_worker: ClassVar[Optional[threading.Thread]] = None
     _async_lock: ClassVar[threading.Lock] = threading.Lock()
     _atexit_handler: ClassVar[Optional[object]] = None
+    _shutdown_atexit_lock: ClassVar[threading.Lock] = threading.Lock()
+    _shutdown_lock: ClassVar[threading.Lock] = threading.Lock()
+    _shutdown_called: ClassVar[bool] = False
 
     # Instance-level attributes (initialized by _init_multimodal)
     _multimodal_enabled: bool
 
     def _init_multimodal(self) -> None:
         """Initialize multimodal-related instance attributes, called in subclass __init__"""
-        upload_mode = os.getenv(
-            OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE, "both"
-        ).lower()
+        self._multimodal_enabled = False
+
+        if get_multimodal_upload_mode() == "none":
+            return
+
+        try:
+            capture_enabled = (
+                is_experimental_mode()
+                and get_content_capturing_mode()
+                in (
+                    ContentCapturingMode.SPAN_ONLY,
+                    ContentCapturingMode.SPAN_AND_EVENT,
+                )
+            )
+        except ValueError:
+            # get_content_capturing_mode raises ValueError when GEN_AI stability mode is DEFAULT
+            capture_enabled = False
+
+        if not capture_enabled:
+            return
 
         uploader, pre_uploader = self._get_uploader_and_pre_uploader()
-        self._multimodal_enabled = (
-            upload_mode != "none"
-            and uploader is not None
-            and pre_uploader is not None
-        )
+        if uploader is not None and pre_uploader is not None:
+            self._multimodal_enabled = True
 
     # ==================== Public Methods ====================
 
-    def process_multimodal_stop(self, invocation: LLMInvocation) -> bool:
-        """Process multimodal stop_llm request
+    def process_multimodal_stop(
+        self,
+        invocation: _MultimodalInvocation,
+        method: _TaskMethod,
+    ) -> bool:
+        """Process multimodal stop request
 
         Args:
-            invocation: LLM invocation object
+            invocation: LLM or Agent invocation object
+            method: Task method for dispatch ("stop_llm" or "stop_agent")
 
         Returns:
-            bool: Whether handled (True means async processed, caller doesn't need to continue; False means no multimodal, need sync path)
+            bool: Whether handled (True = async processed, False = no multimodal)
         """
         if invocation.context_token is None or invocation.span is None:
             return False
@@ -149,7 +187,7 @@ class MultimodalProcessingMixin:
         if not self._should_async_process(invocation):
             return False
 
-        # 1. First detach context (let user code continue execution)
+        # 1. Detach context immediately (let user code continue)
         otel_context.detach(invocation.context_token)
 
         # 2. Ensure worker is started
@@ -158,31 +196,36 @@ class MultimodalProcessingMixin:
         # 3. Try to put into queue (non-blocking)
         async_queue = self.__class__._async_queue
         if async_queue is None:
-            self._fallback_end_span(invocation)
+            self._fallback_stop(invocation, method)
             return True
 
         try:
             async_queue.put_nowait(
                 _MultimodalAsyncTask(
-                    invocation=invocation, method="stop", handler=self
+                    invocation=invocation,
+                    method=method,
+                    handler=self,
                 )
             )
         except queue.Full:
-            # Queue full: sync degradation, skip multimodal processing
             _logger.warning(
                 "Multimodal queue full, skipping multimodal processing"
            )
-            self._fallback_end_span(invocation)
+            self._fallback_stop(invocation, method)
 
         return True
 
     def process_multimodal_fail(
-        self, invocation: LLMInvocation, error: Error
+        self,
+        invocation: _MultimodalInvocation,
+        error: Error,
+        method: _TaskMethod,
     ) -> bool:
-        """Process multimodal fail_llm request
+        """Process multimodal fail request
 
         Args:
-            invocation: LLM invocation object
+            invocation: LLM or Agent invocation object
             error: Error information
+            method: Task method for dispatch ("fail_llm" or "fail_agent")
 
         Returns:
             bool: Whether handled
@@ -198,13 +241,13 @@ class MultimodalProcessingMixin:
 
         async_queue = self.__class__._async_queue
         if async_queue is None:
-            self._fallback_fail_span(invocation, error)
+            self._fallback_fail(invocation, error, method)
             return True
 
         try:
             async_queue.put_nowait(
                 _MultimodalAsyncTask(
                     invocation=invocation,
-                    method="fail",
+                    method=method,
                     error=error,
                     handler=self,
                 )
@@ -213,7 +256,7 @@ class MultimodalProcessingMixin:
             _logger.warning(
                 "Multimodal queue full, skipping multimodal processing"
             )
-            self._fallback_fail_span(invocation, error)
+            self._fallback_fail(invocation, error, method)
 
         return True
 
@@ -221,7 +264,7 @@ class MultimodalProcessingMixin:
     def shutdown_multimodal_worker(cls, timeout: float = 5.0) -> None:
         """Gracefully shutdown async worker
 
-        Called by ArmsShutdownProcessor, no need to call other components' shutdown internally.
+        Called by shutdown during graceful exit.
 
         Strategy:
         1. Try to send None signal to queue within timeout
@@ -256,6 +299,64 @@ class MultimodalProcessingMixin:
             cls._async_worker = None
             cls._async_queue = None
 
+    @classmethod
+    def _ensure_multimodal_shutdown_atexit_registered(cls) -> None:
+        """Register a single process-level atexit shutdown callback."""
+        if cls._atexit_handler is not None:
+            return
+        with cls._shutdown_atexit_lock:
+            if cls._atexit_handler is not None:
+                return
+            cls._atexit_handler = atexit.register(cls._shutdown_for_exit)
+
+    @classmethod
+    def _shutdown_for_exit(cls) -> None:
+        """atexit callback entrypoint for multimodal graceful shutdown."""
+        cls.shutdown()
+
+    @classmethod
+    def shutdown(
+        cls,
+        worker_timeout: float = 5.0,
+        pre_uploader_timeout: float = 2.0,
+        uploader_timeout: float = 5.0,
+    ) -> None:
+        """Shutdown multimodal worker, pre-uploader and uploader in order."""
+        with cls._shutdown_lock:
+            if cls._shutdown_called:
+                return
+            cls._shutdown_called = True
+
+        cls.shutdown_multimodal_worker(worker_timeout)
+        cls._shutdown_pre_uploader(pre_uploader_timeout)
+        cls._shutdown_uploader(uploader_timeout)
+
+    @classmethod
+    def _shutdown_pre_uploader(cls, timeout: float) -> None:
+        try:
+            from opentelemetry.util.genai._multimodal_upload import (  # pylint: disable=import-outside-toplevel  # noqa: PLC0415
+                get_pre_uploader,
+            )
+
+            pre_uploader = get_pre_uploader()
+            if pre_uploader is not None and hasattr(pre_uploader, "shutdown"):
+                pre_uploader.shutdown(timeout=timeout)
+        except Exception as exc:  # pylint: disable=broad-except
+            _logger.warning("Error shutting down PreUploader: %s", exc)
+
+    @classmethod
+    def _shutdown_uploader(cls, timeout: float) -> None:
+        try:
+            from opentelemetry.util.genai._multimodal_upload import (  # pylint: disable=import-outside-toplevel  # noqa: PLC0415
+                get_uploader,
+            )
+
+            uploader = get_uploader()
+            if uploader is not None and hasattr(uploader, "shutdown"):
+                uploader.shutdown(timeout=timeout)
+        except Exception as exc:  # pylint: disable=broad-except
+            _logger.warning("Error shutting down Uploader: %s", exc)
+
     @classmethod
     def _at_fork_reinit(cls) -> None:
         """Reset class-level state in child process after fork"""
@@ -269,7 +370,7 @@ class MultimodalProcessingMixin:
 
     # ==================== Internal Methods ====================
 
-    def _should_async_process(self, invocation: LLMInvocation) -> bool:
+    def _should_async_process(self, invocation: _MultimodalInvocation) -> bool:
         """Determine whether async processing is needed
 
         Condition: Has multimodal data and multimodal upload switch is not 'none'
@@ -280,7 +381,7 @@ class MultimodalProcessingMixin:
         return MultimodalProcessingMixin._quick_has_multimodal(invocation)
 
     @staticmethod
-    def _quick_has_multimodal(invocation: LLMInvocation) -> bool:
+    def _quick_has_multimodal(invocation: _MultimodalInvocation) -> bool:
        """Quick detection of multimodal data (O(n), no network)"""
 
         def _check_messages(
@@ -349,10 +450,7 @@ class MultimodalProcessingMixin:
                 continue
 
             try:
-                if task.method == "stop":
-                    handler._async_stop_llm(task)
-                elif task.method == "fail":
-                    handler._async_fail_llm(task)
+                handler._dispatch_task(task)
             except (
                 AttributeError,
                 TypeError,
@@ -389,6 +487,19 @@ class MultimodalProcessingMixin:
                 # Use local variable to avoid race condition
                 async_queue.task_done()
 
+    def _dispatch_task(self, task: _MultimodalAsyncTask) -> None:
+        """Dispatch task to the appropriate handler method based on task.method"""
+        if task.method == "stop_llm":
+            self._async_stop_llm(task)
+        elif task.method == "fail_llm":
+            self._async_fail_llm(task)
+        elif task.method == "stop_agent":
+            self._async_stop_invoke_agent(task)
+        elif task.method == "fail_agent":
+            self._async_fail_invoke_agent(task)
+
+    # ==================== LLM Async Methods ====================
+
     def _async_stop_llm(self, task: _MultimodalAsyncTask) -> None:
         """Async stop LLM invocation (executed in worker thread)"""
         invocation = task.invocation
@@ -399,26 +510,12 @@ class MultimodalProcessingMixin:
         # 1. Get uploader and process multimodal data
         uploader, pre_uploader = self._get_uploader_and_pre_uploader()
         if uploader is not None and pre_uploader is not None:
-            self._separate_and_upload(span, invocation, uploader, pre_uploader)
-            # Extract and set multimodal metadata
-            input_metadata, output_metadata = (
-                MultimodalProcessingMixin._extract_multimodal_metadata(
-                    invocation.input_messages, invocation.output_messages
-                )
+            self._upload_and_set_metadata(
+                span, invocation, uploader, pre_uploader
             )
-            if input_metadata:
-                span.set_attribute(
-                    GenAIEx.GEN_AI_INPUT_MULTIMODAL_METADATA,
-                    gen_ai_json_dumps(input_metadata),
-                )
-            if output_metadata:
-                span.set_attribute(
-                    GenAIEx.GEN_AI_OUTPUT_MULTIMODAL_METADATA,
-                    gen_ai_json_dumps(output_metadata),
-                )
 
         # 2. Execute original attribute setting
-        _apply_llm_finish_attributes(span, invocation)
+        _apply_llm_finish_attributes(span, invocation)  # type: ignore[arg-type]
 
         # 3. Record metrics (using TelemetryHandler's method)
         self._record_llm_metrics(invocation, span)  # type: ignore[attr-defined]
@@ -443,25 +540,12 @@ class MultimodalProcessingMixin:
         # 1. Get uploader and process multimodal data
         uploader, pre_uploader = self._get_uploader_and_pre_uploader()
         if uploader is not None and pre_uploader is not None:
-            self._separate_and_upload(span, invocation, uploader, pre_uploader)
-            input_metadata, output_metadata = (
-                MultimodalProcessingMixin._extract_multimodal_metadata(
-                    invocation.input_messages, invocation.output_messages
-                )
+            self._upload_and_set_metadata(
+                span, invocation, uploader, pre_uploader
             )
-            if input_metadata:
-                span.set_attribute(
-                    GenAIEx.GEN_AI_INPUT_MULTIMODAL_METADATA,
-                    gen_ai_json_dumps(input_metadata),
-                )
-            if output_metadata:
-                span.set_attribute(
-                    GenAIEx.GEN_AI_OUTPUT_MULTIMODAL_METADATA,
-                    gen_ai_json_dumps(output_metadata),
-                )
 
         # 2. Set attributes
-        _apply_llm_finish_attributes(span, invocation)
+        _apply_llm_finish_attributes(span, invocation)  # type: ignore[arg-type]
         _apply_error_attributes(span, error)
 
         # 3. Record metrics
@@ -477,38 +561,170 @@ class MultimodalProcessingMixin:
         )
         span.end(end_time=end_time_ns)
 
-    def _fallback_end_span(self, invocation: LLMInvocation) -> None:
-        """Sync degradation: skip multimodal, follow original logic to end span"""
+    # ==================== Agent Async Methods ====================
+
+    def _async_stop_invoke_agent(self, task: _MultimodalAsyncTask) -> None:
+        """Async stop Agent invocation (executed in worker thread)"""
+        invocation = task.invocation
+        if not isinstance(invocation, InvokeAgentInvocation):
+            return
         span = invocation.span
         if span is None:
             return
-        _apply_llm_finish_attributes(span, invocation)
-        self._record_llm_metrics(invocation, span)  # type: ignore[attr-defined]
-        _maybe_emit_llm_event(self._logger, span, invocation)  # type: ignore[attr-defined]
+
+        # 1. Get uploader and process multimodal data
+        uploader, pre_uploader = self._get_uploader_and_pre_uploader()
+        if uploader is not None and pre_uploader is not None:
+            self._upload_and_set_metadata(
+                span, invocation, uploader, pre_uploader
+            )
+
+        # 2. Execute attribute setting
+        _apply_invoke_agent_finish_attributes(span, invocation)
+
+        # 3. Record metrics
+        cast(Any, self)._record_extended_metrics(span, invocation)
+
+        # 4. Send event
+        event_logger = cast(
+            Optional[OtelLogger], getattr(self, "_logger", None)
+        )
+        _maybe_emit_invoke_agent_event(
+            event_logger,
+            span,
+            invocation,
+        )
+
+        # 5. Calculate correct end time and end span
+        end_time_ns = MultimodalProcessingMixin._compute_end_time_ns(
+            invocation
+        )
+        span.end(end_time=end_time_ns)
+
+    def _async_fail_invoke_agent(self, task: _MultimodalAsyncTask) -> None:
+        """Async fail Agent invocation (executed in worker thread)"""
+        invocation = task.invocation
+        if not isinstance(invocation, InvokeAgentInvocation):
+            return
+        error = task.error
+        span = invocation.span
+        if span is None or error is None:
+            return
+
+        # 1. Get uploader and process multimodal data
+        uploader, pre_uploader = self._get_uploader_and_pre_uploader()
+        if uploader is not None and pre_uploader is not None:
+            self._upload_and_set_metadata(
+                span, invocation, uploader, pre_uploader
+            )
+
+        # 2. Set attributes
+        _apply_invoke_agent_finish_attributes(span, invocation)
+        _apply_error_attributes(span, error)
+
+        # 3. Record metrics
+        error_type = getattr(error.type, "__qualname__", None)
+        cast(Any, self)._record_extended_metrics(
+            span, invocation, error_type=error_type
+        )
+
+        # 4. Send event
+        event_logger = cast(
+            Optional[OtelLogger], getattr(self, "_logger", None)
+        )
+        _maybe_emit_invoke_agent_event(
+            event_logger,
+            span,
+            invocation,
+            error,
+        )
+
+        # 5. End span
         end_time_ns = MultimodalProcessingMixin._compute_end_time_ns(
             invocation
         )
         span.end(end_time=end_time_ns)
 
-    def _fallback_fail_span(
-        self, invocation: LLMInvocation, error: Error
+    # ==================== Fallback Methods ====================
+
+    def _fallback_stop(
+        self,
+        invocation: _MultimodalInvocation,
+        method: _TaskMethod,
     ) -> None:
-        """Sync degradation: skip multimodal, follow original logic to end span (with error)"""
+        """Sync degradation for stop: skip multimodal, end span with attributes"""
+        span = invocation.span
+        if span is None:
+            return
+        if method == "stop_llm":
+            if not isinstance(invocation, LLMInvocation):
+                return
+            _apply_llm_finish_attributes(span, invocation)
+            cast(Any, self)._record_llm_metrics(invocation, span)
+            event_logger = cast(
+                Optional[OtelLogger], getattr(self, "_logger", None)
+            )
+            _maybe_emit_llm_event(event_logger, span, invocation)
+        elif method == "stop_agent":
+            if not isinstance(invocation, InvokeAgentInvocation):
+                return
+            _apply_invoke_agent_finish_attributes(span, invocation)
+            cast(Any, self)._record_extended_metrics(span, invocation)
+            event_logger = cast(
+                Optional[OtelLogger], getattr(self, "_logger", None)
+            )
+            _maybe_emit_invoke_agent_event(event_logger, span, invocation)
+        end_time_ns = MultimodalProcessingMixin._compute_end_time_ns(
+            invocation
+        )
+        span.end(end_time=end_time_ns)
+
+    def _fallback_fail(
+        self,
+        invocation: _MultimodalInvocation,
+        error: Error,
+        method: _TaskMethod,
+    ) -> None:
+        """Sync degradation for fail: skip multimodal, end span with error"""
         span = invocation.span
         if span is None:
             return
-        _apply_llm_finish_attributes(span, invocation)
-        _apply_error_attributes(span, error)
         error_type = getattr(error.type, "__qualname__", None)
-        self._record_llm_metrics(invocation, span, error_type=error_type)  # type: ignore[attr-defined]
-        _maybe_emit_llm_event(self._logger, span, invocation, error)  # type: ignore[attr-defined]
+        if method == "fail_llm":
+            if not isinstance(invocation, LLMInvocation):
+                return
+            _apply_llm_finish_attributes(span, invocation)
+            _apply_error_attributes(span, error)
+            cast(Any, self)._record_llm_metrics(
+                invocation, span, error_type=error_type
+            )
+            event_logger = cast(
+                Optional[OtelLogger], getattr(self, "_logger", None)
+            )
+            _maybe_emit_llm_event(event_logger, span, invocation, error)
+        elif method == "fail_agent":
+            if not isinstance(invocation, InvokeAgentInvocation):
+                return
+            _apply_invoke_agent_finish_attributes(span, invocation)
+            _apply_error_attributes(span, error)
+            cast(Any, self)._record_extended_metrics(
+                span, invocation, error_type=error_type
+            )
+            event_logger = cast(
+                Optional[OtelLogger], getattr(self, "_logger", None)
+            )
+            _maybe_emit_invoke_agent_event(
+                event_logger, span, invocation, error
+            )
         end_time_ns = MultimodalProcessingMixin._compute_end_time_ns(
             invocation
         )
         span.end(end_time=end_time_ns)
 
+    # ==================== Timing Helpers ====================
+
     @staticmethod
-    def _compute_end_time_ns(invocation: LLMInvocation) -> int:
+    def _compute_end_time_ns(invocation: _MultimodalInvocation) -> int:
         """Calculate absolute time (nanoseconds) based on monotonic time"""
         if not invocation.monotonic_end_s or not invocation.monotonic_start_s:
             return time_ns()
@@ -535,18 +751,43 @@ def _get_uploader_and_pre_uploader(  # pylint: disable=no-self-use
         """
         try:
             from opentelemetry.util.genai._multimodal_upload import (  # pylint: disable=import-outside-toplevel  # noqa: PLC0415
-                get_pre_uploader,
-                get_uploader,
+                get_or_load_uploader_pair,
             )
 
-            return get_uploader(), get_pre_uploader()
+            return get_or_load_uploader_pair()
         except ImportError:
             return None, None
 
+    def _upload_and_set_metadata(
+        self,
+        span: Span,
+        invocation: _MultimodalInvocation,
+        uploader: "Uploader",
+        pre_uploader: "PreUploader",
+    ) -> None:
+        """Upload multimodal data and set metadata attributes on span"""
+        self._separate_and_upload(span, invocation, uploader, pre_uploader)
+
+        input_metadata, output_metadata = (
+            MultimodalProcessingMixin._extract_multimodal_metadata(
+                invocation.input_messages, invocation.output_messages
+            )
+        )
+        if input_metadata:
+            span.set_attribute(
+                GenAIEx.GEN_AI_INPUT_MULTIMODAL_METADATA,
+                gen_ai_json_dumps(input_metadata),
+            )
+        if output_metadata:
+            span.set_attribute(
+                GenAIEx.GEN_AI_OUTPUT_MULTIMODAL_METADATA,
+                gen_ai_json_dumps(output_metadata),
+            )
+
     def _separate_and_upload(  # pylint: disable=no-self-use
         self,
         span: Span,
-        invocation: LLMInvocation,
+        invocation: _MultimodalInvocation,
         uploader: "Uploader",
         pre_uploader: "PreUploader",
     ) -> None:
@@ -585,7 +826,14 @@ def _extract_multimodal_metadata(
         input_messages: Optional[List[InputMessage]],
         output_messages: Optional[List[OutputMessage]],
     ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
-        """Extract multimodal metadata from messages"""
+        """Extract multimodal metadata from messages.
+
+        Important:
+        - URI metadata extraction is based on the final message parts.
+        - It is independent from download/replace success in pre-uploader.
+        - When URI replacement is skipped (e.g. download disabled) or fails,
+          the original URI should still remain in messages and be reported here.
+        """
 
         def _extract_from_messages(
             messages: Optional[List[InputMessage] | List[OutputMessage]],
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/__init__.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/__init__.py
index a3974a4ed..3d3ef9c35 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/__init__.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/__init__.py
@@ -12,30 +12,26 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""Multimodal Upload Module
-
-Provides upload support for multimodal data (images, audio, video).
-
-Responsibilities:
-1. Define and manage global Uploader/PreUploader singletons
-2. Provide set_*/get_* interfaces for external initialization and retrieval
-3. extended_handler.py retrieves instances via get_uploader()/get_pre_uploader()
-
-Note: This module does not create concrete instances, only manages singletons.
-Concrete instances are created by external modules like ARMS storage.py and registered via set_*().
-"""
+"""Multimodal upload public exports."""
 
 from __future__ import annotations
 
-from typing import Optional
-
-from opentelemetry.util._once import Once
 from opentelemetry.util.genai._multimodal_upload._base import (
     PreUploader,
     PreUploadItem,
     Uploader,
     UploadItem,
 )
+from opentelemetry.util.genai._multimodal_upload.multimodal_upload_hook import (
+    get_or_load_pre_uploader,
+    get_or_load_uploader,
+    get_or_load_uploader_pair,
+    get_pre_uploader,
+    get_uploader,
+    get_uploader_pair,
+    load_pre_uploader_hook,
+    load_uploader_hook,
+)
 
 try:
     from opentelemetry.util.genai._multimodal_upload.fs_uploader import (
@@ -51,50 +47,18 @@
 except ImportError:
     MultimodalPreUploader = None
 
-_uploader: Optional[Uploader] = None
-_uploader_set_once = Once()
-_preuploader: Optional[PreUploader] = None
-_preuploader_set_once = Once()
-
-
-def set_uploader(uploader: Uploader) -> None:
-    """Set global Uploader instance (can only be set once)"""
-
-    def _set() -> None:
-        global _uploader  # pylint: disable=global-statement
-        _uploader = uploader
-
-    _uploader_set_once.do_once(_set)
-
-
-def get_uploader() -> Optional[Uploader]:
-    """Get global Uploader instance"""
-    return _uploader
-
-
-def set_pre_uploader(pre_uploader: PreUploader) -> None:
-    """Set global PreUploader instance (can only be set once)"""
-
-    def _set() -> None:
-        global _preuploader  # pylint: disable=global-statement
-        _preuploader = pre_uploader
-
-    _preuploader_set_once.do_once(_set)
-
-
-def get_pre_uploader() -> Optional[PreUploader]:
-    """Get global PreUploader instance"""
-    return _preuploader
-
-
 __all__ = [
     "UploadItem",
     "PreUploadItem",
     "Uploader",
     "PreUploader",
-    "set_uploader",
+    "load_uploader_hook",
+    "load_pre_uploader_hook",
+    "get_uploader_pair",
+    "get_or_load_uploader",
+    "get_or_load_pre_uploader",
+    "get_or_load_uploader_pair",
     "get_uploader",
-    "set_pre_uploader",
     "get_pre_uploader",
 ]
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/_base.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/_base.py
index d71bed878..a8714e799 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/_base.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/_base.py
@@ -126,3 +126,16 @@ def pre_upload(
         - Passed messages will be modified in-place, replacing BlobPart with UriPart
         - Returned PreUploadItem needs to be uploaded via Uploader.upload()
         """
+
+    def shutdown(  # pylint: disable=no-self-use,unused-argument,useless-return
+        self, timeout: float = 5.0
+    ) -> None:
+        """Gracefully shutdown the pre-uploader.
+
+        Default implementation is no-op for lightweight/stateless implementations.
+
+        Args:
+            timeout: Maximum wait time (seconds)
+        """
+        # Default no-op for stateless implementations.
+ return None diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/fs_uploader.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/fs_uploader.py index bcba19582..884ee6528 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/fs_uploader.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/fs_uploader.py @@ -44,6 +44,7 @@ ) from opentelemetry.util.genai.extended_environment_variables import ( OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_SSL_VERIFY, + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_STORAGE_BASE_PATH, ) _logger = logging.getLogger(__name__) @@ -59,6 +60,20 @@ def hash_content(content: bytes | str) -> str: return hashlib.sha256(content, usedforsecurity=False).hexdigest() +def fs_uploader_hook() -> Optional[Uploader]: + """Create default FsUploader from environment variables.""" + base_path = os.environ.get( + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_STORAGE_BASE_PATH + ) + if not base_path: + _logger.warning( + "%s is required but not set, multimodal uploader disabled", + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_STORAGE_BASE_PATH, + ) + return None + return FsUploader(base_path=base_path) + + @dataclass class _Task: path: str @@ -73,13 +88,39 @@ class _Task: class FsUploader(Uploader): - """fsspec-based generic file uploader. + """An fsspec-based generic file uploader for multimodal data + + This class handles actual file upload operations for upload items derived from + :class:`~opentelemetry.util.genai._multimodal_upload.PreUploader` + + Supports multiple storage backends via fsspec protocols: + - Local filesystem (file://) + - Alibaba Cloud OSS (oss://) + - Alibaba Cloud SLS (sls://) + - Other fsspec-compatible backends - Supports multiple storage backends: local filesystem, OSS, SLS, etc. + Both the ``fsspec`` and ``httpx`` packages should be installed for full functionality. 
+ For SSL verification control, set :envvar:`OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_SSL_VERIFY` + to ``false`` to disable SSL verification (default is ``true``). + Features: - Enqueue via upload(path, content, skip_if_exists=True) - - Background pool writes to fsspec filesystem. - - LRU cache avoids re-upload when filename already derived from content hash. + - Background thread pool writes to fsspec filesystem + - LRU cache avoids re-upload when filename already derived from content hash + - Supports download-and-upload mode for remote URIs + - Automatic retry on upload failure + + Args: + base_path: Complete base path including protocol (e.g., 'oss://bucket', 'sls://project/logstore', 'file:///path') + max_workers: Maximum number of concurrent upload workers (default: 4) + max_queue_size: Maximum number of tasks in upload queue (default: 1024) + max_queue_bytes: Maximum total bytes in queue, 0 for unlimited (default: 0) + lru_cache_max_size: Maximum size of LRU cache for uploaded files (default: 2048) + auto_mkdirs: Automatically create parent directories (default: True) + content_type: Default content type for uploaded files (default: None) + storage_options: Additional options passed to fsspec (e.g., credentials) (default: None) + max_upload_retries: Maximum retry attempts for failed uploads, 0 for infinite (default: 10) + upload_retry_delay: Delay in seconds between retries (default: 1.0) """ def __init__( diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/multimodal_upload_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/multimodal_upload_hook.py new file mode 100644 index 000000000..ace63e201 --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/multimodal_upload_hook.py @@ -0,0 +1,174 @@ +from __future__ import annotations + +import logging +from importlib import metadata +from typing import Any, Optional, Protocol, 
runtime_checkable + +from opentelemetry.util._once import Once +from opentelemetry.util.genai.utils import ( + get_multimodal_pre_uploader_hook_name, + get_multimodal_upload_mode, + get_multimodal_uploader_hook_name, +) + +from ._base import PreUploader, Uploader + +_logger = logging.getLogger(__name__) + +_MULTIMODAL_UPLOADER_ENTRY_POINT_GROUP = ( + "opentelemetry_genai_multimodal_uploader" +) +_MULTIMODAL_PRE_UPLOADER_ENTRY_POINT_GROUP = ( + "opentelemetry_genai_multimodal_pre_uploader" +) + +_UPLOAD_MODE_NONE = "none" + +_uploader: Optional[Uploader] = None +_pre_uploader: Optional[PreUploader] = None +_load_once = Once() + + +def _iter_entry_points(group: str) -> list[Any]: + eps = metadata.entry_points() + if hasattr(eps, "select"): + return list(eps.select(group=group)) # pyright: ignore [reportUnknownMemberType, reportUnknownArgumentType, reportAttributeAccessIssue] + legacy_group_eps = eps[group] if group in eps else [] + return list(legacy_group_eps) + + +@runtime_checkable +class UploaderHook(Protocol): + def __call__(self) -> Optional[Uploader]: ... + + +@runtime_checkable +class PreUploaderHook(Protocol): + def __call__(self) -> Optional[PreUploader]: ... + + +def _load_by_name( + *, + hook_name: str, + group: str, +) -> Optional[object]: + for entry_point in _iter_entry_points(group): + name = str(entry_point.name) + if name != hook_name: + continue + try: + return entry_point.load()() + except Exception: # pylint: disable=broad-except + _logger.exception("%s hook %s configuration failed", group, name) + return None + return None + + +def load_uploader_hook() -> Optional[Uploader]: + """Load multimodal uploader hook from entry points. 
+ + Mechanism: + - read hook name from env var + `OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOADER` + - resolve hook factory from entry-point group + `opentelemetry_genai_multimodal_uploader` + - call zero-arg hook factory to build uploader instance + - validate returned object type (`Uploader`) + """ + upload_mode = get_multimodal_upload_mode() + if upload_mode == _UPLOAD_MODE_NONE: + return None + + hook_name = get_multimodal_uploader_hook_name() + if not hook_name: + return None + + uploader = _load_by_name( + hook_name=hook_name, group=_MULTIMODAL_UPLOADER_ENTRY_POINT_GROUP + ) + if uploader is None: + return None + if not isinstance(uploader, Uploader): + _logger.debug("%s is not a valid Uploader", hook_name) + return None + _logger.debug("Using multimodal uploader hook %s", hook_name) + return uploader + + +def load_pre_uploader_hook() -> Optional[PreUploader]: + """Load multimodal pre-uploader hook from entry points. + + Mechanism: + - read hook name from env var + `OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_PRE_UPLOADER` + (default: `fs`) + - resolve hook factory from entry-point group + `opentelemetry_genai_multimodal_pre_uploader` + - call zero-arg hook factory to build pre-uploader instance + - validate returned object type (`PreUploader`) + """ + upload_mode = get_multimodal_upload_mode() + if upload_mode == _UPLOAD_MODE_NONE: + return None + + hook_name = get_multimodal_pre_uploader_hook_name() + if not hook_name: + return None + pre_uploader = _load_by_name( + hook_name=hook_name, + group=_MULTIMODAL_PRE_UPLOADER_ENTRY_POINT_GROUP, + ) + if pre_uploader is None: + return None + if not isinstance(pre_uploader, PreUploader): + _logger.debug("%s is not a valid PreUploader", hook_name) + return None + _logger.debug("Using multimodal pre-uploader hook %s", hook_name) + return pre_uploader + + +def get_or_load_uploader_pair() -> tuple[ + Optional[Uploader], Optional[PreUploader] +]: + """Get lazily loaded singleton uploader/pre-uploader pair. 
+ + First call performs one-time loading; subsequent calls return cache. + If either side fails to load, both are downgraded to `(None, None)`. + """ + + def _load() -> None: + global _uploader # pylint: disable=global-statement + global _pre_uploader # pylint: disable=global-statement + _uploader = load_uploader_hook() + _pre_uploader = load_pre_uploader_hook() + if _uploader is None or _pre_uploader is None: + _uploader = None + _pre_uploader = None + + _load_once.do_once(_load) + return _uploader, _pre_uploader + + +def get_uploader_pair() -> tuple[Optional[Uploader], Optional[PreUploader]]: + """Return cached uploader pair without triggering lazy loading.""" + return _uploader, _pre_uploader + + +def get_or_load_uploader() -> Optional[Uploader]: + """Get uploader and trigger lazy loading when needed.""" + return get_or_load_uploader_pair()[0] + + +def get_or_load_pre_uploader() -> Optional[PreUploader]: + """Get pre-uploader and trigger lazy loading when needed.""" + return get_or_load_uploader_pair()[1] + + +def get_uploader() -> Optional[Uploader]: + """Return cached uploader without triggering lazy loading.""" + return _uploader + + +def get_pre_uploader() -> Optional[PreUploader]: + """Return cached pre-uploader without triggering lazy loading.""" + return _pre_uploader diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/pre_uploader.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/pre_uploader.py index 2f135af50..63884cc4f 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/pre_uploader.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_multimodal_upload/pre_uploader.py @@ -17,6 +17,7 @@ Processes Base64Blob/Blob/Uri, generates PreUploadItem list. Actual upload is completed by Uploader implementation class. 
""" +# pylint: disable=too-many-lines from __future__ import annotations @@ -25,13 +26,17 @@ import concurrent.futures import io import logging +import mimetypes import os import re import threading import time +import weakref from dataclasses import dataclass from datetime import datetime -from typing import Any, ClassVar, Dict, List, Optional, Tuple, Union, get_args +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, Union, get_args +from urllib.parse import urlparse import httpx @@ -46,11 +51,19 @@ PreUploadItem, ) from opentelemetry.util.genai.extended_environment_variables import ( - OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_ENABLED, - OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_SSL_VERIFY, - OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE, + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_STORAGE_BASE_PATH, ) from opentelemetry.util.genai.types import Base64Blob, Blob, Modality, Uri +from opentelemetry.util.genai.utils import ( + get_multimodal_allowed_root_paths, + get_multimodal_storage_base_path, + is_multimodal_audio_conversion_enabled, + is_multimodal_download_enabled, + is_multimodal_local_file_enabled, + should_process_multimodal_input, + should_process_multimodal_output, + should_verify_multimodal_download_ssl, +) # Try importing audio processing libraries (optional dependencies) try: @@ -104,11 +117,13 @@ class MultimodalPreUploader(PreUploader): Environment variables for configuration: - :envvar:`OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE`: Controls which messages to process - ("input", "output", or "both", default: "both") + ("input", "output", or "both", default: "none") - :envvar:`OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_ENABLED`: Enable downloading remote URIs - (default: "true") + (default: "false") - :envvar:`OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_SSL_VERIFY`: Enable SSL verification for downloads (default: "true") + - :envvar:`OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_AUDIO_CONVERSION_ENABLED`: 
Enable audio conversion + (currently PCM16/L16/PCM to WAV, default: "false") The ``httpx`` package (for URI metadata fetching) should be installed. For audio format conversion support, install ``numpy`` and ``soundfile``. @@ -122,67 +137,80 @@ class MultimodalPreUploader(PreUploader): extra_meta: Additional metadata to include in each upload item (e.g., workspaceId, serviceId for ARMS) """ - # Class-level event loop and dedicated thread - _loop: ClassVar[Optional[asyncio.AbstractEventLoop]] = None - _loop_thread: ClassVar[Optional[threading.Thread]] = None - _loop_lock: ClassVar[threading.Lock] = threading.Lock() - _shutdown_called: ClassVar[bool] = False - # Active task counter (for graceful shutdown wait) - _active_tasks: ClassVar[int] = 0 - _active_cond: ClassVar[threading.Condition] = threading.Condition() - def __init__( self, base_path: str, extra_meta: Optional[Dict[str, str]] = None ) -> None: self._base_path = base_path self._extra_meta = extra_meta or {} + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._loop_thread: Optional[threading.Thread] = None + self._loop_lock = threading.Lock() + self._shutdown_called = False + # Active task counter (for graceful shutdown wait) + self._active_tasks = 0 + self._active_cond = threading.Condition() # Read multimodal upload configuration (static config, read once only) - upload_mode = os.getenv( - OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE, "both" - ).lower() - self._process_input = upload_mode in ("input", "both") - self._process_output = upload_mode in ("output", "both") - self._download_enabled = os.getenv( - OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_ENABLED, "true" - ).lower() in ("true", "1", "yes") - self._ssl_verify = os.getenv( - OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_SSL_VERIFY, "true" - ).lower() not in ("false", "0", "no") + self._process_input = should_process_multimodal_input() + self._process_output = should_process_multimodal_output() + self._download_enabled = 
is_multimodal_download_enabled() + self._ssl_verify = should_verify_multimodal_download_ssl() + self._audio_conversion_enabled = ( + is_multimodal_audio_conversion_enabled() + ) + + # Local file configuration + self._local_file_enabled = is_multimodal_local_file_enabled() + self._allowed_root_paths = get_multimodal_allowed_root_paths() + + if self._local_file_enabled and not self._allowed_root_paths: + _logger.warning( + "Local file processing enabled but no allowed root paths configured. " + "Local file uploads will be blocked for security." + ) + + # register_at_fork: Reset state in child process + # Use weak reference to avoid preventing instance from being GC'd + if hasattr(os, "register_at_fork"): + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork( + after_in_child=lambda: (ref := weak_reinit()) and ref and ref() + ) @property def base_path(self) -> str: return self._base_path - @classmethod - def _ensure_loop(cls) -> asyncio.AbstractEventLoop: + def _ensure_loop(self) -> asyncio.AbstractEventLoop: """Ensure event loop exists and is running (thread-safe)""" # Fast path: loop exists and thread is alive if ( - cls._loop is not None - and cls._loop_thread is not None - and cls._loop_thread.is_alive() + self._loop is not None + and self._loop_thread is not None + and self._loop_thread.is_alive() ): - return cls._loop + return self._loop # Slow path: need to create or rebuild (within lock) - with cls._loop_lock: + with self._loop_lock: + if self._shutdown_called: + raise RuntimeError("MultimodalPreUploader already shutdown") # Double check: check if loop exists and thread is alive if ( - cls._loop is not None - and cls._loop_thread is not None - and cls._loop_thread.is_alive() + self._loop is not None + and self._loop_thread is not None + and self._loop_thread.is_alive() ): - return cls._loop + return self._loop # Clean up old loop (if thread is dead) - if cls._loop is not None: + if self._loop is not None: try: - 
cls._loop.call_soon_threadsafe(cls._loop.stop) + self._loop.call_soon_threadsafe(self._loop.stop) except RuntimeError: pass # Loop already stopped - cls._loop = None - cls._loop_thread = None + self._loop = None + self._loop_thread = None # Create new event loop loop = asyncio.new_event_loop() @@ -205,12 +233,11 @@ def run_loop(): break threading.Event().wait(0.001) - cls._loop_thread = thread - cls._loop = loop - return cls._loop + self._loop_thread = thread + self._loop = loop + return self._loop - @classmethod - def shutdown(cls, timeout: float = 5.0) -> None: + def shutdown(self, timeout: float = 5.0) -> None: """ Gracefully shutdown event loop. @@ -219,64 +246,74 @@ def shutdown(cls, timeout: float = 5.0) -> None: 2. Wait for active tasks to complete first (wait for _active_tasks == 0) 3. Stop event loop and exit after timeout """ - if cls._shutdown_called: - return - cls._shutdown_called = True + with self._loop_lock: + if self._shutdown_called: + return + self._shutdown_called = True deadline = time.time() + timeout # Phase 1: Wait for active tasks to complete - with cls._active_cond: - while cls._active_tasks > 0: + with self._active_cond: + while self._active_tasks > 0: remaining = deadline - time.time() if remaining <= 0: _logger.warning( "MultimodalPreUploader shutdown timeout, %d tasks still active", - cls._active_tasks, + self._active_tasks, ) break - cls._active_cond.wait(timeout=remaining) + self._active_cond.wait(timeout=remaining) - with cls._loop_lock: - if cls._loop is None or cls._loop_thread is None: + with self._loop_lock: + if self._loop is None or self._loop_thread is None: return # Phase 2: Stop event loop try: - cls._loop.call_soon_threadsafe(cls._loop.stop) + self._loop.call_soon_threadsafe(self._loop.stop) except RuntimeError: pass # Loop already stopped # Phase 3: Wait for thread to exit remaining = max(0.0, deadline - time.time()) - cls._loop_thread.join(timeout=remaining) + self._loop_thread.join(timeout=remaining) # Phase 4: Clean up 
state - cls._loop = None - cls._loop_thread = None + self._loop = None + self._loop_thread = None - @classmethod - def _at_fork_reinit(cls) -> None: - """Reset class-level state in child process after fork""" + def _at_fork_reinit(self) -> None: + """Reset instance state in child process after fork""" _logger.debug( "[_at_fork_reinit] MultimodalPreUploader reinitializing after fork" ) - cls._loop_lock = threading.Lock() - cls._loop = None - cls._loop_thread = None - cls._shutdown_called = False - cls._active_tasks = 0 - cls._active_cond = threading.Condition() + self._loop_lock = threading.Lock() + self._loop = None + self._loop_thread = None + self._shutdown_called = False + self._active_tasks = 0 + self._active_cond = threading.Condition() def _run_async( self, coro: Any, timeout: float = 0.3 ) -> Dict[str, UriMetadata]: - """Execute coroutine in class-level event loop (thread-safe)""" - cls = self.__class__ + """Execute coroutine in instance event loop (thread-safe)""" + + if self._shutdown_called: + close = getattr(coro, "close", None) + if callable(close): + close() + return {} # Increase active task count - with cls._active_cond: - cls._active_tasks += 1 + with self._active_cond: + if self._shutdown_called: + close = getattr(coro, "close", None) + if callable(close): + close() + return {} + self._active_tasks += 1 try: loop = self._ensure_loop() @@ -287,11 +324,16 @@ def _run_async( except concurrent.futures.TimeoutError: future.cancel() return {} # Return empty result on timeout + except RuntimeError: + close = getattr(coro, "close", None) + if callable(close): + close() + return {} finally: # Decrease active task count and notify - with cls._active_cond: - cls._active_tasks -= 1 - cls._active_cond.notify_all() + with self._active_cond: + self._active_tasks -= 1 + self._active_cond.notify_all() @staticmethod def _strip_query_params(uri: str) -> str: @@ -575,11 +617,50 @@ def _convert_pcm16_to_wav( _logger.error("Failed to convert PCM16 to WAV: %s", exc) 
return None + def _normalize_audio_data( + self, data: bytes, mime_type: str, source_desc: str = "" + ) -> Tuple[bytes, str]: + """Detect and convert audio format if needed""" + if mime_type in ("audio/unknown", "audio/*", "audio"): + detected_mime = MultimodalPreUploader._detect_audio_format(data) + if detected_mime: + _logger.debug( + "Auto-detected audio format%s: %s -> %s", + source_desc, + mime_type, + detected_mime, + ) + mime_type = detected_mime + + if self._audio_conversion_enabled and mime_type in ( + "audio/pcm16", + "audio/l16", + "audio/pcm", + ): + wav_data = MultimodalPreUploader._convert_pcm16_to_wav(data) + if wav_data: + _logger.debug( + "Converted%s PCM16 to WAV, original: %d, new: %d", + source_desc, + len(data), + len(wav_data), + ) + mime_type = "audio/wav" + data = wav_data + else: + _logger.warning( + "Failed to convert%s PCM16 to WAV, using original format", + source_desc, + ) + + return data, mime_type + def _create_upload_item( self, data: bytes, mime_type: str, modality: Union[Modality, str], + *, timestamp: int, trace_id: Optional[str], span_id: Optional[str], @@ -629,6 +710,7 @@ def _create_download_upload_item( source_uri: str, metadata: UriMetadata, modality: Union[Modality, str], + *, timestamp: int, trace_id: Optional[str], span_id: Optional[str], @@ -673,12 +755,199 @@ def _is_http_uri(uri: str) -> bool: """Check if URI starts with http:// or https://""" return uri.startswith("http://") or uri.startswith("https://") + @staticmethod + def _infer_mime_type_from_uri(uri: str) -> Optional[str]: + """Infer MIME type from URI path suffix without downloading data.""" + parsed = urlparse(uri) + # For plain paths without scheme, fall back to the raw uri. 
+ path = parsed.path or uri + mime_type, _ = mimetypes.guess_type(path) + return mime_type + + @staticmethod + def _ensure_uri_mime_type(part: Uri) -> None: + """Best-effort MIME fill for Uri parts when mime_type is missing.""" + if part.mime_type: + return + inferred = MultimodalPreUploader._infer_mime_type_from_uri(part.uri) + if inferred: + part.mime_type = inferred + + def _is_local_file_uri(self, uri: str) -> bool: + """Check if URI is a local file path or file:// URI""" + if uri.startswith("file://"): + return True + + # If local file processing is enabled, treat paths starting with /, ./, ../ or no scheme as local files + if self._local_file_enabled: + # Check if it has a scheme (like http://, data:, etc.) + # If no scheme, or starts with common path prefixes, treat as file + if "://" in uri: + return False # Has scheme other than file:// (handled above) + if uri.startswith("data:"): + return False + + # Assume anything else without a scheme is a potential local file path + return True + + return False + + def _read_local_file(self, uri: str) -> Optional[bytes]: + """Read content from local file with size limit and security checks. + + Args: + uri: Local file URI (file://...) 
or path + + Returns: + File content as bytes, or None if read fails, security check fails, or file too large + """ + try: + # Normalize path + if uri.startswith("file://"): + file_path = uri[7:] + else: + file_path = uri + + # Security check: must be absolute and within allowed root paths + abs_path = Path(file_path).resolve(strict=False) + + allowed = False + for root_str in self._allowed_root_paths: + root_path = Path(root_str).resolve() + try: + abs_path.relative_to(root_path) + allowed = True + break + except ValueError: + continue + + if not allowed: + _logger.warning( + "Local file access blocked: %s is not in allowed root paths %s", + abs_path, + self._allowed_root_paths, + ) + return None + + if not os.path.exists(abs_path): + _logger.debug("Local file not found: %s", abs_path) + return None + + file_size = os.path.getsize(abs_path) + if not MultimodalPreUploader._check_size( + file_size, f" local file {abs_path}" + ): + return None + + with open(abs_path, "rb") as file_obj: + return file_obj.read() + except (OSError, IOError) as exc: + _logger.debug("Failed to read local file %s: %s", uri, exc) + return None + + @staticmethod + def _resolve_mime_type( + source_mime: Optional[str] = None, object_mime: Optional[str] = None + ) -> str: + """Resolve MIME type from source (detected) and object (provided).""" + generic_types = ("text/plain", "application/octet-stream", None, "") + + # If source is specific, prefer it + if source_mime and source_mime not in generic_types: + return source_mime + + # If source is generic/missing, use object mime if specific + if object_mime and object_mime not in generic_types: + return object_mime + + # Fallback to source if it was generic (but present), or object (if generic), or default + return source_mime or object_mime or "application/octet-stream" + + @staticmethod + def _estimate_base64_size(b64_data: str) -> int: + """Estimate decoded size of base64 string""" + return len(b64_data) * 3 // 4 - b64_data.count("=", -2) + + 
@staticmethod
+    def _check_size(size: int, description: str = "") -> bool:
+        """
+        Check if size exceeds limit.
+        Returns True if size is within limit, False otherwise.
+        """
+        if size > _MAX_MULTIMODAL_DATA_SIZE:
+            _logger.debug(
+                "Skip%s: size %d exceeds limit %d",
+                description,
+                size,
+                _MAX_MULTIMODAL_DATA_SIZE,
+            )
+            return False
+        return True
+
+    @staticmethod
+    def _is_data_uri(uri: str) -> bool:
+        """Check if URI is a data URI"""
+        return uri.startswith("data:")
+
+    @staticmethod
+    def _parse_data_uri(uri: str) -> Tuple[Optional[str], Optional[bytes]]:
+        """Parse data URI to extract mime_type and decoded data
+
+        Format: data:[<mediatype>][;base64],<data>
+        """
+        parsed_mime_type: Optional[str] = None
+        parsed_data: Optional[bytes] = None
+
+        if not uri.startswith("data:"):
+            return parsed_mime_type, parsed_data
+
+        try:
+            header, encoded_data = uri.split(",", 1)
+        except ValueError:
+            return parsed_mime_type, parsed_data
+
+        # Parse header
+        # parts[0] is "data:[<mediatype>]"
+        # subsequent parts are parameters, e.g. "base64" or "charset=..."
+        parts = header.split(";")
+
+        mime_type = "text/plain"  # RFC 2397 default
+        if len(parts) > 0 and len(parts[0]) > 5:
+            mime_type = parts[0][5:] or "text/plain"
+
+        is_base64 = "base64" in parts[1:]
+
+        try:
+            if is_base64:
+                # Size check optimization
+                approx_size = MultimodalPreUploader._estimate_base64_size(
+                    encoded_data
+                )
+                if MultimodalPreUploader._check_size(
+                    approx_size, " data URI (approx)"
+                ):
+                    decoded_data = base64.b64decode(encoded_data)
+
+                    # Precise check after decode
+                    if MultimodalPreUploader._check_size(
+                        len(decoded_data), " data URI"
+                    ):
+                        parsed_mime_type = mime_type
+                        parsed_data = decoded_data
+            # Only support base64 data URIs for now.
+            # Non-base64 branch intentionally keeps defaults.
+ except Exception as exc: # pylint: disable=broad-except + _logger.debug("Failed to parse data URI: %s", exc) + + return parsed_mime_type, parsed_data + def _process_message_parts( # pylint: disable=too-many-locals,too-many-branches,too-many-statements self, parts: List[Any], trace_id: Optional[str], span_id: Optional[str], timestamp: int, + *, uri_to_metadata: Dict[str, UriMetadata], uploads: List[PreUploadItem], ) -> None: @@ -686,10 +955,18 @@ def _process_message_parts( # pylint: disable=too-many-locals,too-many-branches # Step 1: Traverse and extract potential multimodal parts (max 10) blob_parts: List[Tuple[int, Union[Base64Blob, Blob]]] = [] - uri_parts: List[Tuple[int, Uri]] = [] + local_file_parts: List[Tuple[int, Uri]] = [] # file:// URIs + http_uri_parts: List[Tuple[int, Uri]] = [] # http/https URIs + data_uri_parts: List[Tuple[int, Uri]] = [] # data: URIs for idx, part in enumerate(parts): - if len(blob_parts) + len(uri_parts) >= _MAX_MULTIMODAL_PARTS: + total_parts = ( + len(blob_parts) + + len(local_file_parts) + + len(http_uri_parts) + + len(data_uri_parts) + ) + if total_parts >= _MAX_MULTIMODAL_PARTS: _logger.debug( "Reached max multimodal parts limit (%d), skipping remaining", _MAX_MULTIMODAL_PARTS, @@ -698,72 +975,50 @@ def _process_message_parts( # pylint: disable=too-many-locals,too-many-branches if isinstance(part, (Base64Blob, Blob)): blob_parts.append((idx, part)) - elif isinstance(part, Uri) and self._download_enabled: - # Only process Uri when download feature is enabled + elif isinstance(part, Uri): + self._ensure_uri_mime_type(part) modality_str = part.modality if modality_str in _SUPPORTED_MODALITIES: - uri_parts.append((idx, part)) + # Categorize by URI type + if self._is_local_file_uri(part.uri): + # Local file:// URIs - always process (no download needed) + local_file_parts.append((idx, part)) + elif self._is_data_uri(part.uri): + # data: URIs - always process (decoding needed) + data_uri_parts.append((idx, part)) + elif 
self._is_http_uri(part.uri): + # Always keep HTTP/HTTPS URI parts visible to follow-up + # metadata extraction. Replacement is controlled later + # by _download_enabled in Step 3. + http_uri_parts.append((idx, part)) # Step 2: Process Blob (data already in memory) for idx, part in blob_parts: try: - mime_type = part.mime_type or "application/octet-stream" - # Size limit check + mime_type = self._resolve_mime_type(None, part.mime_type) + # Check size limit if isinstance(part, Base64Blob): b64data = part.content - datalen = len(b64data) * 3 // 4 - b64data.count("=", -2) - if datalen > _MAX_MULTIMODAL_DATA_SIZE: - _logger.debug( - "Skip Base64Blob: decoded size %d exceeds limit %d", - datalen, - _MAX_MULTIMODAL_DATA_SIZE, - ) + datalen = self._estimate_base64_size(b64data) + if not self._check_size(datalen, " Base64Blob"): continue data = base64.b64decode(b64data) else: data = part.content - if len(data) > _MAX_MULTIMODAL_DATA_SIZE: - _logger.debug( - "Skip Blob: size %d exceeds limit %d, mime_type: %s", - len(data), - _MAX_MULTIMODAL_DATA_SIZE, - mime_type, - ) + if not self._check_size( + len(data), f" Blob (mime_type: {mime_type})" + ): continue - # If audio/unknown or other unknown audio formats, try auto-detection - if mime_type in ("audio/unknown", "audio/*", "audio"): - detected_mime = self._detect_audio_format(data) - if detected_mime: - _logger.debug( - "Auto-detected audio format: %s -> %s", - mime_type, - detected_mime, - ) - mime_type = detected_mime - # If PCM16 audio format, convert to WAV - if mime_type in ("audio/pcm16", "audio/l16", "audio/pcm"): - wav_data = self._convert_pcm16_to_wav(data) - if wav_data: - _logger.debug( - "Converted PCM16 to WAV format, original size: %d, new size: %d", - len(data), - len(wav_data), - ) - mime_type = "audio/wav" - data = wav_data - else: - _logger.warning( - "Failed to convert PCM16 to WAV, using original format" - ) + data, mime_type = self._normalize_audio_data(data, mime_type) upload_item, uri_part = 
self._create_upload_item( data, mime_type, part.modality, - timestamp, - trace_id, - span_id, + timestamp=timestamp, + trace_id=trace_id, + span_id=span_id, ) uploads.append(upload_item) parts[idx] = uri_part @@ -778,14 +1033,88 @@ def _process_message_parts( # pylint: disable=too-many-locals,too-many-branches ) # Keep original, don't replace - # Step 3: Process Uri (create download task based on metadata) - for idx, part in uri_parts: - # Non-http/https URIs (like already processed file://, etc.) skip directly - if not self._is_http_uri(part.uri): + # Step 2.5: Process local file:// URIs (read file content, similar to Blob) + for idx, part in local_file_parts: + try: + # Pass full URI/path to _read_local_file, which handles security checks + data = self._read_local_file(part.uri) + if data is None: + # File not found, too large, or security check failed -> keep original URI + continue + + mime_type = self._resolve_mime_type(None, part.mime_type) + + data, mime_type = self._normalize_audio_data( + data, mime_type, " for local file" + ) + + upload_item, uri_part = self._create_upload_item( + data, + mime_type, + part.modality, + timestamp=timestamp, + trace_id=trace_id, + span_id=span_id, + ) + uploads.append(upload_item) + parts[idx] = uri_part _logger.debug( - "Skip non-http URI (already processed or local): %s", + "Local file processed: %s -> %s", + part.uri, + uri_part.uri, + ) + except (ValueError, TypeError, KeyError, OSError) as exc: + _logger.error( + "Failed to process local file URI, skip: %s, uri: %s", + exc, part.uri, ) + # Keep original, don't replace + + # Step 2.6: Process data: URIs (decode base64/url-encoded) + for idx, part in data_uri_parts: + try: + mime_type, data = self._parse_data_uri(part.uri) + if data is None: + _logger.debug( + "Failed to parse data URI, skip: %s", part.uri[:50] + ) + continue + + mime_type = self._resolve_mime_type(mime_type, part.mime_type) + + data, mime_type = self._normalize_audio_data( + data, mime_type, " for data 
URI" + ) + + upload_item, uri_part = self._create_upload_item( + data, + mime_type, + part.modality, + timestamp=timestamp, + trace_id=trace_id, + span_id=span_id, + ) + uploads.append(upload_item) + parts[idx] = uri_part + _logger.debug( + "Data URI processed: %s -> %s", + part.uri[:50], + uri_part.uri, + ) + except (ValueError, TypeError, KeyError) as exc: + _logger.error( + "Failed to process data URI, skip: %s, uri: %s", + exc, + part.uri[:50], + ) + # Keep original, don't replace + + # Step 3: Process HTTP/HTTPS URIs (create download task based on metadata) + for idx, part in http_uri_parts: + if not self._download_enabled: + # Download disabled: keep original URI in-place, so metadata + # extraction still observes this URI in final messages. continue metadata = uri_to_metadata.get(part.uri) @@ -795,26 +1124,29 @@ def _process_message_parts( # pylint: disable=too-many-locals,too-many-branches "No metadata for URI (timeout/error/missing), skip: %s", part.uri, ) + # Keep original URI in-place, so metadata extraction still + # observes this URI in final messages. 
continue # Size limit check - if metadata.content_length > _MAX_MULTIMODAL_DATA_SIZE: - _logger.debug( - "Skip Uri: size %d exceeds limit %d, uri: %s", - metadata.content_length, - _MAX_MULTIMODAL_DATA_SIZE, - part.uri, - ) + if not self._check_size( + metadata.content_length, f" Uri {part.uri}" + ): continue + # Resolve MIME type + metadata.content_type = self._resolve_mime_type( + metadata.content_type, part.mime_type + ) + try: upload_item, uri_part = self._create_download_upload_item( part.uri, metadata, part.modality, - timestamp, - trace_id, - span_id, + timestamp=timestamp, + trace_id=trace_id, + span_id=span_id, ) uploads.append(upload_item) parts[idx] = uri_part @@ -931,8 +1263,8 @@ def pre_upload( # pylint: disable=too-many-branches trace_id, span_id, timestamp, - uri_to_metadata, - uploads, + uri_to_metadata=uri_to_metadata, + uploads=uploads, ) if self._process_output and output_messages: @@ -943,13 +1275,20 @@ def pre_upload( # pylint: disable=too-many-branches trace_id, span_id, timestamp, - uri_to_metadata, - uploads, + uri_to_metadata=uri_to_metadata, + uploads=uploads, ) return uploads -# Module-level fork handler registration -if hasattr(os, "register_at_fork"): - os.register_at_fork(after_in_child=MultimodalPreUploader._at_fork_reinit) +def fs_pre_uploader_hook() -> Optional[PreUploader]: + """Create file-system pre-uploader from environment variables.""" + base_path = get_multimodal_storage_base_path() + if not base_path: + _logger.warning( + "%s is required but not set, multimodal pre-uploader disabled", + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_STORAGE_BASE_PATH, + ) + return None + return MultimodalPreUploader(base_path=base_path) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/extended_environment_variables.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/extended_environment_variables.py index 52596765e..d976eb317 100644 --- 
a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/extended_environment_variables.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/extended_environment_variables.py @@ -56,7 +56,7 @@ .. envvar:: OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE Upload mode for multimodal data. Must be one of ``none``, ``input``, ``output``, or ``both``. -Defaults to ``both``. +Defaults to ``none``. """ OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_ENABLED = ( @@ -65,8 +65,8 @@ """ .. envvar:: OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_ENABLED -Whether to download and re-upload external URI references. Set to ``true`` or ``false``. -Defaults to ``true``. +Whether to download from external URI and re-upload to storage. Set to ``true`` or ``false``. +Defaults to ``false``. """ OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_SSL_VERIFY = ( @@ -79,3 +79,58 @@ Set to ``true`` or ``false``. Defaults to ``true``. Disabling SSL verification may expose to man-in-the-middle attacks. """ + +OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_AUDIO_CONVERSION_ENABLED = ( + "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_AUDIO_CONVERSION_ENABLED" +) +""" +.. envvar:: OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_AUDIO_CONVERSION_ENABLED + +Whether to enable audio transcoding in multimodal pre-processing +(currently PCM16/L16/PCM to WAV). +Set to ``true`` or ``false``. Defaults to ``false``. +""" + +OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_LOCAL_FILE_ENABLED = ( + "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_LOCAL_FILE_ENABLED" +) +""" +.. envvar:: OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_LOCAL_FILE_ENABLED + +Whether to allow the multimodal pipeline to read and upload files directly +from the local file system (supports ``file://`` URIs, absolute paths, and +relative paths). +Set to ``true`` or ``false``. Defaults to ``false``. +""" + +OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_ALLOWED_ROOT_PATHS = ( + "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_ALLOWED_ROOT_PATHS" +) +""" +.. 
envvar:: OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_ALLOWED_ROOT_PATHS + +List of allowed root paths for local file access (comma separated). +Only files within these paths will be allowed for upload. +""" + +OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOADER = ( + "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOADER" +) +""" +.. envvar:: OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOADER + +Select multimodal uploader hook name from entry point group +``opentelemetry_genai_multimodal_uploader``. +Defaults to ``fs`` when unset. +""" + +OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_PRE_UPLOADER = ( + "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_PRE_UPLOADER" +) +""" +.. envvar:: OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_PRE_UPLOADER + +Select multimodal pre-uploader hook name from entry point group +``opentelemetry_genai_multimodal_pre_uploader``. +Defaults to ``fs`` when unset. +""" diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/extended_handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/extended_handler.py index 2bddcc43f..7f5e040ef 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/extended_handler.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/extended_handler.py @@ -65,7 +65,7 @@ import timeit from contextlib import contextmanager -from typing import Iterator, Optional +from typing import Iterator, Optional, Union from opentelemetry import context as otel_context from opentelemetry._logs import LoggerProvider @@ -73,7 +73,12 @@ from opentelemetry.semconv._incubating.attributes import ( gen_ai_attributes as GenAI, ) -from opentelemetry.trace import SpanKind, TracerProvider, set_span_in_context +from opentelemetry.trace import ( + Span, + SpanKind, + TracerProvider, + set_span_in_context, +) from opentelemetry.util.genai._extended_memory import ( MemoryInvocation, _apply_memory_finish_attributes, @@ -138,6 +143,33 @@ def __init__( # Initialize multimodal processing (from Mixin) self._init_multimodal() + 
self.__class__._ensure_multimodal_shutdown_atexit_registered() + + # ==================== Metrics Helper ==================== + + def _record_extended_metrics( + self, + span: Span, + invocation: Union[ + LLMInvocation, + EmbeddingInvocation, + ExecuteToolInvocation, + InvokeAgentInvocation, + CreateAgentInvocation, + RetrieveInvocation, + RerankInvocation, + MemoryInvocation, + ], + *, + error_type: str | None = None, + ) -> None: + """Record extended metrics for any invocation type.""" + if self._metrics_recorder is not None and isinstance( + self._metrics_recorder, ExtendedInvocationMetricsRecorder + ): + self._metrics_recorder.record_extended( + span, invocation, error_type=error_type + ) # ==================== LLM Operations Override (Async Multimodal) ==================== # Note: start_llm is inherited from TelemetryHandler. @@ -160,7 +192,7 @@ def stop_llm(self, invocation: LLMInvocation) -> LLMInvocation: invocation.monotonic_end_s = timeit.default_timer() # Try async multimodal processing - if self.process_multimodal_stop(invocation): + if self.process_multimodal_stop(invocation, method="stop_llm"): return invocation # No multimodal: use parent's sync path @@ -180,19 +212,12 @@ def fail_llm( invocation.monotonic_end_s = timeit.default_timer() # Try async multimodal processing - if self.process_multimodal_fail(invocation, error): + if self.process_multimodal_fail(invocation, error, method="fail_llm"): return invocation # No multimodal: use parent's sync path return super().fail_llm(invocation, error) - # ==================== Shutdown ==================== - - @classmethod - def shutdown_async_worker(cls, timeout: float = 5.0) -> None: - """Gracefully shutdown async worker""" - cls.shutdown_multimodal_worker(timeout) - # ==================== Create Agent Operations ==================== def start_create_agent( @@ -226,12 +251,7 @@ def stop_create_agent( # pylint: disable=no-self-use return invocation _apply_create_agent_finish_attributes(invocation.span, 
invocation) - - # Record metrics - if self._metrics_recorder is not None and isinstance( - self._metrics_recorder, ExtendedInvocationMetricsRecorder - ): - self._metrics_recorder.record_extended(invocation.span, invocation) + self._record_extended_metrics(invocation.span, invocation) otel_context.detach(invocation.context_token) invocation.span.end() @@ -246,14 +266,9 @@ def fail_create_agent( # pylint: disable=no-self-use _apply_create_agent_finish_attributes(invocation.span, invocation) _apply_error_attributes(invocation.span, error) - - # Record metrics with error type - if self._metrics_recorder is not None and isinstance( - self._metrics_recorder, ExtendedInvocationMetricsRecorder - ): - self._metrics_recorder.record_extended( - invocation.span, invocation, error_type=error.type.__qualname__ - ) + self._record_extended_metrics( + invocation.span, invocation, error_type=error.type.__qualname__ + ) otel_context.detach(invocation.context_token) invocation.span.end() @@ -303,12 +318,7 @@ def stop_embedding( # pylint: disable=no-self-use return invocation _apply_embedding_finish_attributes(invocation.span, invocation) - - # Record metrics - if self._metrics_recorder is not None and isinstance( - self._metrics_recorder, ExtendedInvocationMetricsRecorder - ): - self._metrics_recorder.record_extended(invocation.span, invocation) + self._record_extended_metrics(invocation.span, invocation) otel_context.detach(invocation.context_token) invocation.span.end() @@ -323,14 +333,9 @@ def fail_embedding( # pylint: disable=no-self-use _apply_embedding_finish_attributes(invocation.span, invocation) _apply_error_attributes(invocation.span, error) - - # Record metrics with error type - if self._metrics_recorder is not None and isinstance( - self._metrics_recorder, ExtendedInvocationMetricsRecorder - ): - self._metrics_recorder.record_extended( - invocation.span, invocation, error_type=error.type.__qualname__ - ) + self._record_extended_metrics( + invocation.span, invocation, 
error_type=error.type.__qualname__ + ) otel_context.detach(invocation.context_token) invocation.span.end() @@ -380,12 +385,7 @@ def stop_execute_tool( # pylint: disable=no-self-use return invocation _apply_execute_tool_finish_attributes(invocation.span, invocation) - - # Record metrics - if self._metrics_recorder is not None and isinstance( - self._metrics_recorder, ExtendedInvocationMetricsRecorder - ): - self._metrics_recorder.record_extended(invocation.span, invocation) + self._record_extended_metrics(invocation.span, invocation) otel_context.detach(invocation.context_token) invocation.span.end() @@ -400,14 +400,9 @@ def fail_execute_tool( # pylint: disable=no-self-use _apply_execute_tool_finish_attributes(invocation.span, invocation) _apply_error_attributes(invocation.span, error) - - # Record metrics with error type - if self._metrics_recorder is not None and isinstance( - self._metrics_recorder, ExtendedInvocationMetricsRecorder - ): - self._metrics_recorder.record_extended( - invocation.span, invocation, error_type=error.type.__qualname__ - ) + self._record_extended_metrics( + invocation.span, invocation, error_type=error.type.__qualname__ + ) otel_context.detach(invocation.context_token) invocation.span.end() @@ -463,16 +458,19 @@ def stop_invoke_agent( if invocation.context_token is None or invocation.span is None: return invocation + # Record actual end time + invocation.monotonic_end_s = timeit.default_timer() + + # Try async multimodal processing + if self.process_multimodal_stop(invocation, method="stop_agent"): + return invocation + + # No multimodal: sync path _apply_invoke_agent_finish_attributes(invocation.span, invocation) _maybe_emit_invoke_agent_event( self._logger, invocation.span, invocation ) - - # Record metrics - if self._metrics_recorder is not None and isinstance( - self._metrics_recorder, ExtendedInvocationMetricsRecorder - ): - self._metrics_recorder.record_extended(invocation.span, invocation) + 
self._record_extended_metrics(invocation.span, invocation) otel_context.detach(invocation.context_token) invocation.span.end() @@ -485,18 +483,23 @@ def fail_invoke_agent( # pylint: disable=no-self-use if invocation.context_token is None or invocation.span is None: return invocation + # Record actual end time + invocation.monotonic_end_s = timeit.default_timer() + + # Try async multimodal processing + if self.process_multimodal_fail( + invocation, error, method="fail_agent" + ): + return invocation + + # No multimodal: sync path span = invocation.span _apply_invoke_agent_finish_attributes(span, invocation) _apply_error_attributes(span, error) _maybe_emit_invoke_agent_event(self._logger, span, invocation, error) # pylint: disable=too-many-function-args - - # Record metrics with error type - if self._metrics_recorder is not None and isinstance( - self._metrics_recorder, ExtendedInvocationMetricsRecorder - ): - self._metrics_recorder.record_extended( - span, invocation, error_type=error.type.__qualname__ - ) + self._record_extended_metrics( + span, invocation, error_type=error.type.__qualname__ + ) otel_context.detach(invocation.context_token) span.end() @@ -546,12 +549,7 @@ def stop_retrieve( # pylint: disable=no-self-use return invocation _apply_retrieve_finish_attributes(invocation.span, invocation) - - # Record metrics - if self._metrics_recorder is not None and isinstance( - self._metrics_recorder, ExtendedInvocationMetricsRecorder - ): - self._metrics_recorder.record_extended(invocation.span, invocation) + self._record_extended_metrics(invocation.span, invocation) otel_context.detach(invocation.context_token) invocation.span.end() @@ -566,14 +564,9 @@ def fail_retrieve( # pylint: disable=no-self-use _apply_retrieve_finish_attributes(invocation.span, invocation) _apply_error_attributes(invocation.span, error) - - # Record metrics with error type - if self._metrics_recorder is not None and isinstance( - self._metrics_recorder, ExtendedInvocationMetricsRecorder - 
): - self._metrics_recorder.record_extended( - invocation.span, invocation, error_type=error.type.__qualname__ - ) + self._record_extended_metrics( + invocation.span, invocation, error_type=error.type.__qualname__ + ) otel_context.detach(invocation.context_token) invocation.span.end() @@ -619,12 +612,7 @@ def stop_rerank(self, invocation: RerankInvocation) -> RerankInvocation: # pyli return invocation _apply_rerank_finish_attributes(invocation.span, invocation) - - # Record metrics - if self._metrics_recorder is not None and isinstance( - self._metrics_recorder, ExtendedInvocationMetricsRecorder - ): - self._metrics_recorder.record_extended(invocation.span, invocation) + self._record_extended_metrics(invocation.span, invocation) otel_context.detach(invocation.context_token) invocation.span.end() @@ -639,14 +627,9 @@ def fail_rerank( # pylint: disable=no-self-use _apply_rerank_finish_attributes(invocation.span, invocation) _apply_error_attributes(invocation.span, error) - - # Record metrics with error type - if self._metrics_recorder is not None and isinstance( - self._metrics_recorder, ExtendedInvocationMetricsRecorder - ): - self._metrics_recorder.record_extended( - invocation.span, invocation, error_type=error.type.__qualname__ - ) + self._record_extended_metrics( + invocation.span, invocation, error_type=error.type.__qualname__ + ) otel_context.detach(invocation.context_token) invocation.span.end() @@ -696,12 +679,7 @@ def stop_memory(self, invocation: MemoryInvocation) -> MemoryInvocation: # pyli _apply_memory_finish_attributes(invocation.span, invocation) _maybe_emit_memory_event(self._logger, invocation.span, invocation) - - # Record metrics - if self._metrics_recorder is not None and isinstance( - self._metrics_recorder, ExtendedInvocationMetricsRecorder - ): - self._metrics_recorder.record_extended(invocation.span, invocation) + self._record_extended_metrics(invocation.span, invocation) otel_context.detach(invocation.context_token) invocation.span.end() 
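The `extended_handler.py` hunks above collapse eight near-identical "check the recorder type, then record" blocks into a single keyword-only `_record_extended_metrics` helper. A standalone sketch of that pattern, using simplified stand-in classes rather than the real `ExtendedInvocationMetricsRecorder`/handler API:

```python
# Minimal sketch of the refactor above: the repeated inline guard
#   if self._metrics_recorder is not None and isinstance(...):
#       self._metrics_recorder.record_extended(...)
# is hoisted into one helper shared by every stop_*/fail_* method.
# All names here are simplified stand-ins, not the real API.
from typing import Any, Optional, Tuple


class ExtendedRecorder:
    """Stand-in for ExtendedInvocationMetricsRecorder."""

    def record_extended(
        self, span: Any, invocation: Any, *, error_type: Optional[str] = None
    ) -> Tuple[str, Optional[str]]:
        return ("recorded", error_type)


class Handler:
    def __init__(self, recorder: Optional[ExtendedRecorder]) -> None:
        self._metrics_recorder = recorder

    def _record_extended_metrics(
        self, span: Any, invocation: Any, *, error_type: Optional[str] = None
    ) -> Optional[Tuple[str, Optional[str]]]:
        # Single type/None guard, instead of one copy per call site.
        # isinstance(None, ExtendedRecorder) is False, so no separate
        # None check is needed.
        if isinstance(self._metrics_recorder, ExtendedRecorder):
            return self._metrics_recorder.record_extended(
                span, invocation, error_type=error_type
            )
        return None


handler = Handler(ExtendedRecorder())
print(handler._record_extended_metrics(None, None, error_type="ValueError"))
# → ('recorded', 'ValueError')
```

The success path calls the helper without `error_type`; the failure path passes `error_type=error.type.__qualname__`, matching how the real diff forwards the error class name.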
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/extended_types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/extended_types.py index f36a28b3e..0fd1ecbac 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/extended_types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/extended_types.py @@ -180,6 +180,7 @@ class InvokeAgentInvocation: server_address: str | None = None server_port: int | None = None monotonic_start_s: float | None = None + monotonic_end_s: float | None = None @dataclass diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/utils.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/utils.py index 77661704f..6aacae98f 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/utils.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/utils.py @@ -15,9 +15,10 @@ import json import logging import os +import re from base64 import b64encode from functools import partial -from typing import Any +from typing import Any, List, Optional from opentelemetry.instrumentation._semconv import ( _OpenTelemetrySemanticConventionStability, @@ -28,6 +29,17 @@ OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT, ) +from opentelemetry.util.genai.extended_environment_variables import ( + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_ALLOWED_ROOT_PATHS, + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_AUDIO_CONVERSION_ENABLED, + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_ENABLED, + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_SSL_VERIFY, + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_LOCAL_FILE_ENABLED, + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_PRE_UPLOADER, + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_STORAGE_BASE_PATH, + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE, + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOADER, +) from opentelemetry.util.genai.types import ContentCapturingMode logger = logging.getLogger(__name__) 
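The new `monotonic_end_s` field added to `InvokeAgentInvocation` above pairs with the existing `monotonic_start_s`, so a duration can still be computed from the monotonic clock when the span is finished later by the async multimodal path. A minimal sketch of the idea, with a simplified stand-in dataclass rather than the real invocation type:

```python
import timeit
from dataclasses import dataclass
from typing import Optional


@dataclass
class AgentInvocation:
    # Simplified stand-in for InvokeAgentInvocation; only the two timing
    # fields from the diff above are modeled here.
    monotonic_start_s: Optional[float] = None
    monotonic_end_s: Optional[float] = None

    def duration_s(self) -> Optional[float]:
        # Both endpoints come from the same monotonic clock
        # (timeit.default_timer), so the difference is wall-clock-safe.
        if self.monotonic_start_s is None or self.monotonic_end_s is None:
            return None
        return self.monotonic_end_s - self.monotonic_start_s


inv = AgentInvocation()
inv.monotonic_start_s = timeit.default_timer()   # set in start_invoke_agent
# ... agent work happens here ...
inv.monotonic_end_s = timeit.default_timer()     # set in stop/fail_invoke_agent
assert inv.duration_s() is not None and inv.duration_s() >= 0.0
```

Recording the end time on the invocation itself, rather than reading the clock at metrics-emission time, keeps the measured duration correct even when emission is deferred to the multimodal upload worker.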
@@ -87,6 +99,88 @@ def should_emit_event() -> bool: return False +def _parse_env_bool(value: Optional[str], default: bool) -> bool: + if not value: + return default + return value.lower() in ("true", "1", "yes") + + +def get_multimodal_upload_mode() -> str: + return os.getenv( + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE, "none" + ).lower() + + +def should_process_multimodal_input() -> bool: + return get_multimodal_upload_mode() in ("input", "both") + + +def should_process_multimodal_output() -> bool: + return get_multimodal_upload_mode() in ("output", "both") + + +def is_multimodal_download_enabled() -> bool: + return _parse_env_bool( + os.getenv(OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_ENABLED), + default=False, + ) + + +def should_verify_multimodal_download_ssl() -> bool: + value = os.getenv( + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_SSL_VERIFY + ) + if not value: + return True + return value.lower() not in ("false", "0", "no") + + +def is_multimodal_local_file_enabled() -> bool: + return _parse_env_bool( + os.getenv(OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_LOCAL_FILE_ENABLED), + default=False, + ) + + +def is_multimodal_audio_conversion_enabled() -> bool: + return _parse_env_bool( + os.getenv( + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_AUDIO_CONVERSION_ENABLED + ), + default=False, + ) + + +def get_multimodal_allowed_root_paths() -> List[str]: + allowed_roots_str = os.getenv( + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_ALLOWED_ROOT_PATHS, + "", + ) + if not allowed_roots_str: + return [] + + paths = [ + p.strip() for p in re.split(r"[,]", allowed_roots_str) if p.strip() + ] + return [os.path.abspath(p) for p in paths] + + +def get_multimodal_uploader_hook_name() -> Optional[str]: + hook_name = os.getenv(OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOADER, "fs") + return hook_name or None + + +def get_multimodal_pre_uploader_hook_name() -> Optional[str]: + hook_name = os.getenv( + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_PRE_UPLOADER, "fs" + ) + return 
hook_name or None + + +def get_multimodal_storage_base_path() -> Optional[str]: + return os.getenv(OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_STORAGE_BASE_PATH) + + class _GenAiJsonEncoder(json.JSONEncoder): def default(self, o: Any) -> Any: if isinstance(o, bytes): diff --git a/util/opentelemetry-util-genai/tests/_multimodal_upload/test_default_hooks.py b/util/opentelemetry-util-genai/tests/_multimodal_upload/test_default_hooks.py new file mode 100644 index 000000000..b3242d09e --- /dev/null +++ b/util/opentelemetry-util-genai/tests/_multimodal_upload/test_default_hooks.py @@ -0,0 +1,79 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +from unittest import TestCase +from unittest.mock import patch + +from opentelemetry.util.genai._multimodal_upload.fs_uploader import ( + FsUploader, + fs_uploader_hook, +) +from opentelemetry.util.genai._multimodal_upload.pre_uploader import ( + MultimodalPreUploader, + fs_pre_uploader_hook, +) +from opentelemetry.util.genai.extended_environment_variables import ( + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_STORAGE_BASE_PATH, +) + + +class TestDefaultHooks(TestCase): + @patch.dict("os.environ", {}, clear=True) + def test_fs_uploader_hook_returns_none_without_base_path(self): + with self.assertLogs(level=logging.WARNING) as logs: + uploader = fs_uploader_hook() + self.assertIsNone(uploader) + self.assertTrue( + any( + "multimodal uploader disabled" in message + for message in logs.output + ) + ) + + @patch.dict( + "os.environ", + { + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_STORAGE_BASE_PATH: "file:///tmp" + }, + clear=True, + ) + def test_fs_uploader_hook_returns_uploader(self): + uploader = fs_uploader_hook() + self.assertIsInstance(uploader, FsUploader) + uploader.shutdown(timeout=0.1) + + @patch.dict("os.environ", {}, clear=True) + def test_fs_pre_uploader_hook_returns_none_without_base_path(self): + with self.assertLogs(level=logging.WARNING) as logs: + pre_uploader = fs_pre_uploader_hook() + self.assertIsNone(pre_uploader) + self.assertTrue( + any( + "multimodal pre-uploader disabled" in message + for message in logs.output + ) + ) + + @patch.dict( + "os.environ", + { + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_STORAGE_BASE_PATH: "file:///tmp" + }, + clear=True, + ) + def test_fs_pre_uploader_hook_returns_pre_uploader(self): + pre_uploader = fs_pre_uploader_hook() + self.assertIsInstance(pre_uploader, MultimodalPreUploader) + pre_uploader.shutdown(timeout=0.1) diff --git a/util/opentelemetry-util-genai/tests/_multimodal_upload/test_fs_uploader.py b/util/opentelemetry-util-genai/tests/_multimodal_upload/test_fs_uploader.py index 3d96a2c6b..9a0ab7a80 
100644 --- a/util/opentelemetry-util-genai/tests/_multimodal_upload/test_fs_uploader.py +++ b/util/opentelemetry-util-genai/tests/_multimodal_upload/test_fs_uploader.py @@ -71,8 +71,8 @@ def test_upload_local_binary(): def test_upload_oss_binary_env(): region_id = os.getenv("ARMS_REGION_ID", "") endpoint = "https://oss-" + region_id + ".aliyuncs.com" - key = os.getenv("APSARA_APM_COLLECTOR_MULTIMODAL_OSS_ACCESS_KEY") - secret = os.getenv("APSARA_APM_COLLECTOR_MULTIMODAL_OSS_ACCESS_SECRET_KEY") + key = os.getenv("APSARA_APM_COLLECTOR_MULTIMODAL_OSS_ACCESS_KEY_ID") + secret = os.getenv("APSARA_APM_COLLECTOR_MULTIMODAL_OSS_ACCESS_KEY_SECRET") storage_base_path = os.getenv( "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_STORAGE_BASE_PATH", "" ) diff --git a/util/opentelemetry-util-genai/tests/_multimodal_upload/test_multimodal_upload_hook.py b/util/opentelemetry-util-genai/tests/_multimodal_upload/test_multimodal_upload_hook.py new file mode 100644 index 000000000..6798b7dac --- /dev/null +++ b/util/opentelemetry-util-genai/tests/_multimodal_upload/test_multimodal_upload_hook.py @@ -0,0 +1,231 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import importlib +from dataclasses import dataclass +from typing import Any, Callable, Optional +from unittest import TestCase +from unittest.mock import patch + +from opentelemetry.util.genai._multimodal_upload._base import ( + PreUploader, + PreUploadItem, + Uploader, + UploadItem, +) +from opentelemetry.util.genai.extended_environment_variables import ( + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_PRE_UPLOADER, + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE, + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOADER, +) + +HOOK_MODULE = ( + "opentelemetry.util.genai._multimodal_upload.multimodal_upload_hook" +) + + +class FakeUploader(Uploader): + def upload(self, item: UploadItem, *, skip_if_exists: bool = True) -> bool: + return True + + def shutdown(self, timeout: float = 10.0) -> None: + return None + + +class FakePreUploader(PreUploader): + def pre_upload( + self, + span_context: Optional[Any], + start_time_utc_nano: int, + input_messages: Optional[list[Any]], + output_messages: Optional[list[Any]], + ) -> list[PreUploadItem]: + return [] + + +class InvalidHookResult: + pass + + +@dataclass +class FakeEntryPoint: + name: str + load: Callable[[], Callable[[], Any]] + + +class TestMultimodalUploadHook(TestCase): + @staticmethod + def _reload_module(): + module = importlib.import_module(HOOK_MODULE) + return importlib.reload(module) + + @patch.dict("os.environ", {}, clear=True) + def test_get_or_load_without_uploader_env(self): + module = self._reload_module() + uploader, pre_uploader = module.get_or_load_uploader_pair() + self.assertIsNone(uploader) + self.assertIsNone(pre_uploader) + + @patch.dict( + "os.environ", + { + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE: "both", + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOADER: "fs", + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_PRE_UPLOADER: "fs", + }, + clear=True, + ) + def test_getters_do_not_trigger_loading(self): + module = self._reload_module() + with patch.object(module, "_iter_entry_points") as mock_iter: + 
self.assertIsNone(module.get_uploader()) + self.assertIsNone(module.get_pre_uploader()) + self.assertEqual(module.get_uploader_pair(), (None, None)) + mock_iter.assert_not_called() + + @patch.dict( + "os.environ", + { + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE: "both", + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOADER: "fs", + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_PRE_UPLOADER: "fs", + }, + clear=True, + ) + def test_load_hooks_success(self): + module = self._reload_module() + calls = {"uploader": 0, "pre": 0} + + def uploader_hook(): + calls["uploader"] += 1 + return FakeUploader() + + def pre_hook(): + calls["pre"] += 1 + return FakePreUploader() + + def fake_entry_points(group: str): + if group == "opentelemetry_genai_multimodal_uploader": + return [FakeEntryPoint("fs", lambda: uploader_hook)] + if group == "opentelemetry_genai_multimodal_pre_uploader": + return [FakeEntryPoint("fs", lambda: pre_hook)] + return [] + + with patch.object( + module, "_iter_entry_points", side_effect=fake_entry_points + ): + uploader, pre_uploader = module.get_or_load_uploader_pair() + self.assertIsInstance(uploader, FakeUploader) + self.assertIsInstance(pre_uploader, FakePreUploader) + + uploader2, pre_uploader2 = module.get_or_load_uploader_pair() + self.assertIs(uploader2, uploader) + self.assertIs(pre_uploader2, pre_uploader) + self.assertEqual(calls["uploader"], 1) + self.assertEqual(calls["pre"], 1) + self.assertIs(module.get_uploader(), uploader) + self.assertIs(module.get_pre_uploader(), pre_uploader) + self.assertEqual(module.get_uploader_pair(), (uploader, pre_uploader)) + + @patch.dict( + "os.environ", + { + OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE: "both", + }, + clear=True, + ) + def test_load_uploader_and_pre_uploader_default_to_fs(self): + module = self._reload_module() + + def uploader_factory(): + return FakeUploader() + + def pre_uploader_factory(): + return FakePreUploader() + + def load_uploader_factory(): + return uploader_factory + + def 
load_pre_uploader_factory():
+            return pre_uploader_factory
+
+        def fake_entry_points(group: str):
+            if group == "opentelemetry_genai_multimodal_uploader":
+                return [FakeEntryPoint("fs", load_uploader_factory)]
+            if group == "opentelemetry_genai_multimodal_pre_uploader":
+                return [FakeEntryPoint("fs", load_pre_uploader_factory)]
+            return []
+
+        with patch.object(
+            module, "_iter_entry_points", side_effect=fake_entry_points
+        ):
+            uploader, pre_uploader = module.get_or_load_uploader_pair()
+            self.assertIsInstance(uploader, FakeUploader)
+            self.assertIsInstance(pre_uploader, FakePreUploader)
+
+    @patch.dict(
+        "os.environ",
+        {
+            OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE: "both",
+            OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOADER: "fs",
+            OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_PRE_UPLOADER: "fs",
+        },
+        clear=True,
+    )
+    def test_invalid_hook_result_fallback(self):
+        module = self._reload_module()
+
+        def invalid_factory():
+            return InvalidHookResult()
+
+        def pre_uploader_factory():
+            return FakePreUploader()
+
+        def load_invalid_factory():
+            return invalid_factory
+
+        def load_pre_uploader_factory():
+            return pre_uploader_factory
+
+        def fake_entry_points(group: str):
+            if group == "opentelemetry_genai_multimodal_uploader":
+                return [FakeEntryPoint("fs", load_invalid_factory)]
+            if group == "opentelemetry_genai_multimodal_pre_uploader":
+                return [FakeEntryPoint("fs", load_pre_uploader_factory)]
+            return []
+
+        with patch.object(
+            module, "_iter_entry_points", side_effect=fake_entry_points
+        ):
+            uploader, pre_uploader = module.get_or_load_uploader_pair()
+            self.assertIsNone(uploader)
+            self.assertIsNone(pre_uploader)
+
+    @patch.dict(
+        "os.environ",
+        {
+            OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE: "none",
+            OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOADER: "fs",
+            OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_PRE_UPLOADER: "fs",
+        },
+        clear=True,
+    )
+    def test_upload_mode_none_disables_hooks(self):
+        module = self._reload_module()
+
+        with patch.object(module, "_iter_entry_points") as mock_iter:
+            uploader, pre_uploader = module.get_or_load_uploader_pair()
+            self.assertIsNone(uploader)
+            self.assertIsNone(pre_uploader)
+            mock_iter.assert_not_called()
diff --git a/util/opentelemetry-util-genai/tests/_multimodal_upload/test_pre_uploader.py b/util/opentelemetry-util-genai/tests/_multimodal_upload/test_pre_uploader.py
index ee08ea762..99854dcce 100644
--- a/util/opentelemetry-util-genai/tests/_multimodal_upload/test_pre_uploader.py
+++ b/util/opentelemetry-util-genai/tests/_multimodal_upload/test_pre_uploader.py
@@ -2,7 +2,11 @@
 Test general functionality of MultimodalPreUploader
 Includes extension mapping, URL generation, meta processing, message handling, async metadata fetching, etc.
 """
+# pylint: disable=too-many-lines
 
+import asyncio
+import base64
+import os
 from pathlib import Path
 from unittest.mock import AsyncMock, MagicMock, Mock, patch
 
@@ -10,18 +14,33 @@
 import pytest
 import respx
 
+from opentelemetry.util.genai._multimodal_processing import (
+    MultimodalProcessingMixin,
+)
 from opentelemetry.util.genai._multimodal_upload.pre_uploader import (
     _MAX_MULTIMODAL_DATA_SIZE,
     _MAX_MULTIMODAL_PARTS,
     MultimodalPreUploader,
     UriMetadata,
 )
-from opentelemetry.util.genai.types import Blob, InputMessage, Uri
+from opentelemetry.util.genai.types import Base64Blob, Blob, InputMessage, Uri
 
 # Test audio file directory for integration tests
 TEST_AUDIO_DIR = Path(__file__).parent / "test_audio_samples"
 
+
+@pytest.fixture(autouse=True)
+def _default_upload_mode_enabled_for_tests():
+    with patch.dict(
+        "os.environ",
+        {
+            "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE": "both",
+            "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_ENABLED": "true",
+        },
+    ):
+        yield
+
+
 class TestPreUploadGeneral:
     """Test general functionality of MultimodalPreUploader"""
 
@@ -588,6 +607,71 @@ def test_max_multimodal_parts_limit(pre_uploader):
         # Process at most _MAX_MULTIMODAL_PARTS parts
         assert len(uploads) == _MAX_MULTIMODAL_PARTS
 
+    @staticmethod
+    @patch(
+        "opentelemetry.util.genai._multimodal_upload.pre_uploader._MAX_MULTIMODAL_DATA_SIZE",
+        100,
+    )
+    def test_blob_size_limit_exceeded(pre_uploader):
+        """Test Blob larger than limit is skipped"""
+        large_data = b"x" * 101
+        part = Blob(
+            content=large_data, mime_type="image/png", modality="image"
+        )
+        message = InputMessage(role="user", parts=[part])
+
+        uploads = pre_uploader.pre_upload(
+            span_context=None,
+            start_time_utc_nano=1000,
+            input_messages=[message],
+            output_messages=[],
+        )
+        assert len(uploads) == 0
+
+    @staticmethod
+    @patch(
+        "opentelemetry.util.genai._multimodal_upload.pre_uploader._MAX_MULTIMODAL_DATA_SIZE",
+        100,
+    )
+    def test_base64_blob_size_limit_exceeded(pre_uploader):
+        """Test Base64Blob larger than limit is skipped"""
+        data = b"x" * 101
+        b64_data = base64.b64encode(data).decode("ascii")
+        part = Base64Blob(
+            content=b64_data, mime_type="image/png", modality="image"
+        )
+        message = InputMessage(role="user", parts=[part])
+
+        uploads = pre_uploader.pre_upload(
+            span_context=None,
+            start_time_utc_nano=1000,
+            input_messages=[message],
+            output_messages=[],
+        )
+        assert len(uploads) == 0
+
+    @staticmethod
+    @patch(
+        "opentelemetry.util.genai._multimodal_upload.pre_uploader._MAX_MULTIMODAL_DATA_SIZE",
+        100,
+    )
+    def test_data_uri_size_limit_exceeded(pre_uploader):
+        """Test Data URI larger than limit is skipped"""
+        data = b"x" * 101
+        b64_data = base64.b64encode(data).decode("ascii")
+        data_uri = f"data:image/png;base64,{b64_data}"
+
+        part = Uri(modality="image", mime_type=None, uri=data_uri)
+        message = InputMessage(role="user", parts=[part])
+
+        uploads = pre_uploader.pre_upload(
+            span_context=None,
+            start_time_utc_nano=1000,
+            input_messages=[message],
+            output_messages=[],
+        )
+        assert len(uploads) == 0
+
 
 class TestPreUploadEventLoop:
     """Test behavior in existing event loop scenarios"""
 
@@ -643,6 +727,38 @@ async def test_pre_upload_in_existing_event_loop(pre_uploader):
         assert uploads[0].data == test_data
         assert uploads[1].source_uri == "https://example.com/test.png"
 
+    @staticmethod
+    def test_event_loop_is_instance_scoped():
+        """Different pre-uploader instances should own independent loops."""
+        pre_uploader_1 = MultimodalPreUploader(base_path="/tmp/test_upload_1")
+        pre_uploader_2 = MultimodalPreUploader(base_path="/tmp/test_upload_2")
+
+        loop_1 = pre_uploader_1._ensure_loop()
+        loop_2 = pre_uploader_2._ensure_loop()
+
+        assert loop_1 is not loop_2
+        assert pre_uploader_1._loop_thread is not pre_uploader_2._loop_thread
+
+        pre_uploader_1.shutdown(timeout=0.5)
+        assert pre_uploader_1._loop is None
+        assert pre_uploader_2._loop is not None
+
+        pre_uploader_2.shutdown(timeout=0.5)
+
+    @staticmethod
+    def test_run_async_after_shutdown_returns_empty():
+        """No new async task should be accepted after shutdown."""
+        pre_uploader = MultimodalPreUploader(base_path="/tmp/test_upload")
+        pre_uploader.shutdown(timeout=0.1)
+
+        async def _dummy():
+            await asyncio.sleep(0)
+            return {
+                "x": UriMetadata(content_type="image/png", content_length=1)
+            }
+
+        assert pre_uploader._run_async(_dummy(), timeout=0.1) == {}
+
 
 class TestPreUploadNonHttpUri:
     """Test non-HTTP URI handling"""
 
@@ -789,7 +905,7 @@ def test_download_disabled_skips_uri():
             ),
             Uri(
                 modality="image",
-                mime_type="image/jpeg",
+                mime_type=None,
                 uri="https://example.com/img.jpg",
             ),
         ],
@@ -800,3 +916,366 @@
         # Only processed Blob
         assert len(uploads) == 1
         assert uploads[0].data is not None  # Blob has data
+
+        input_meta, output_meta = (
+            MultimodalProcessingMixin._extract_multimodal_metadata(
+                input_messages, None
+            )
+        )
+        assert not output_meta
+        assert len(input_meta) == 2
+        assert any(
+            item.get("uri") == "https://example.com/img.jpg"
+            for item in input_meta
+        )
+        uri_meta = next(
+            item
+            for item in input_meta
+            if item.get("uri") == "https://example.com/img.jpg"
+        )
+        assert uri_meta.get("mime_type") == "image/jpeg"
+
+    @staticmethod
+    @patch.object(MultimodalPreUploader, "_fetch_metadata_batch")
+    def test_download_enabled_fetch_failed_uri_kept_in_metadata(mock_fetch):
+        """When metadata fetch fails, original URI should still appear in metadata."""
+        with patch.dict(
+            "os.environ",
+            {
+                "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE": "both",
+                "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_ENABLED": "true",
+            },
+        ):
+            mock_fetch.return_value = {}
+            pre_uploader = MultimodalPreUploader("/tmp/test")
+
+            input_messages = [
+                InputMessage(
+                    role="user",
+                    parts=[
+                        Uri(
+                            modality="image",
+                            mime_type=None,
+                            uri="https://example.com/fail.png",
+                        )
+                    ],
+                )
+            ]
+
+            uploads = pre_uploader.pre_upload(None, 0, input_messages, None)
+            assert not uploads
+
+            input_meta, output_meta = (
+                MultimodalProcessingMixin._extract_multimodal_metadata(
+                    input_messages, None
+                )
+            )
+            assert not output_meta
+            assert len(input_meta) == 1
+            assert input_meta[0]["uri"] == "https://example.com/fail.png"
+            assert input_meta[0]["mime_type"] == "image/png"
+
+
+class TestPreUploadDataUri:
+    """Test data URI handling"""
+
+    @pytest.fixture
+    def pre_uploader(self):  # pylint: disable=R6301
+        """Create PreUploader instance"""
+        return MultimodalPreUploader(
+            base_path="/tmp/test_upload",
+            extra_meta={"workspaceId": "test_workspace"},
+        )
+
+    @staticmethod
+    def test_data_uri_processing(pre_uploader):
+        """Test processing of base64 data URI"""
+        # A small base64 image
+        base64_data = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAAAAAA6fptVAAAACklEQVR4nGNiAAAABgDNjd8qAAAAAElFTkSuQmCC"
+        data_uri = f"data:image/png;base64,{base64_data}"
+
+        part = Uri(modality="image", mime_type=None, uri=data_uri)
+        message = InputMessage(role="user", parts=[part])
+        input_messages = [message]
+
+        uploads = pre_uploader.pre_upload(
+            span_context=None,
+            start_time_utc_nano=1000000000000,
+            input_messages=input_messages,
+            output_messages=[],
+        )
+
+        assert len(uploads) == 1
+        assert uploads[0].content_type == "image/png"
+        assert uploads[0].url.startswith("/tmp/test_upload")
+        # Verify data is decoded correctly
+        assert uploads[0].data == base64.b64decode(base64_data)
+        # Verify original part is replaced with uploaded URL
+        assert message.parts[0].uri != data_uri
+        assert message.parts[0].uri == uploads[0].url
+
+    @staticmethod
+    def test_data_uri_processing_explicit_mime(pre_uploader):
+        """Test processing of data URI with explicit mime type in Uri object"""
+        base64_data = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAAAAAA6fptVAAAACklEQVR4nGNiAAAABgDNjd8qAAAAAElFTkSuQmCC"
+        data_uri = f"data:image/png;base64,{base64_data}"
+
+        part = Uri(modality="image", mime_type="image/custom", uri=data_uri)
+        message = InputMessage(role="user", parts=[part])
+
+        uploads = pre_uploader.pre_upload(
+            span_context=None,
+            start_time_utc_nano=1000000000000,
+            input_messages=[message],
+            output_messages=[],
+        )
+
+        assert len(uploads) == 1
+        assert uploads[0].content_type == "image/png"
+
+    @staticmethod
+    def test_invalid_data_uri(pre_uploader):
+        """Test invalid data URI handling"""
+        part = Uri(modality="image", mime_type=None, uri="data:invalid")
+        message = InputMessage(role="user", parts=[part])
+        uploads = pre_uploader.pre_upload(
+            span_context=None,
+            start_time_utc_nano=1000,
+            input_messages=[message],
+            output_messages=[],
+        )
+        assert len(uploads) == 0
+
+    @staticmethod
+    def test_non_base64_data_uri_skipped(pre_uploader):
+        """Test non-base64 data URI is skipped"""
+        data_uri = "data:text/plain,hello%20world"
+        part = Uri(modality="text", mime_type="text/plain", uri=data_uri)
+        message = InputMessage(role="user", parts=[part])
+
+        uploads = pre_uploader.pre_upload(
+            span_context=None,
+            start_time_utc_nano=1000,
+            input_messages=[message],
+            output_messages=[],
+        )
+        assert len(uploads) == 0
+        assert message.parts[0].uri == data_uri
+
+
+class TestPreUploadLocalFile:
+    """Test local file handling with security checks"""
+
+    @pytest.fixture
+    def pre_uploader_factory(self):  # pylint: disable=R6301
+        """Create PreUploader instance factory"""
+
+        def _create():
+            return MultimodalPreUploader(
+                base_path="/tmp/test_upload",
+                extra_meta={"workspaceId": "test_workspace"},
+            )
+
+        return _create
+
+    @staticmethod
+    def test_local_file_processing_allowed(pre_uploader_factory):
+        """Test processing of local file URI when allowed"""
+        # Use this test file itself as the source file
+        test_file = Path(__file__).resolve()
+        test_dir = test_file.parent
+        file_uri = f"file://{test_file}"
+
+        # Enable local file and set allowed root
+        with patch.dict(
+            os.environ,
+            {
+                "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_LOCAL_FILE_ENABLED": "true",
+                "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_ALLOWED_ROOT_PATHS": str(
+                    test_dir
+                ),
+            },
+        ):
+            pre_uploader = pre_uploader_factory()
+            part = Uri(modality="image", mime_type="image/png", uri=file_uri)
+            message = InputMessage(role="user", parts=[part])
+
+            uploads = pre_uploader.pre_upload(
+                span_context=None,
+                start_time_utc_nano=1000000000000,
+                input_messages=[message],
+                output_messages=[],
+            )
+
+            assert len(uploads) == 1
+            assert uploads[0].content_type == "image/png"
+            assert uploads[0].url.startswith("/tmp/test_upload")
+            # Verify data content matches file content
+            assert uploads[0].data == test_file.read_bytes()
+            # Verify original part is replaced
+            assert message.parts[0].uri != file_uri
+            assert message.parts[0].uri == uploads[0].url
+
+    @staticmethod
+    def test_local_file_processing_relative_path(pre_uploader_factory):
+        """Test processing of relative path when allowed"""
+        test_file = Path(__file__).resolve()
+        test_dir = test_file.parent
+        # Use this file's bare name as a relative path: allow the directory
+        # containing this file and patch os.getcwd() so the relative path
+        # resolves inside that allowed root.
+        relative_path = test_file.name
+
+        with (
+            patch.dict(
+                os.environ,
+                {
+                    "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_LOCAL_FILE_ENABLED": "true",
+                    "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_ALLOWED_ROOT_PATHS": str(
+                        test_dir
+                    ),
+                },
+            ),
+            patch("os.getcwd", return_value=str(test_dir)),
+        ):
+            pre_uploader = pre_uploader_factory()
+            # Test with simple filename (relative path)
+            part = Uri(
+                modality="image", mime_type="image/png", uri=relative_path
+            )
+            message = InputMessage(role="user", parts=[part])
+
+            uploads = pre_uploader.pre_upload(
+                span_context=None,
+                start_time_utc_nano=1000000000000,
+                input_messages=[message],
+                output_messages=[],
+            )
+
+            assert len(uploads) == 1
+            assert uploads[0].data == test_file.read_bytes()
+
+    @staticmethod
+    def test_local_file_processing_disabled_by_default(pre_uploader_factory):
+        """Test local file ignored when disabled (default)"""
+        test_file = Path(__file__).resolve()
+        file_uri = f"file://{test_file}"
+
+        # Default environment (feature disabled)
+        pre_uploader = pre_uploader_factory()
+        part = Uri(modality="image", mime_type="image/png", uri=file_uri)
+        message = InputMessage(role="user", parts=[part])
+
+        uploads = pre_uploader.pre_upload(
+            span_context=None,
+            start_time_utc_nano=1000,
+            input_messages=[message],
+            output_messages=[],
+        )
+
+        assert len(uploads) == 0
+        assert message.parts[0].uri == file_uri
+
+    @staticmethod
+    def test_local_file_processing_forbidden_path(pre_uploader_factory):
+        """Test blocked access when path is not in allowed roots"""
+        test_file = Path(__file__).resolve()
+        # Allowed root is a fake directory under /tmp, but the file is in the source dir
+        allowed_root = "/tmp/fake_allowed_root"
+
+        with patch.dict(
+            os.environ,
+            {
+                "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_LOCAL_FILE_ENABLED": "true",
+                "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_ALLOWED_ROOT_PATHS": allowed_root,
+            },
+        ):
+            pre_uploader = pre_uploader_factory()
+            file_uri = f"file://{test_file}"
+            part = Uri(modality="image", mime_type="image/png", uri=file_uri)
+            message = InputMessage(role="user", parts=[part])
+
+            uploads = pre_uploader.pre_upload(
+                span_context=None,
+                start_time_utc_nano=1000,
+                input_messages=[message],
+                output_messages=[],
+            )
+
+            assert len(uploads) == 0
+            # URI should remain unchanged
+            assert message.parts[0].uri == file_uri
+
+    @staticmethod
+    def test_local_file_processing_symlink_traversal_blocked(
+        pre_uploader_factory, tmp_path
+    ):
+        """Test that symlink traversal outside allowed root is blocked"""
+        allowed_root = tmp_path / "allowed_root"
+        external_dir = tmp_path / "outside_root"
+        allowed_root.mkdir()
+        external_dir.mkdir()
+
+        secret_file = external_dir / "secret.txt"
+        secret_file.write_text("top secret", encoding="utf-8")
+
+        symlink_dir = allowed_root / "symlink_to_outside"
+        try:
+            os.symlink(external_dir, symlink_dir)
+        except (OSError, NotImplementedError):
+            pytest.skip("Symlinks are not supported on this platform")
+
+        target_path = symlink_dir / "secret.txt"
+        file_uri = f"file://{target_path}"
+
+        with patch.dict(
+            os.environ,
+            {
+                "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_LOCAL_FILE_ENABLED": "true",
+                "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_ALLOWED_ROOT_PATHS": str(
+                    allowed_root
+                ),
+            },
+        ):
+            pre_uploader = pre_uploader_factory()
+            part = Uri(modality="image", mime_type="image/png", uri=file_uri)
+            message = InputMessage(role="user", parts=[part])
+
+            uploads = pre_uploader.pre_upload(
+                span_context=None,
+                start_time_utc_nano=1000,
+                input_messages=[message],
+                output_messages=[],
+            )
+
+            assert len(uploads) == 0
+            assert message.parts[0].uri == file_uri
+
+    @staticmethod
+    def test_local_file_processing_no_allowed_roots(pre_uploader_factory):
+        """Test blocked access when no allowed roots configured"""
+        test_file = Path(__file__).resolve()
+
+        with patch.dict(
+            os.environ,
+            {
+                "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_LOCAL_FILE_ENABLED": "true",
+                # No ALLOWED_ROOT_PATHS
+            },
+        ):
+            pre_uploader = pre_uploader_factory()
+            file_uri = f"file://{test_file}"
+            part = Uri(modality="image", mime_type="image/png", uri=file_uri)
+            message = InputMessage(role="user", parts=[part])
+
+            uploads = pre_uploader.pre_upload(
+                span_context=None,
+                start_time_utc_nano=1000,
+                input_messages=[message],
+                output_messages=[],
+            )
+
+            assert len(uploads) == 0
diff --git a/util/opentelemetry-util-genai/tests/_multimodal_upload/test_pre_uploader_audio.py b/util/opentelemetry-util-genai/tests/_multimodal_upload/test_pre_uploader_audio.py
index c5b506dca..193441fb4 100644
--- a/util/opentelemetry-util-genai/tests/_multimodal_upload/test_pre_uploader_audio.py
+++ b/util/opentelemetry-util-genai/tests/_multimodal_upload/test_pre_uploader_audio.py
@@ -5,6 +5,7 @@
 """
 
 from pathlib import Path
+from unittest.mock import patch
 
 import pytest
 
@@ -18,6 +19,19 @@
 TEST_AUDIO_DIR = Path(__file__).parent / "test_audio_samples"
 
 
+@pytest.fixture(autouse=True)
+def _default_upload_mode_enabled_for_tests():
+    with patch.dict(
+        "os.environ",
+        {
+            "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE": "both",
+            "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_ENABLED": "true",
+            "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_AUDIO_CONVERSION_ENABLED": "true",
+        },
+    ):
+        yield
+
+
 class TestAudioFormatDetection:
     """Test automatic audio format detection functionality"""
 
@@ -145,3 +159,31 @@ def test_pcm16_to_wav_conversion(pre_uploader, pcm_mime_type):
         else:
             # If library unavailable, should keep original format
             assert uploads[0].content_type == pcm_mime_type
+
+    @staticmethod
+    def test_pcm16_conversion_disabled_by_default():
+        """Test PCM16 conversion stays disabled when env var is unset"""
+        with patch.dict(
+            "os.environ",
+            {
+                "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE": "both",
+                "OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_DOWNLOAD_ENABLED": "true",
+            },
+            clear=True,
+        ):
+            pre_uploader = MultimodalPreUploader(base_path="/tmp/test_upload")
+            pcm_data = b"\x00\x01" * 1000
+            part = Blob(
+                content=pcm_data, mime_type="audio/pcm16", modality="audio"
+            )
+            input_messages = [InputMessage(role="user", parts=[part])]
+
+            uploads = pre_uploader.pre_upload(
+                span_context=None,
+                start_time_utc_nano=1000000000000000000,
+                input_messages=input_messages,
+                output_messages=None,
+            )
+
+            assert len(uploads) == 1
+            assert uploads[0].content_type == "audio/pcm16"
diff --git a/util/opentelemetry-util-genai/tests/test_extended_handler.py b/util/opentelemetry-util-genai/tests/test_extended_handler.py
index 0a3429d4c..a7814bdc0 100644
--- a/util/opentelemetry-util-genai/tests/test_extended_handler.py
+++ b/util/opentelemetry-util-genai/tests/test_extended_handler.py
@@ -17,6 +17,7 @@
 import os
 import queue
 import threading
+import time
 import unittest
 from typing import Any, Mapping
 from unittest.mock import MagicMock, patch
@@ -64,6 +65,7 @@
     OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT,
 )
 from opentelemetry.util.genai.extended_handler import (
+    ExtendedTelemetryHandler,
     get_extended_telemetry_handler,
 )
 from opentelemetry.util.genai.extended_types import (
@@ -87,7 +89,9 @@
 )
 
 
-def patch_env_vars(stability_mode, content_capturing=None, emit_event=None):
+def patch_env_vars(
+    stability_mode, content_capturing=None, emit_event=None, **extra_env_vars
+):
     def decorator(test_case):
         env_vars = {
             OTEL_SEMCONV_STABILITY_OPT_IN: stability_mode,
@@ -99,6 +103,8 @@ def decorator(test_case):
         if emit_event is not None:
             env_vars[OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT] = emit_event
 
+        env_vars.update(extra_env_vars)
+
         @patch.dict(os.environ, env_vars)
         def wrapper(*args, **kwargs):
             # Reset state.
@@ -153,7 +159,6 @@ def setUp(self):
         # Clear singleton if exists to avoid test interference
         if hasattr(get_extended_telemetry_handler, "_default_handler"):
             delattr(get_extended_telemetry_handler, "_default_handler")
-
         self.telemetry_handler = get_extended_telemetry_handler(
             tracer_provider=tracer_provider,
             logger_provider=logger_provider,
@@ -1031,7 +1036,9 @@ class RerankError(RuntimeError):
         )
 
 
-class TestMultimodalProcessingMixin(unittest.TestCase):
+class TestMultimodalProcessingMixin(  # pylint: disable=too-many-public-methods
+    unittest.TestCase
+):
     """Tests for MultimodalProcessingMixin.
 
     Uses orthogonal test design to maximize coverage with minimal test cases.
@@ -1250,9 +1257,10 @@ def _get_uploader_and_pre_uploader(self):
         handler._init_multimodal()
         self.assertFalse(handler._multimodal_enabled)
 
-    @patch.dict(
-        os.environ,
-        {"OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE": "both"},
+    @patch_env_vars(
+        "gen_ai_latest_experimental",
+        content_capturing="SPAN_ONLY",
+        OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE="both",
     )
     def test_init_multimodal_enabled_or_disabled_by_uploader(self):
        """Test _init_multimodal enabled when uploader available, disabled when None."""
@@ -1284,14 +1292,20 @@ def test_process_multimodal_returns_false_on_precondition_failure(self):
         inv1 = self._create_invocation_with_multimodal()
         inv1.context_token = None
         inv1.span = MagicMock()
-        self.assertFalse(handler.process_multimodal_stop(inv1))
-        self.assertFalse(handler.process_multimodal_fail(inv1, error))
+        self.assertFalse(
+            handler.process_multimodal_stop(inv1, method="stop_llm")
+        )
+        self.assertFalse(
+            handler.process_multimodal_fail(inv1, error, method="fail_llm")
+        )
 
         # span is None
         inv2 = self._create_invocation_with_multimodal()
         inv2.context_token = MagicMock()
         inv2.span = None
-        self.assertFalse(handler.process_multimodal_stop(inv2))
+        self.assertFalse(
+            handler.process_multimodal_stop(inv2, method="stop_llm")
+        )
 
         # No multimodal data
         inv3 = LLMInvocation(request_model="gpt-4")
@@ -1300,16 +1314,21 @@ def test_process_multimodal_returns_false_on_precondition_failure(self):
         inv3.input_messages = [
             InputMessage(role="user", parts=[Text(content="Hi")])
         ]
-        self.assertFalse(handler.process_multimodal_stop(inv3))
+        self.assertFalse(
+            handler.process_multimodal_stop(inv3, method="stop_llm")
+        )
 
         # multimodal_enabled=False
         handler_disabled = self._create_mock_handler(enabled=False)
         inv4 = self._create_invocation_with_multimodal(with_context=True)
-        self.assertFalse(handler_disabled.process_multimodal_stop(inv4))
+        self.assertFalse(
+            handler_disabled.process_multimodal_stop(inv4, method="stop_llm")
+        )
 
-    @patch.dict(
-        os.environ,
-        {"OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE": "both"},
+    @patch_env_vars(
+        "gen_ai_latest_experimental",
+        content_capturing="SPAN_ONLY",
+        OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE="both",
     )
     def test_process_multimodal_fallback_on_queue_issues(self):
         """Test process_multimodal_stop/fail uses fallback when queue is None or full."""
@@ -1320,27 +1339,36 @@ def test_process_multimodal_fallback_on_queue_issues(self):
         with patch.object(MultimodalProcessingMixin, "_ensure_async_worker"):
             # Queue is None
             MultimodalProcessingMixin._async_queue = None
-            with patch.object(handler, "_fallback_end_span") as mock_end:
-                self.assertTrue(handler.process_multimodal_stop(inv))
+            with patch.object(handler, "_fallback_stop") as mock_end:
+                self.assertTrue(
+                    handler.process_multimodal_stop(inv, method="stop_llm")
+                )
                 mock_end.assert_called_once()
 
             # Reset invocation context token
             inv.context_token = MagicMock()
-            with patch.object(handler, "_fallback_fail_span") as mock_fail:
-                self.assertTrue(handler.process_multimodal_fail(inv, error))
+            with patch.object(handler, "_fallback_fail") as mock_fail:
+                self.assertTrue(
+                    handler.process_multimodal_fail(
+                        inv, error, method="fail_llm"
+                    )
+                )
                 mock_fail.assert_called_once()
 
             # Queue is full
             MultimodalProcessingMixin._async_queue = queue.Queue(maxsize=1)
             MultimodalProcessingMixin._async_queue.put("dummy")
             inv.context_token = MagicMock()
-            with patch.object(handler, "_fallback_end_span") as mock_end2:
-                self.assertTrue(handler.process_multimodal_stop(inv))
+            with patch.object(handler, "_fallback_stop") as mock_end2:
+                self.assertTrue(
+                    handler.process_multimodal_stop(inv, method="stop_llm")
+                )
                 mock_end2.assert_called_once()
 
-    @patch.dict(
-        os.environ,
-        {"OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE": "both"},
+    @patch_env_vars(
+        "gen_ai_latest_experimental",
+        content_capturing="SPAN_ONLY",
+        OTEL_INSTRUMENTATION_GENAI_MULTIMODAL_UPLOAD_MODE="both",
     )
     def test_process_multimodal_enqueues_task(self):
         """Test process_multimodal_stop/fail enqueues tasks correctly."""
@@ -1352,15 +1380,19 @@ def test_process_multimodal_enqueues_task(self):
 
         # stop
         inv1 = self._create_invocation_with_multimodal(with_context=True)
-        self.assertTrue(handler.process_multimodal_stop(inv1))
+        self.assertTrue(
+            handler.process_multimodal_stop(inv1, method="stop_llm")
+        )
         task = MultimodalProcessingMixin._async_queue.get_nowait()
-        self.assertEqual(task.method, "stop")
+        self.assertEqual(task.method, "stop_llm")
 
         # fail
         inv2 = self._create_invocation_with_multimodal(with_context=True)
-        self.assertTrue(handler.process_multimodal_fail(inv2, error))
+        self.assertTrue(
+            handler.process_multimodal_fail(inv2, error, method="fail_llm")
+        )
         task = MultimodalProcessingMixin._async_queue.get_nowait()
-        self.assertEqual(task.method, "fail")
+        self.assertEqual(task.method, "fail_llm")
         self.assertEqual(task.error, error)
 
     # ==================== Fallback / Async Methods Tests ====================
 
@@ -1372,19 +1404,19 @@ def test_fallback_and_async_methods_handle_span_none(self):
         inv.span = None
 
         # Should not raise
-        handler._fallback_end_span(inv)
-        handler._fallback_fail_span(
-            inv, Error(message="err", type=RuntimeError)
+        handler._fallback_stop(inv, "stop_llm")
+        handler._fallback_fail(
+            inv, Error(message="err", type=RuntimeError), "fail_llm"
         )
         handler._async_stop_llm(
             _MultimodalAsyncTask(
-                invocation=inv, method="stop", handler=handler
+                invocation=inv, method="stop_llm", handler=handler
             )
         )
         handler._async_fail_llm(
             _MultimodalAsyncTask(
                 invocation=inv,
-                method="fail",
+                method="fail_llm",
                 error=Error(message="err", type=RuntimeError),
                 handler=handler,
             )
         )
@@ -1395,7 +1427,7 @@
         inv2.span = MagicMock()
         handler._async_fail_llm(
             _MultimodalAsyncTask(
-                invocation=inv2, method="fail", error=None, handler=handler
+                invocation=inv2, method="fail_llm", error=None, handler=handler
             )
         )
 
@@ -1416,12 +1448,12 @@ def test_fallback_methods_apply_attributes(self):
         ) as m2, patch(
             "opentelemetry.util.genai._multimodal_processing._maybe_emit_llm_event"
         ):  # fmt: skip
-            handler._fallback_end_span(inv)
+            handler._fallback_stop(inv, "stop_llm")
             m1.assert_called_with(mock_span, inv)
             mock_span.end.assert_called_once()
 
             mock_span.reset_mock()
-            handler._fallback_fail_span(inv, error)
+            handler._fallback_fail(inv, error, "fail_llm")
             m2.assert_called_with(mock_span, error)
             mock_span.end.assert_called_once()
 
@@ -1454,7 +1486,7 @@ def test_async_stop_and_fail_llm_process_correctly(self):
         ):  # fmt: skip
             handler._async_stop_llm(
                 _MultimodalAsyncTask(
-                    invocation=inv, method="stop", handler=handler
+                    invocation=inv, method="stop_llm", handler=handler
                 )
             )
             m1.assert_called_once()
@@ -1465,12 +1497,197 @@
             error = Error(message="err", type=ValueError)
             handler._async_fail_llm(
                 _MultimodalAsyncTask(
-                    invocation=inv, method="fail", error=error, handler=handler
+                    invocation=inv,
+                    method="fail_llm",
+                    error=error,
+                    handler=handler,
                 )
             )
             m2.assert_called_once()
             mock_span.end.assert_called_once()
 
+    # ==================== Agent Async / Fallback / Dispatch Tests ====================
+
+    @staticmethod
+    def _create_agent_invocation_with_multimodal(with_context=False):
+        """Helper to create InvokeAgentInvocation with multimodal data."""
+        invocation = InvokeAgentInvocation(provider="test")
+        invocation.input_messages = [
+            InputMessage(
+                role="user",
+                parts=[
+                    Uri(
+                        mime_type="image/png", modality="image", uri="http://x"
+                    )
+                ],
+            )
+        ]
+        if with_context:
+            invocation.context_token = MagicMock()
+            invocation.span = MagicMock()
+        return invocation
+
+    @staticmethod
+    def _create_mock_handler_with_agent_metrics(enabled=True):
+        """MockHandler that also has _record_extended_metrics."""
+        mixin = MultimodalProcessingMixin
+
+        class MockHandler(mixin):
+            def __init__(self):
+                self._multimodal_enabled = enabled
+                self._logger = MagicMock()
+
+            def _get_uploader_and_pre_uploader(self):
+                return MagicMock(), MagicMock()
+
+            def _record_llm_metrics(self, *args, **kwargs):
+                pass
+
+            def _record_extended_metrics(self, *args, **kwargs):
+                pass
+
+        return MockHandler()
+
+    def test_dispatch_task_routes_agent_methods(self):
+        """Test _dispatch_task dispatches stop_agent/fail_agent correctly."""
+        handler = self._create_mock_handler_with_agent_metrics()
+        mock_span = MagicMock()
+        mock_span._start_time = 1000000000
+        mock_span.get_span_context.return_value = MagicMock()
+
+        inv = self._create_agent_invocation_with_multimodal()
+        inv.span = mock_span
+        error = Error(message="err", type=RuntimeError)
+
+        with patch(
+            "opentelemetry.util.genai._multimodal_processing._apply_invoke_agent_finish_attributes"
+        ) as m_attr, patch(
+            "opentelemetry.util.genai._multimodal_processing._maybe_emit_invoke_agent_event"
+        ):  # fmt: skip
+            # stop_agent
+            handler._dispatch_task(
+                _MultimodalAsyncTask(
+                    invocation=inv, method="stop_agent", handler=handler
+                )
+            )
+            m_attr.assert_called_once()
+            mock_span.end.assert_called_once()
+
+            mock_span.reset_mock()
+            m_attr.reset_mock()
+
+            # fail_agent
+            handler._dispatch_task(
+                _MultimodalAsyncTask(
+                    invocation=inv,
+                    method="fail_agent",
+                    error=error,
+                    handler=handler,
+                )
+            )
+            m_attr.assert_called_once()
+            mock_span.end.assert_called_once()
+
+    def test_async_stop_and_fail_agent_process_correctly(self):
+        """Test _async_stop/fail_invoke_agent processes multimodal and end span."""
+        handler = self._create_mock_handler_with_agent_metrics()
+        mock_span = MagicMock()
+        mock_span._start_time = 1000000000
+        mock_span.get_span_context.return_value = MagicMock()
+
+        inv = self._create_agent_invocation_with_multimodal()
+        inv.span = mock_span
+
+        with patch(
+            "opentelemetry.util.genai._multimodal_processing._apply_invoke_agent_finish_attributes"
+        ) as m1, patch(
+            "opentelemetry.util.genai._multimodal_processing._apply_error_attributes"
+        ) as m2, patch(
+            "opentelemetry.util.genai._multimodal_processing._maybe_emit_invoke_agent_event"
+        ):  # fmt: skip
+            handler._async_stop_invoke_agent(
+                _MultimodalAsyncTask(
+                    invocation=inv, method="stop_agent", handler=handler
+                )
+            )
+            m1.assert_called_once()
+            mock_span.end.assert_called_once()
+            mock_span.set_attribute.assert_called()  # multimodal metadata
+
+            mock_span.reset_mock()
+            error = Error(message="err", type=ValueError)
+            handler._async_fail_invoke_agent(
+                _MultimodalAsyncTask(
+                    invocation=inv,
+                    method="fail_agent",
+                    error=error,
+                    handler=handler,
+                )
+            )
+            m2.assert_called_with(mock_span, error)
+            mock_span.end.assert_called_once()
+
+    def test_agent_async_methods_handle_span_none(self):
+        """Test agent async methods return early when span is None."""
+        handler = self._create_mock_handler_with_agent_metrics()
+        inv = InvokeAgentInvocation(provider="test")
+        inv.span = None
+
+        # Should not raise
+        handler._async_stop_invoke_agent(
+            _MultimodalAsyncTask(
+                invocation=inv, method="stop_agent", handler=handler
+            )
+        )
+        handler._async_fail_invoke_agent(
+            _MultimodalAsyncTask(
+                invocation=inv,
+                method="fail_agent",
+                error=Error(message="err", type=RuntimeError),
+                handler=handler,
+            )
+        )
+
+    def test_fallback_stop_agent_applies_attributes(self):
+        """Test _fallback_stop with stop_agent method applies agent attributes."""
+        handler = self._create_mock_handler_with_agent_metrics()
+        mock_span = MagicMock()
+        mock_span._start_time = 1000000000
+
+        inv = InvokeAgentInvocation(provider="test")
+        inv.span = mock_span
+
+        with patch(
+            "opentelemetry.util.genai._multimodal_processing._apply_invoke_agent_finish_attributes"
+        ) as m1, patch(
+            "opentelemetry.util.genai._multimodal_processing._maybe_emit_invoke_agent_event"
+        ):  # fmt: skip
+            handler._fallback_stop(inv, "stop_agent")
+            m1.assert_called_with(mock_span, inv)
+            mock_span.end.assert_called_once()
+
+    def test_fallback_fail_agent_applies_attributes(self):
+        """Test _fallback_fail with fail_agent method applies agent attributes."""
+        handler = self._create_mock_handler_with_agent_metrics()
+        mock_span = MagicMock()
+        mock_span._start_time = 1000000000
+
+        inv = InvokeAgentInvocation(provider="test")
+        inv.span = mock_span
+        error = Error(message="err", type=ValueError)
+
+        with patch(
+            "opentelemetry.util.genai._multimodal_processing._apply_invoke_agent_finish_attributes"
+        ) as m1, patch(
+            "opentelemetry.util.genai._multimodal_processing._apply_error_attributes"
+        ) as m2, patch(
+            "opentelemetry.util.genai._multimodal_processing._maybe_emit_invoke_agent_event"
+        ):  # fmt: skip
+            handler._fallback_fail(inv, error, "fail_agent")
+            m1.assert_called_with(mock_span, inv)
+            m2.assert_called_with(mock_span, error)
+            mock_span.end.assert_called_once()
+
     # ==================== Worker & Lifecycle Tests ====================
 
     def test_ensure_worker_and_shutdown(self):
@@ -1526,7 +1743,7 @@ def _async_stop_llm(self, task):
         inv1.span = MagicMock()
         mixin._async_queue.put(
             _MultimodalAsyncTask(
-                invocation=inv1, method="stop", handler=handler1
+                invocation=inv1, method="stop_llm", handler=handler1
             )
         )
         mixin._async_queue.put(None)
@@ -1539,7 +1756,9 @@ def _async_stop_llm(self, task):
         # Test 2: Skips task with None handler
         mixin._async_queue = queue.Queue()
         mixin._async_queue.put(
-            _MultimodalAsyncTask(invocation=inv1, method="stop", handler=None)
+            _MultimodalAsyncTask(
+                invocation=inv1, method="stop_llm", handler=None
+            )
         )
         mixin._async_queue.put(None)
         worker_thread = threading.Thread(target=mixin._async_worker_loop)
@@ -1560,7 +1779,7 @@ def _async_stop_llm(self, task):
         mixin._async_queue = queue.Queue()
         mixin._async_queue.put(
             _MultimodalAsyncTask(
-                invocation=inv2, method="stop", handler=Handler2()
+                invocation=inv2, method="stop_llm", handler=Handler2()
             )
         )
         mixin._async_queue.put(None)
@@ -1598,3 +1817,163 @@ class Handler(MultimodalProcessingMixin):
         handler._separate_and_upload(
             mock_span2, inv, mock_uploader, mock_pre_uploader
         )  # Should not raise
+
+
+class TestExtendedTelemetryHandlerShutdown(unittest.TestCase):
+    """Tests for ExtendedTelemetryHandler shutdown behavior.
+
+    Design: use the real worker loop and control task execution through
+    a mocked task.handler._dispatch_task.
+    """
+
+    def test_shutdown_waits_for_slow_task(self):
+        """Test shutdown waits for slow task completion (poison-pill mode)."""
+        # Reset state
+        MultimodalProcessingMixin._async_queue = None
+        MultimodalProcessingMixin._async_worker = None
+
+        # Track task processing
+        task_started = threading.Event()
+        task_completed = threading.Event()
+
+        try:
+            # Ensure worker is started
+            MultimodalProcessingMixin._ensure_async_worker()
+
+            # Create a mock handler with slow processing
+            mock_handler = MagicMock()
+
+            def slow_stop(task):
+                task_started.set()
+                time.sleep(0.15)
+                task_completed.set()
+
+            mock_handler._dispatch_task = slow_stop
+
+            mock_task = _MultimodalAsyncTask(
+                invocation=MagicMock(), method="stop_llm", handler=mock_handler
+            )
+            MultimodalProcessingMixin._async_queue.put(mock_task)
+
+            # Wait for the task to start
+            self.assertTrue(
+                task_started.wait(timeout=1.0), "Task did not start"
+            )
+
+            # Shutdown should wait for task completion
+            # (the poison pill is queued after the task)
+            MultimodalProcessingMixin.shutdown_multimodal_worker(timeout=5.0)
+
+            # Verify the task has completed
+ self.assertTrue( + task_completed.is_set(), "Task should have completed" + ) + # Idempotency: repeated shutdown should not fail + MultimodalProcessingMixin.shutdown_multimodal_worker(timeout=1.0) + finally: + MultimodalProcessingMixin._async_queue = None + MultimodalProcessingMixin._async_worker = None + + def test_shutdown_timeout_exits(self): + """Test shutdown exits when timeout is reached.""" + # Reset state + MultimodalProcessingMixin._async_queue = None + MultimodalProcessingMixin._async_worker = None + + block_event = threading.Event() + task_started = threading.Event() + + try: + MultimodalProcessingMixin._ensure_async_worker() + + mock_handler = MagicMock() + + def blocking_stop(task): + task_started.set() + block_event.wait(timeout=5.0) + + mock_handler._dispatch_task = blocking_stop + + mock_task = _MultimodalAsyncTask( + invocation=MagicMock(), method="stop_llm", handler=mock_handler + ) + MultimodalProcessingMixin._async_queue.put(mock_task) + + # Wait for the task to start + self.assertTrue( + task_started.wait(timeout=1.0), "Task did not start" + ) + + # Shutdown timeout=0.3s, task blocks for 5s + start = time.time() + timeout = 0.3 + MultimodalProcessingMixin.shutdown_multimodal_worker( + timeout=timeout + ) + elapsed = time.time() - start + + # Verify it returns after timeout (cannot be shorter than timeout) + self.assertLess( + elapsed, timeout + 0.2, f"shutdown took {elapsed:.2f}s" + ) + self.assertGreaterEqual( + elapsed, timeout, f"shutdown too fast: {elapsed:.2f}s" + ) + finally: + block_event.set() + time.sleep(0.1) + MultimodalProcessingMixin._async_queue = None + MultimodalProcessingMixin._async_worker = None + + +class TestExtendedHandlerAtexitShutdown(unittest.TestCase): + def setUp(self): + ExtendedTelemetryHandler._shutdown_called = False + + @patch.object(ExtendedTelemetryHandler, "_shutdown_uploader") + @patch.object(ExtendedTelemetryHandler, "_shutdown_pre_uploader") + @patch.object(ExtendedTelemetryHandler, 
"shutdown_multimodal_worker") + def test_shutdown_sequence( + self, + mock_shutdown_worker: MagicMock, + mock_shutdown_pre_uploader: MagicMock, + mock_shutdown_uploader: MagicMock, + ): + calls = [] + + mock_shutdown_worker.side_effect = lambda timeout: calls.append( + ("handler", timeout) + ) + mock_shutdown_pre_uploader.side_effect = lambda timeout: calls.append( + ("pre_uploader", timeout) + ) + mock_shutdown_uploader.side_effect = lambda timeout: calls.append( + ("uploader", timeout) + ) + + ExtendedTelemetryHandler.shutdown( + worker_timeout=1.0, + pre_uploader_timeout=2.0, + uploader_timeout=3.0, + ) + + self.assertEqual( + calls, + [("handler", 1.0), ("pre_uploader", 2.0), ("uploader", 3.0)], + ) + + @patch.object(ExtendedTelemetryHandler, "_shutdown_uploader") + @patch.object(ExtendedTelemetryHandler, "_shutdown_pre_uploader") + @patch.object(ExtendedTelemetryHandler, "shutdown_multimodal_worker") + def test_shutdown_idempotent( # pylint: disable=no-self-use + self, + mock_shutdown_worker: MagicMock, + mock_shutdown_pre_uploader: MagicMock, + mock_shutdown_uploader: MagicMock, + ): + ExtendedTelemetryHandler.shutdown() + ExtendedTelemetryHandler.shutdown() + + mock_shutdown_worker.assert_called_once() + mock_shutdown_pre_uploader.assert_called_once() + mock_shutdown_uploader.assert_called_once() diff --git a/uv.lock b/uv.lock index f39523f3e..4d7f98037 100644 --- a/uv.lock +++ b/uv.lock @@ -4223,13 +4223,15 @@ dependencies = [ ] [package.optional-dependencies] -multimodal-upload = [ - { name = "fsspec" }, - { name = "httpx" }, +audio-conversion = [ { name = "numpy", version = "2.0.2", source = { registry = "https://mirrors.aliyun.com/pypi/simple/" }, marker = "python_full_version < '3.10'" }, { name = "numpy", version = "2.2.6", source = { registry = "https://mirrors.aliyun.com/pypi/simple/" }, marker = "python_full_version >= '3.10'" }, { name = "soundfile" }, ] +multimodal-upload = [ + { name = "fsspec" }, + { name = "httpx" }, +] test = [ { name = 
"pytest" }, ] @@ -4242,14 +4244,14 @@ requires-dist = [ { name = "fsspec", marker = "extra == 'multimodal-upload'", specifier = ">=2025.9.0" }, { name = "fsspec", marker = "extra == 'upload'", specifier = ">=2025.9.0" }, { name = "httpx", marker = "extra == 'multimodal-upload'" }, - { name = "numpy", marker = "extra == 'multimodal-upload'" }, + { name = "numpy", marker = "extra == 'audio-conversion'" }, { name = "opentelemetry-api", git = "https://github.com/open-telemetry/opentelemetry-python?subdirectory=opentelemetry-api&branch=main" }, { name = "opentelemetry-instrumentation", editable = "opentelemetry-instrumentation" }, { name = "opentelemetry-semantic-conventions", git = "https://github.com/open-telemetry/opentelemetry-python?subdirectory=opentelemetry-semantic-conventions&branch=main" }, { name = "pytest", marker = "extra == 'test'", specifier = ">=7.0.0" }, - { name = "soundfile", marker = "extra == 'multimodal-upload'" }, + { name = "soundfile", marker = "extra == 'audio-conversion'" }, ] -provides-extras = ["multimodal-upload", "test", "upload"] +provides-extras = ["audio-conversion", "multimodal-upload", "test", "upload"] [[package]] name = "opentelemetry-util-http"