From 2f8df3d74a63629594d0be70e902789a6f81f1f5 Mon Sep 17 00:00:00 2001 From: Caleb Courier Date: Tue, 7 Oct 2025 15:37:17 -0500 Subject: [PATCH 1/2] package setup and initial implementation --- .github/workflows/pr_qc.yml | 27 ++ .github/workflows/release.yml | 120 ++++++ .gitignore | 4 + .pre-commit-config.yaml | 10 + LICENSE | 21 + Makefile | 34 ++ README.md | 160 +++++++- pyproject.toml | 137 +++++++ pyrightconfig.json | 19 + .../telemetry/openinference/__init__.py | 3 + .../openinference_instrumentor.py | 375 ++++++++++++++++++ .../openinference/openinference_utils.py | 29 ++ 12 files changed, 937 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/pr_qc.yml create mode 100644 .github/workflows/release.yml create mode 100644 .gitignore create mode 100644 .pre-commit-config.yaml create mode 100644 LICENSE create mode 100644 Makefile create mode 100644 pyproject.toml create mode 100644 pyrightconfig.json create mode 100644 src/snowglobe/telemetry/openinference/__init__.py create mode 100644 src/snowglobe/telemetry/openinference/openinference_instrumentor.py create mode 100644 src/snowglobe/telemetry/openinference/openinference_utils.py diff --git a/.github/workflows/pr_qc.yml b/.github/workflows/pr_qc.yml new file mode 100644 index 0000000..029aef0 --- /dev/null +++ b/.github/workflows/pr_qc.yml @@ -0,0 +1,27 @@ +name: Quality Control + +on: + workflow_dispatch: + pull_request: # Trigger the workflow on push events + +jobs: + quality_control: + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.11' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + make install-dev + - name: Run Type Checks + run: | + make type + - name: Run Linter + run: | + make lint \ No newline at end of file diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..00ebe05 
--- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,120 @@ +name: Publish to PyPI + +permissions: + id-token: write + contents: write + +on: + workflow_dispatch: + # push: + # branches: + # - main + +jobs: + setup: + runs-on: ubuntu-latest + env: + GUARDRAILS_TOKEN: ${{ secrets.PRIV_PYPI_PUBLISH_TOKEN }} + PYPI_REPOSITORY_URL: 'https://pypi.guardrailsai.com' + steps: + - name: Checkout Repository + uses: actions/checkout@v4 + with: + fetch-tags: true + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.11' + + - name: Check for version bump + id: version + continue-on-error: false + shell: bash {0} + run: | + PYPROJECT_TOML="pyproject.toml" + + # Extract the version using grep and sed + version=$(grep -m 1 "version" "$PYPROJECT_TOML" | sed -E 's/.*version[[:space:]]*=[[:space:]]*"([^"]*)".*/\1/') + + echo "Project version: $version" + + if [ -z "$version" ] + then + echo "Version is missing from pyproject.toml!" + exit 1 + fi + + echo "Checking if $version already exists..." + version_commit="$(git rev-parse "$version" 2>/dev/null)" + if [ ! -z "$version_commit" ] && [ "$version_commit" != "$version" ]; + then + echo "Version $version already exist on commit $version_commit!" + echo "Abandoning build..." + echo "To complete this release update the version field in the pyproject.toml with an appropriate semantic version." 
+ exit 1 + else + echo "version=$version" >> "$GITHUB_OUTPUT" + exit 0 + fi + + - name: Install Twine & Build + shell: bash + run: | + python -m pip install --upgrade pip + pip install twine build toml + + - name: Create .pypirc + shell: bash + run: | + touch ~/.pypirc + echo "[distutils]" >> ~/.pypirc + echo "index-servers =" >> ~/.pypirc + echo " private-repository" >> ~/.pypirc + echo "" >> ~/.pypirc + echo "[private-repository]" >> ~/.pypirc + echo "repository = $PYPI_REPOSITORY_URL" >> ~/.pypirc + echo "username = __token__" >> ~/.pypirc + echo "password = $GUARDRAILS_TOKEN" >> ~/.pypirc + + - name: Build & Upload + shell: bash + run: | + python -m build + twine upload dist/* -u __token__ -p $GUARDRAILS_TOKEN -r private-repository + + - name: Create .pypirc for PyPI.org + shell: bash + env: + PYPI_PASSWORD: ${{ secrets.PYPI_PASSWORD }} + run: | + echo "[distutils]" > ~/.pypirc + echo "index-servers =" >> ~/.pypirc + echo " pypi" >> ~/.pypirc + echo "" >> ~/.pypirc + echo "[pypi]" >> ~/.pypirc + echo "repository = https://upload.pypi.org/legacy/" >> ~/.pypirc + echo "username = __token__" >> ~/.pypirc + echo "password = $PYPI_PASSWORD" >> ~/.pypirc + + - name: Upload to PyPI.org + shell: bash + env: + PYPI_PASSWORD: ${{ secrets.PYPI_PASSWORD }} + run: | + twine upload dist/* -u __token__ -p $PYPI_PASSWORD -r pypi + + - name: Tag + id: tag + continue-on-error: false + run: | + version="${{ steps.version.outputs.version }}" + echo "Configuring github bot" + git config user.name "github-actions[bot]" + # Comes from https://api.github.com/users/github-actions%5Bbot%5D + git config user.email "41898282+github-actions[bot]@users.noreply.github.com" + echo "Creating github tag: $version" + git tag "$version" + echo "Pushing tags" + git push --tags \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3765b20 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +build +*.egg-info +.venv +.ruff_cache \ No newline at end of file 
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..1e51c4e --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,10 @@ +repos: +- repo: https://github.com/astral-sh/ruff-pre-commit + # Ruff version. + rev: v0.9.4 + hooks: + # Run the linter. + - id: ruff + args: [ --fix ] + # Run the formatter. + - id: ruff-format \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..538ee01 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Guardrails AI + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..51b1578 --- /dev/null +++ b/Makefile @@ -0,0 +1,34 @@ +.PHONY: install install-dev lint lint-fix type qa test test-cov test-cov-ci +# Installs production dependencies +install: + pip install .; + +# Installs development dependencies +install-dev: + pip install ".[dev]"; + +lint: + ruff check . + ruff format . 
+ +lint-fix: + ruff check . --fix + ruff format . + +type: + pyright + +qa: + make install-dev + make lint + make type + +test: + python -m unittest discover -s tests --buffer --failfast + +test-cov: + coverage run -m unittest discover --start-directory tests --buffer --failfast + coverage report -m + +test-cov-ci: + coverage run -m unittest discover --start-directory tests --buffer --failfast \ No newline at end of file diff --git a/README.md b/README.md index ec9d497..6994057 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,158 @@ -# snowglobe-telemetry-openinference -Telemetry instrumentation for the snowglobe client using OpenInference semantics. +# Snowglobe Telemetry Instrumentation for OpenInference + +Instrument your Snowglobe connected app with OpenInference and start sending traces to popular OpenInference compatible sinks like Arize or Arize Phoenix. + +## Installation + +``` +pip install snowglobe-telemetry-openinference +``` + +If using uv, set the `--prerelease=allow` flag +``` +uv pip install --prerelease=allow snowglobe-telemetry-openinference +``` + + +## Add the OpenInferenceInstrumentor to your agent file + +Reminder: Each agent wrapper file resides in the root directory of your project, and is named after the agent (e.g. `My Agent Name` becomes `my_agent_name.py`). + +```python +from snowglobe.client import CompletionRequest, CompletionFunctionOutputs +from openai import OpenAI +import os + +### Add these two lines to your agent file and watch context rich traces come in! +from snowglobe.telemetry.openinference import OpenInferenceInstrumentor +OpenInferenceInstrumentor().instrument() + + +client = OpenAI(api_key=os.getenv("SNOWGLOBE_API_KEY")) + +def completion_fn(request: CompletionRequest) -> CompletionFunctionOutputs: + """ + Process a scenario request from Snowglobe. + + This function is called by the Snowglobe client to process requests. It should return a + CompletionFunctionOutputs object with the response content.
+ + Example CompletionRequest: + CompletionRequest( + messages=[ + SnowglobeMessage(role="user", content="Hello, how are you?", snowglobe_data=None), + ] + ) + + Example CompletionFunctionOutputs: + CompletionFunctionOutputs(response="This is a string response from your application") + + Args: + request (CompletionRequest): The request object containing the messages. + + Returns: + CompletionFunctionOutputs: The response object with the generated content. + """ + + # Process the request using the messages. Example: + messages = request.to_openai_messages() + response = client.chat.completions.create( + model="gpt-4o-mini", + messages=messages + ) + return CompletionFunctionOutputs(response=response.choices[0].message.content) +``` + + + +## Enhancing Snowglobe Connect SDK's Traces with OpenInference Integrations +You can add richer context to the traces the Snowglobe Connect SDK captures by installing additional OpenInference instrumentors and registering the appropriate tracer provider in your agent wrapper file. + +The examples below show how to add OpenAI instrumentation for either Arize or Arize Phoenix in addition to Snowglobe's OpenInference instrumentation: + +### Arize + +Install the Arize OpenTelemetry package and the OpenAI specific instrumentor.
+```sh +pip install openinference-instrumentation-openai arize-otel +``` + +Then register the tracer provider and use the OpenAI instrumentor in your agent file (the `client` below is the OpenAI client created as shown earlier): +```py +import os +from openai import OpenAI +from snowglobe.client import CompletionRequest, CompletionFunctionOutputs +from arize.otel import register + +# Setup OTel via our convenience function +tracer_provider = register( + space_id = "your-space-id", # in app space settings page + api_key = "your-api-key", # in app space settings page + project_name = "your-project-name", # name this to whatever you would like +) + +# Import the OpenAI instrumentor from OpenInference +from openinference.instrumentation.openai import OpenAIInstrumentor + +# Instrument OpenAI +OpenAIInstrumentor().instrument(tracer_provider=tracer_provider) + +# Import the OpenInference instrumentor from Snowglobe +from snowglobe.telemetry.openinference import OpenInferenceInstrumentor + +# Instrument the Snowglobe client +OpenInferenceInstrumentor().instrument(tracer_provider=tracer_provider) + + +def completion_fn(request: CompletionRequest) -> CompletionFunctionOutputs: + messages = request.to_openai_messages() + response = client.chat.completions.create( + model="gpt-4o-mini", + messages=messages + ) + return CompletionFunctionOutputs(response=response.choices[0].message.content) +``` + + +### Arize Phoenix + +Install the Arize Phoenix OpenTelemetry package and the OpenAI specific instrumentor.
+```sh +pip install openinference-instrumentation-openai arize-phoenix-otel +``` + +Then register the tracer provider and use the OpenAI instrumentor in your agent file (the `client` below is the OpenAI client created as shown earlier): +```py +import os +from openai import OpenAI +from snowglobe.client import CompletionRequest, CompletionFunctionOutputs +from phoenix.otel import register + +os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "http://localhost:6006" + +# configure the Phoenix tracer +tracer_provider = register( + project_name="my-llm-app", # Default is 'default' +) + +# Import the OpenAI instrumentor from OpenInference +from openinference.instrumentation.openai import OpenAIInstrumentor + +# Instrument OpenAI +OpenAIInstrumentor().instrument(tracer_provider=tracer_provider) + +# Import the OpenInference instrumentor from Snowglobe +from snowglobe.telemetry.openinference import OpenInferenceInstrumentor + +# Instrument the Snowglobe client +OpenInferenceInstrumentor().instrument(tracer_provider=tracer_provider) + + +def completion_fn(request: CompletionRequest) -> CompletionFunctionOutputs: + messages = request.to_openai_messages() + response = client.chat.completions.create( + model="gpt-4o-mini", + messages=messages + ) + return CompletionFunctionOutputs(response=response.choices[0].message.content) +``` \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..3a99f2d --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,137 @@ +[project] +name = "snowglobe-telemetry-openinference" +version = "0.0.0a0" +authors = [ + {name = "Guardrails AI", email = "contact@guardrailsai.com"} +] +license = {file = "LICENSE"} + +description = "OpenInference instrumentation for the snowglobe client" +readme = "README.md" +requires-python = ">= 3.9" + +dependencies = [ + "opentelemetry-api", + "opentelemetry-sdk", + "opentelemetry-exporter-otlp", + "opentelemetry-instrumentation", + "opentelemetry-semantic-conventions", + "openinference-instrumentation>=0.1.27", +
"openinference-semantic-conventions>=0.1.17", + "packaging" +] + +[project.optional-dependencies] +dev = [ + "snowglobe>=0.4.16", + "ruff>=0.1.0,<0.2.0", + "mypy>=1.5.0,<2.0.0", + "pre-commit>=4.1.0", + "coverage>=7.6.12", + "pyright[nodejs]>=1.1.396" +] + +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + +[tool.ruff] +# Exclude a variety of commonly ignored directories. +exclude = [ + ".bzr", + ".direnv", + ".eggs", + ".git", + ".git-rewrite", + ".hg", + ".ipynb_checkpoints", + ".mypy_cache", + ".nox", + ".pants.d", + ".pyenv", + ".pytest_cache", + ".pytype", + ".ruff_cache", + ".svn", + ".tox", + ".venv", + ".vscode", + "__pypackages__", + "_build", + "buck-out", + "build", + "dist", + "node_modules", + "site-packages", + "venv", +] + +# Same as Black. +line-length = 88 +indent-width = 4 + +target-version = "py39" + + +[tool.ruff.lint] +# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. +# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or +# McCabe complexity (`C901`) by default. +select = ["E4", "E7", "E9", "F"] +ignore = [] + +# Allow fix for all enabled rules (when `--fix`) is provided. +fixable = ["ALL"] +unfixable = [] + +# Allow unused variables when underscore-prefixed. +dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" + +[tool.ruff.format] +# Like Black, use double quotes for strings. +quote-style = "double" + +# Like Black, indent with spaces, rather than tabs. +indent-style = "space" + +# Like Black, respect magic trailing commas. +skip-magic-trailing-comma = false + +# Like Black, automatically detect the appropriate line ending. +line-ending = "auto" + +# Enable auto-formatting of code examples in docstrings. Markdown, +# reStructuredText code/literal blocks and doctests are all supported. +# +# This is currently disabled by default, but it is planned for this +# to be opt-out in the future. 
+docstring-code-format = false + +# Set the line length limit used when formatting code snippets in +# docstrings. +# +# This only has an effect when the `docstring-code-format` setting is +# enabled. +docstring-code-line-length = "dynamic" + +[tool.coverage.run] +omit = [ + "tests/*", + "/tmp/*", + "*/tmp*", + "/private/var/folders/*" +] + +[tool.mypy] +python_version = "3.9" +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true +disallow_incomplete_defs = true +check_untyped_defs = true +disallow_untyped_decorators = true +no_implicit_optional = true +warn_redundant_casts = true +warn_unused_ignores = true +warn_no_return = true +warn_unreachable = true diff --git a/pyrightconfig.json b/pyrightconfig.json new file mode 100644 index 0000000..3f94c45 --- /dev/null +++ b/pyrightconfig.json @@ -0,0 +1,19 @@ +{ + "include": [ + "src/snowglobe/telemetry/openinference", + ], + + "exclude": [ + "**/node_modules", + "**/__pycache__" + ], + + "defineConstant": { + "DEBUG": true + }, + + "reportMissingImports": "error", + "reportMissingTypeStubs": false, + + "pythonVersion": "3.11", + } \ No newline at end of file diff --git a/src/snowglobe/telemetry/openinference/__init__.py b/src/snowglobe/telemetry/openinference/__init__.py new file mode 100644 index 0000000..78c6a83 --- /dev/null +++ b/src/snowglobe/telemetry/openinference/__init__.py @@ -0,0 +1,3 @@ +from .openinference_instrumentor import OpenInferenceInstrumentor + +__all__ = ["OpenInferenceInstrumentor"] diff --git a/src/snowglobe/telemetry/openinference/openinference_instrumentor.py b/src/snowglobe/telemetry/openinference/openinference_instrumentor.py new file mode 100644 index 0000000..8b0f213 --- /dev/null +++ b/src/snowglobe/telemetry/openinference/openinference_instrumentor.py @@ -0,0 +1,375 @@ +import logging +import os +import opentelemetry.context as context_api +from importlib import import_module, metadata +from functools import wraps +from packaging.version import Version + 
+from openinference.instrumentation import get_attributes_from_context +from openinference.instrumentation import OITracer, TraceConfig +from openinference.semconv.trace import ( + OpenInferenceSpanKindValues, + OpenInferenceMimeTypeValues, + SpanAttributes, +) + +from opentelemetry import trace as trace_api +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.trace import Tracer, TracerProvider as iTracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor + +from snowglobe.client.src.models import ( + CompletionRequest, + CompletionFunctionOutputs, + RiskEvaluationRequest, + RiskEvaluationOutputs, +) +from snowglobe.client.src.types import ( + CompletionFnTelemetryContext, + RiskEvalTelemetryContext, +) +from snowglobe.telemetry.openinference.openinference_utils import _flatten + +from typing import Any, Awaitable, Callable, Collection, TypeVar, Union + + +logger = logging.getLogger(__name__) + + +T = TypeVar("T") + + +class OpenInferenceInstrumentor(BaseInstrumentor): + __slots__ = ( + "_tracer", + "_snowglobe_version", + "_run_completion_fn", + "_run_risk_evaluation_fn", + ) + + _trace: Tracer + _snowglobe_version: str + _run_completion_fn: Callable[ + [ + Union[ + Callable[[CompletionRequest], CompletionFunctionOutputs], + Callable[[CompletionRequest], Awaitable[CompletionFunctionOutputs]], + ], + CompletionRequest, + CompletionFnTelemetryContext, + ], + Awaitable[CompletionFunctionOutputs], + ] + _run_risk_evaluation_fn: Callable[ + [ + Union[ + Callable[[RiskEvaluationRequest], RiskEvaluationOutputs], + Callable[[RiskEvaluationRequest], Awaitable[RiskEvaluationOutputs]], + ], + RiskEvaluationRequest, + RiskEvalTelemetryContext, + ], + Awaitable[RiskEvaluationOutputs], + ] + + def __init__(self): + super().__init__() + self._snowglobe_version = metadata.version("snowglobe") + runner = 
import_module("snowglobe.client.src.runner") + self._run_completion_fn = runner.run_completion_fn + self._run_risk_evaluation_fn = runner.run_risk_evaluation_fn + + def instrumentation_dependencies(self) -> Collection[str]: + return ["snowglobe >= 0.4.16"] + + def _setup_default_tracer_provider(self) -> iTracerProvider: + tracer_provider = trace_api.get_tracer_provider() + + if ( + not tracer_provider._active_span_processor._span_processors # type: ignore + and hasattr(tracer_provider, "add_span_processor") + ): + if not os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL"): + os.environ["OTEL_EXPORTER_OTLP_PROTOCOL"] = "http/protobuf" + trace_exporter = OTLPSpanExporter() + span_processor = BatchSpanProcessor(trace_exporter) + tracer_provider.add_span_processor(span_processor) # type: ignore + + return tracer_provider + + def _instrument(self, **kwargs: Any): + version = Version(metadata.version("snowglobe")) + if (version.major, version.minor, version.micro) < (0, 4, 16): + logger.info("Snowglobe version < 0.4.16 detected, skipping instrumentation") + return + + tracer_provider = kwargs.get("tracer_provider") + if not tracer_provider: + tracer_provider = self._setup_default_tracer_provider() + + config = kwargs.get("config") + if not config: + config = TraceConfig() + else: + assert isinstance(config, TraceConfig) + self._tracer = OITracer( + trace_api.get_tracer(__name__, self._snowglobe_version, tracer_provider), + config=config, + ) + + runner = import_module("snowglobe.client.src.runner") + + run_completion_fn: Callable[ + [ + Union[ + Callable[[CompletionRequest], CompletionFunctionOutputs], + Callable[[CompletionRequest], Awaitable[CompletionFunctionOutputs]], + ], + CompletionRequest, + CompletionFnTelemetryContext, + ], + Awaitable[CompletionFunctionOutputs], + ] = runner.run_completion_fn + wrapped_run_completion_fn = self._instrument_completion_fn(run_completion_fn) + setattr(runner, "run_completion_fn", wrapped_run_completion_fn) + setattr(runner.run_completion_fn, 
"__instrumented_by_openinference", True) + + run_risk_evaluation_fn: Callable[ + [ + Union[ + Callable[[RiskEvaluationRequest], RiskEvaluationOutputs], + Callable[[RiskEvaluationRequest], Awaitable[RiskEvaluationOutputs]], + ], + RiskEvaluationRequest, + RiskEvalTelemetryContext, + ], + Awaitable[RiskEvaluationOutputs], + ] = runner.run_risk_evaluation_fn + wrapped_risk_evaluation_fn = self._instrument_risk_evaluation_fn( + run_risk_evaluation_fn + ) + setattr(runner, "run_risk_evaluation_fn", wrapped_risk_evaluation_fn) + setattr(runner.run_risk_evaluation_fn, "__instrumented_by_openinference", True) + + def _uninstrument(self, **kwargs: Any): + runner = import_module("snowglobe.client.src.runner") + if self._run_completion_fn: + setattr(runner, "run_completion_fn", self._run_completion_fn) + delattr(runner.run_completion_fn, "__instrumented_by_openinference") + + if self._run_risk_evaluation_fn: + setattr(runner, "run_risk_evaluation_fn", self._run_risk_evaluation_fn) + delattr(runner.run_risk_evaluation_fn, "__instrumented_by_openinference") + + async def _collapse(self, wave_fn: Union[T, Awaitable[T]]) -> T: + if isinstance(wave_fn, Awaitable): + awaited = await wave_fn + return awaited + return wave_fn + + def _instrument_completion_fn( + self, + run_completion_fn: Callable[ + [ + Union[ + Callable[[CompletionRequest], CompletionFunctionOutputs], + Callable[[CompletionRequest], Awaitable[CompletionFunctionOutputs]], + ], + CompletionRequest, + CompletionFnTelemetryContext, + ], + Awaitable[CompletionFunctionOutputs], + ], + ): + @wraps(run_completion_fn) + async def run_completion_fn_wrapper( + completion_fn: Union[ + Callable[[CompletionRequest], CompletionFunctionOutputs], + Callable[[CompletionRequest], Awaitable[CompletionFunctionOutputs]], + ], + completion_request: CompletionRequest, + telemetry_context: CompletionFnTelemetryContext, + ) -> CompletionFunctionOutputs: + session_id = telemetry_context["session_id"] + conversation_id = 
telemetry_context["conversation_id"] + message_id = telemetry_context["message_id"] + simulation_name = telemetry_context["simulation_name"] + agent_name = telemetry_context["agent_name"] + span_type = telemetry_context["span_type"] + + @wraps(completion_fn) + async def completion_fn_wrapper( + req: CompletionRequest, + ) -> CompletionFunctionOutputs: + if context_api.get_value(context_api._SUPPRESS_INSTRUMENTATION_KEY): + completion_fn_out = await self._collapse(completion_fn(req)) + else: + messages = req.to_openai_messages() + with self._tracer.start_as_current_span( + span_type, + attributes=dict( + _flatten( + { + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.LLM, + SpanAttributes.INPUT_VALUE: messages[-1].get( + "content" + ) + or "", + SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.TEXT, + } + ) + ), + ) as span: + context_attributes = dict(get_attributes_from_context()) + span_attributes = dict( + _flatten( + { + **context_attributes, + SpanAttributes.SESSION_ID: str(session_id), + SpanAttributes.PROMPT_ID: str(message_id), + SpanAttributes.AGENT_NAME: agent_name, + SpanAttributes.LLM_INPUT_MESSAGES: messages, + "snowglobe.version": self._snowglobe_version, + "snowglobe.span.type": span_type, + "snowglobe.conversation.id": str(conversation_id), + "snowglobe.message.id": str(message_id), + "snowglobe.simulation.name": simulation_name, + } + ) + ) + span.set_attributes(span_attributes) + try: + completion_fn_out = await self._collapse(completion_fn(req)) + span.set_attributes( + dict( + _flatten( + { + SpanAttributes.OUTPUT_VALUE: completion_fn_out.response, + SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.TEXT, + SpanAttributes.LLM_OUTPUT_MESSAGES: [ + *messages, + { + "role": "assistant", + "content": completion_fn_out.response, + }, + ], + } + ) + ) + ) + except Exception as exception: + span.set_status( + trace_api.Status( + trace_api.StatusCode.ERROR, str(exception) + ) + ) + 
span.record_exception(exception) + raise + span.set_status(trace_api.StatusCode.OK) + + return completion_fn_out + + completion_fn_wrapper_out = await completion_fn_wrapper(completion_request) + return completion_fn_wrapper_out + + return run_completion_fn_wrapper + + def _instrument_risk_evaluation_fn( + self, + run_risk_evaluation_fn: Callable[ + [ + Union[ + Callable[[RiskEvaluationRequest], RiskEvaluationOutputs], + Callable[[RiskEvaluationRequest], Awaitable[RiskEvaluationOutputs]], + ], + RiskEvaluationRequest, + RiskEvalTelemetryContext, + ], + Awaitable[RiskEvaluationOutputs], + ], + ): + @wraps(run_risk_evaluation_fn) + async def run_risk_evaluation_fn_wrapper( + risk_evaluation_fn: Union[ + Callable[[RiskEvaluationRequest], RiskEvaluationOutputs], + Callable[[RiskEvaluationRequest], Awaitable[RiskEvaluationOutputs]], + ], + risk_evaluation_request: RiskEvaluationRequest, + telemetry_context: RiskEvalTelemetryContext, + ) -> RiskEvaluationOutputs: + session_id = telemetry_context["session_id"] + conversation_id = telemetry_context["conversation_id"] + message_id = telemetry_context["message_id"] + simulation_name = telemetry_context["simulation_name"] + agent_name = telemetry_context["agent_name"] + span_type = telemetry_context["span_type"] + risk_name = telemetry_context["risk_name"] + + @wraps(risk_evaluation_fn) + async def risk_evaluation_fn_wrapper( + req: RiskEvaluationRequest, + ): + if context_api.get_value(context_api._SUPPRESS_INSTRUMENTATION_KEY): + risk_evaluation_fn_out = await self._collapse( + risk_evaluation_fn(req) + ) + else: + with self._tracer.start_as_current_span( + span_type, + attributes=dict( + _flatten( + { + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.EVALUATOR, + SpanAttributes.INPUT_VALUE: req.model_dump_json(), + SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON, + } + ) + ), + ) as span: + context_attributes = dict(get_attributes_from_context()) + span_attributes = { + 
**context_attributes, + SpanAttributes.SESSION_ID: str(session_id), + SpanAttributes.PROMPT_ID: str(message_id), + SpanAttributes.AGENT_NAME: agent_name, + "snowglobe.client.version": self._snowglobe_version, + "snowglobe.span.type": span_type, + "snowglobe.conversation.id": str(conversation_id), + "snowglobe.message.id": str(message_id), + "snowglobe.simulation.name": simulation_name, + "snowglobe.risk.name": risk_name, + } + span.set_attributes(span_attributes) + try: + risk_evaluation_fn_out = await self._collapse( + risk_evaluation_fn(req) + ) + span.set_attributes( + dict( + _flatten( + { + SpanAttributes.OUTPUT_VALUE: risk_evaluation_fn_out.model_dump_json(), + SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON, + } + ) + ) + ) + except Exception as exception: + span.set_status( + trace_api.Status( + trace_api.StatusCode.ERROR, str(exception) + ) + ) + span.record_exception(exception) + raise + span.set_status(trace_api.StatusCode.OK) + + return risk_evaluation_fn_out + + risk_evaluation_fn_wrapper_out = await risk_evaluation_fn_wrapper( + risk_evaluation_request + ) + return risk_evaluation_fn_wrapper_out + + return run_risk_evaluation_fn_wrapper diff --git a/src/snowglobe/telemetry/openinference/openinference_utils.py b/src/snowglobe/telemetry/openinference/openinference_utils.py new file mode 100644 index 0000000..7434702 --- /dev/null +++ b/src/snowglobe/telemetry/openinference/openinference_utils.py @@ -0,0 +1,29 @@ +""" +Borrowed heavily from openinference-instrumentation-guardrails +""" +from enum import Enum +from typing import Any, Iterator, List, Mapping, Optional, Tuple +from opentelemetry.util.types import AttributeValue + + +def _flatten( + mapping: Optional[Mapping[str, Any]], +) -> Iterator[Tuple[str, AttributeValue]]: + if not mapping: + return + for key, value in mapping.items(): + if value is None: + continue + if isinstance(value, Mapping): + for sub_key, sub_value in _flatten(value): + yield f"{key}.{sub_key}", sub_value + 
elif isinstance(value, List) and any( + isinstance(item, Mapping) for item in value + ): + for index, sub_mapping in enumerate(value): + for sub_key, sub_value in _flatten(sub_mapping): + yield f"{key}.{index}.{sub_key}", sub_value + else: + if isinstance(value, Enum): + value = value.value + yield key, value From 65bf398ade25c845f6fb7b93a02b1b58ae04389e Mon Sep 17 00:00:00 2001 From: Caleb Courier Date: Tue, 7 Oct 2025 15:40:06 -0500 Subject: [PATCH 2/2] Cite source for _flatten --- src/snowglobe/telemetry/openinference/openinference_utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/snowglobe/telemetry/openinference/openinference_utils.py b/src/snowglobe/telemetry/openinference/openinference_utils.py index 7434702..58f0602 100644 --- a/src/snowglobe/telemetry/openinference/openinference_utils.py +++ b/src/snowglobe/telemetry/openinference/openinference_utils.py @@ -1,5 +1,6 @@ """ -Borrowed heavily from openinference-instrumentation-guardrails +Borrowed heavily from openinference-instrumentation-guardrails. +Source: https://github.com/Arize-ai/openinference/blob/main/python/instrumentation/openinference-instrumentation-guardrails/src/openinference/instrumentation/guardrails/_wrap_guard_call.py#L29-L45 """ from enum import Enum from typing import Any, Iterator, List, Mapping, Optional, Tuple