-
Notifications
You must be signed in to change notification settings - Fork 132
feat: add Arize Phoenix tracing integration #904
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
779bf1f
e9c7e83
ffb3704
7f80362
a5f8294
9a22e30
94fced1
8bd7d0a
ef3db13
3758794
ff01cdd
958da8f
8e1a354
9b1b0f6
1588bc2
54ec1b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,4 +15,6 @@ types-requests | |
| sse-starlette | ||
| google-genai | ||
| ollama | ||
| pytest-xdist | ||
| pytest-xdist | ||
| aioitertools | ||
| llvmlite | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,98 @@ | ||
| """ | ||
| Ragbits Core Example: Arize Phoenix Audit | ||
|
|
||
| This example demonstrates how to collect traces using Ragbits audit module with Arize Phoenix. | ||
| We run a simple LLM generation to collect telemetry data, which is then sent to the Phoenix server. | ||
|
|
||
| 1. The script exports traces to the local Phoenix server running on http://localhost:6006. | ||
| You need to have Phoenix running locally: | ||
|
|
||
| ```bash | ||
| pip install arize-phoenix | ||
| python -m phoenix.server.main serve | ||
mkoruszowic marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ``` | ||
|
|
||
| 2. To run the script, execute the following command: | ||
|
|
||
| ```bash | ||
| uv run examples/core/audit/phoenix.py | ||
| ``` | ||
|
|
||
| 3. To visualize the traces: | ||
| 1. Open your browser and navigate to http://localhost:6006. | ||
| 2. Check the Projects tab (default project). | ||
| """ | ||
|
|
||
| # /// script | ||
| # requires-python = ">=3.10" | ||
| # dependencies = [ | ||
| # "ragbits-core[phoenix]", | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe it would be good to add a constraint for the version in which this extra becomes available.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. And which version would that be?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think it will be included in the next release, which is 1.4.0. Am I right @mhordynski ? |
||
| # "tqdm", | ||
| # ] | ||
| # /// | ||
|
|
||
| import asyncio | ||
| from collections.abc import AsyncGenerator | ||
|
|
||
| from pydantic import BaseModel | ||
| from tqdm.asyncio import tqdm | ||
|
|
||
| from ragbits.core.audit import set_trace_handlers, traceable | ||
| from ragbits.core.llms import LiteLLM | ||
| from ragbits.core.prompt import Prompt | ||
|
|
||
|
|
||
class QuestionPromptInput(BaseModel):
    """Schema for the single input consumed by the question prompt."""

    # The user's question, rendered into the prompt template.
    question: str
|
|
||
|
|
||
class QuestionPrompt(Prompt[QuestionPromptInput, str]):
    """Prompt that asks the assistant to answer a free-form question and
    returns the answer as plain text."""

    system_prompt = "You are a helpful assistant."
    user_prompt = "Question: {{ question }}"
|
|
||
|
|
||
@traceable
async def process_request() -> None:
    """
    Process an example request.

    Runs one traced LLM generation so the Phoenix handler has telemetry to export.
    """
    model = LiteLLM(model_name="gpt-3.5-turbo")

    # Simple generation: build the prompt inline and await the response.
    await model.generate(
        QuestionPrompt(QuestionPromptInput(question="What is the capital of France?"))
    )

    # You can also use explicit tracing context
    # with trace("my_custom_span") as span:
    #     ...
|
|
||
|
|
||
async def main() -> None:
    """
    Run the example.
    """
    # Ragbits observability setup: registers the Phoenix trace handler, which
    # configures an OpenTelemetry TracerProvider pointing at the Phoenix
    # default endpoint (localhost:6006).
    set_trace_handlers("phoenix")

    print("Running requests... Make sure Phoenix is running at http://localhost:6006")

    async def _requests() -> AsyncGenerator:
        # Yield once per finished request so tqdm can show progress.
        for _ in range(3):
            await process_request()
            yield

    async for _ in tqdm(_requests()):
        pass


if __name__ == "__main__":
    asyncio.run(main())
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,6 +73,10 @@ logfire = [ | |
| "logfire[system-metrics]>=3.19.0,<4.0.0", | ||
| "opentelemetry-api>=1.27.0,<2.0.0", | ||
| ] | ||
| phoenix = [ | ||
| "arize-phoenix>=4.0.0,<5.0.0", | ||
| "openinference-instrumentation>=0.1.0,<1.0.0", | ||
| ] | ||
|
Comment on lines
76
to
80
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why are the range constraints so restrictive?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 12.0.0; 0.1.41
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please update uv.lock accordingly. |
||
| qdrant = [ | ||
| "qdrant-client>=1.12.1,<2.0.0", | ||
| ] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| import json | ||
|
|
||
| from openinference.semconv.trace import OpenInferenceSpanKindValues, SpanAttributes | ||
| from opentelemetry import trace | ||
| from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter | ||
| from opentelemetry.sdk.resources import SERVICE_NAME, Resource | ||
| from opentelemetry.sdk.trace import TracerProvider | ||
| from opentelemetry.sdk.trace.export import BatchSpanProcessor | ||
| from opentelemetry.trace import Span | ||
|
|
||
| from ragbits.core.audit.traces.otel import OtelTraceHandler | ||
|
|
||
|
|
||
class PhoenixTraceHandler(OtelTraceHandler):
    """
    Trace handler that exports spans to an Arize Phoenix server over OTLP/HTTP.

    Extends the generic OpenTelemetry handler with OpenInference semantic
    attributes so Phoenix can render LLM calls (model name, prompt,
    invocation parameters, output) in its UI.
    """

    def __init__(
        self,
        endpoint: str = "http://localhost:6006/v1/traces",
        service_name: str = "ragbits-phoenix",
    ) -> None:
        """
        Initialize the PhoenixTraceHandler instance.

        Args:
            endpoint: The Phoenix OTLP traces endpoint.
            service_name: The service name reported to Phoenix.
        """
        resource = Resource(attributes={SERVICE_NAME: service_name})
        span_exporter = OTLPSpanExporter(endpoint=endpoint)
        tracer_provider = TracerProvider(resource=resource)
        tracer_provider.add_span_processor(BatchSpanProcessor(span_exporter))
        # NOTE(review): this sets the process-wide global tracer provider;
        # constructing a second handler will try to overwrite it.
        trace.set_tracer_provider(tracer_provider)
        super().__init__(provider=tracer_provider)

    def start(self, name: str, inputs: dict, current_span: Span | None = None) -> Span:
        """
        Log input data at the beginning of the trace.

        Args:
            name: The name of the trace.
            inputs: The input data.
            current_span: The current trace span.

        Returns:
            The updated current trace span.
        """
        span = super().start(name, inputs, current_span)

        # Heuristic: spans whose name mentions "generate" are treated as LLM
        # generations so Phoenix renders them with its LLM-specific view.
        if "generate" in name.lower():
            span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, OpenInferenceSpanKindValues.LLM.value)

        if "model_name" in inputs:
            span.set_attribute(SpanAttributes.LLM_MODEL_NAME, inputs["model_name"])

        if "prompt" in inputs:
            span.set_attribute(SpanAttributes.INPUT_VALUE, str(inputs["prompt"]))

        if "options" in inputs:
            span.set_attribute(SpanAttributes.LLM_INVOCATION_PARAMETERS, str(inputs["options"]))

        return span

    def stop(self, outputs: dict, current_span: Span) -> None:
        """
        Log output data at the end of the trace.

        Args:
            outputs: The output data.
            current_span: The current trace span.
        """
        if "response" in outputs:
            response = outputs["response"]
            # If response is a list or dict, serialize the structure itself.
            # (Previously this was json.dumps(str(response)), which JSON-encoded
            # the Python repr and made the fallback below unreachable.)
            if isinstance(response, list | dict):
                try:
                    current_span.set_attribute(SpanAttributes.OUTPUT_VALUE, json.dumps(response))
                except (TypeError, ValueError):
                    # Non-JSON-serializable contents: fall back to the repr.
                    current_span.set_attribute(SpanAttributes.OUTPUT_VALUE, str(response))
            else:
                current_span.set_attribute(SpanAttributes.OUTPUT_VALUE, str(response))

        super().stop(outputs, current_span)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,97 @@ | ||
| from unittest.mock import MagicMock, patch | ||
|
|
||
| import pytest | ||
| from openinference.semconv.trace import OpenInferenceSpanKindValues, SpanAttributes | ||
|
|
||
| from ragbits.core.audit.traces.phoenix import PhoenixTraceHandler | ||
|
|
||
|
|
||
@pytest.fixture
def mock_otel_exporter():
    """Patch the OTLP exporter so no network export is attempted."""
    with patch("ragbits.core.audit.traces.phoenix.OTLPSpanExporter") as mock:
        yield mock
|
|
||
|
|
||
@pytest.fixture
def mock_tracer_provider():
    """Patch the SDK TracerProvider used by the handler's constructor."""
    with patch("ragbits.core.audit.traces.phoenix.TracerProvider") as mock:
        yield mock
|
|
||
|
|
||
@pytest.fixture
def mock_batch_span_processor():
    """Patch the batch span processor so no background thread is started."""
    with patch("ragbits.core.audit.traces.phoenix.BatchSpanProcessor") as mock:
        yield mock
|
|
||
|
|
||
@pytest.fixture
def mock_set_tracer_provider():
    """Patch the global tracer-provider setter to keep tests isolated."""
    with patch("ragbits.core.audit.traces.phoenix.trace.set_tracer_provider") as mock:
        yield mock
|
|
||
|
|
||
def test_phoenix_trace_handler_init(
    mock_otel_exporter: MagicMock,
    mock_tracer_provider: MagicMock,
    mock_batch_span_processor: MagicMock,
    mock_set_tracer_provider: MagicMock,
) -> None:
    """Default construction wires exporter, provider, and processor together."""
    handler = PhoenixTraceHandler()

    assert isinstance(handler, PhoenixTraceHandler)
    # The exporter must point at the default local Phoenix endpoint.
    mock_otel_exporter.assert_called_once_with(endpoint="http://localhost:6006/v1/traces")
    mock_tracer_provider.assert_called_once()
    mock_batch_span_processor.assert_called_once()
    mock_set_tracer_provider.assert_called_once()
|
|
||
|
|
||
def test_phoenix_trace_handler_init_custom_endpoint(
    mock_otel_exporter: MagicMock,
    mock_tracer_provider: MagicMock,
    mock_batch_span_processor: MagicMock,
    mock_set_tracer_provider: MagicMock,
) -> None:
    """A caller-supplied endpoint is forwarded to the OTLP exporter."""
    custom_endpoint = "http://custom-phoenix:6006/v1/traces"

    handler = PhoenixTraceHandler(endpoint=custom_endpoint)

    assert isinstance(handler, PhoenixTraceHandler)
    mock_otel_exporter.assert_called_once_with(endpoint=custom_endpoint)
|
|
||
|
|
||
def test_start_llm_span(
    mock_otel_exporter: MagicMock,
    mock_tracer_provider: MagicMock,
    mock_batch_span_processor: MagicMock,
    mock_set_tracer_provider: MagicMock,
) -> None:
    """Starting a span named like a generation attaches LLM attributes."""
    handler = PhoenixTraceHandler()
    span_mock = MagicMock()
    inputs = {"model_name": "gpt-4", "prompt": "hello", "options": {"temperature": 0.5}}

    with patch("ragbits.core.audit.traces.otel.OtelTraceHandler.start", return_value=span_mock) as mock_super_start:
        handler.start("generate", inputs)

    mock_super_start.assert_called_once()
    span_mock.set_attribute.assert_any_call(
        SpanAttributes.OPENINFERENCE_SPAN_KIND, OpenInferenceSpanKindValues.LLM.value
    )
    span_mock.set_attribute.assert_any_call(SpanAttributes.LLM_MODEL_NAME, "gpt-4")
    span_mock.set_attribute.assert_any_call(SpanAttributes.INPUT_VALUE, "hello")
    span_mock.set_attribute.assert_any_call(SpanAttributes.LLM_INVOCATION_PARAMETERS, "{'temperature': 0.5}")
|
|
||
|
|
||
def test_stop_llm_span(
    mock_otel_exporter: MagicMock,
    mock_tracer_provider: MagicMock,
    mock_batch_span_processor: MagicMock,
    mock_set_tracer_provider: MagicMock,
) -> None:
    """Stopping a span records the response under OUTPUT_VALUE."""
    handler = PhoenixTraceHandler()
    span_mock = MagicMock()

    with patch("ragbits.core.audit.traces.otel.OtelTraceHandler.stop") as mock_super_stop:
        handler.stop({"response": "world"}, span_mock)

    mock_super_stop.assert_called_once()
    span_mock.set_attribute.assert_any_call(SpanAttributes.OUTPUT_VALUE, "world")
Uh oh!
There was an error while loading. Please reload this page.