Skip to content

Commit 79ebb2c

Browse files
Merge pull request BerriAI#14888 from mrFranklin/feat/improve-opik
feat: improve opik integration code
2 parents f2f75bf + f717e53 commit 79ebb2c

File tree

3 files changed

+82
-33
lines changed

3 files changed

+82
-33
lines changed

docs/my-website/docs/observability/opik_integration.md

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ These can be passed inside metadata with the `opik` key.
140140
- `project_name` - Name of the Opik project to send data to.
141141
- `current_span_data` - The current span data to be used for tracing.
142142
- `tags` - Tags to be used for tracing.
143+
- `thread_id` - The thread id to group together multiple related traces.
143144
144145
### Usage
145146
@@ -159,8 +160,10 @@ response = litellm.completion(
159160
messages=messages,
160161
metadata = {
161162
"opik": {
163+
"project_name": "your-opik-project-name",
162164
"current_span_data": get_current_span_data(),
163165
"tags": ["streaming-test"],
166+
"thread_id": "your-thread-id"
164167
},
165168
}
166169
)
@@ -174,7 +177,7 @@ curl -L -X POST 'http://0.0.0.0:4000/v1/chat/completions' \
174177
-H 'Content-Type: application/json' \
175178
-H 'Authorization: Bearer sk-1234' \
176179
-d '{
177-
"model": "gpt-3.5-turbo-testing",
180+
"model": "gpt-3.5-turbo",
178181
"messages": [
179182
{
180183
"role": "user",
@@ -183,8 +186,10 @@ curl -L -X POST 'http://0.0.0.0:4000/v1/chat/completions' \
183186
],
184187
"metadata": {
185188
"opik": {
189+
"project_name": "your-opik-project-name",
186190
"current_span_data": "...",
187191
"tags": ["streaming-test"],
192+
"thread_id": "your-thread-id"
188193
},
189194
}
190195
}'
@@ -195,12 +200,25 @@ curl -L -X POST 'http://0.0.0.0:4000/v1/chat/completions' \
195200

196201

197202

203+
You can also pass the fields as part of the request header with a `opik_*` prefix:
198204

199-
200-
201-
202-
203-
205+
```shell
206+
curl --location --request POST 'http://0.0.0.0:4000/chat/completions' \
207+
--header 'Content-Type: application/json' \
208+
--header 'Authorization: Bearer sk-1234' \
209+
--header 'opik_project_name: your-opik-project-name' \
210+
--header 'opik_thread_id: your-thread-id' \
211+
--header 'opik_tags: ["streaming-test"]' \
212+
--data '{
213+
"model": "gpt-3.5-turbo",
214+
"messages": [
215+
{
216+
"role": "user",
217+
"content": "What's the weather like in Boston today?"
218+
}
219+
]
220+
}'
221+
```
204222
205223
206224

litellm/integrations/opik/opik.py

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,25 @@ def _create_opik_payload( # noqa: PLR0915
192192

193193
# Extract opik metadata
194194
litellm_opik_metadata = litellm_params_metadata.get("opik", {})
195+
196+
# Use standard_logging_object to create metadata and input/output data
197+
standard_logging_object = kwargs.get("standard_logging_object", None)
198+
if standard_logging_object is None:
199+
verbose_logger.debug(
200+
"OpikLogger skipping event; no standard_logging_object found"
201+
)
202+
return []
203+
204+
# Update litellm_opik_metadata with opik metadata from requester
205+
standard_logging_metadata = standard_logging_object.get("metadata", {}) or {}
206+
requester_metadata = standard_logging_metadata.get("requester_metadata", {}) or {}
207+
requester_opik_metadata = requester_metadata.get("opik", {}) or {}
208+
litellm_opik_metadata.update(requester_opik_metadata)
209+
195210
verbose_logger.debug(
196211
f"litellm_opik_metadata - {json.dumps(litellm_opik_metadata, default=str)}"
197212
)
213+
198214
project_name = litellm_opik_metadata.get("project_name", self.opik_project_name)
199215

200216
# Extract trace_id and parent_span_id
@@ -208,19 +224,33 @@ def _create_opik_payload( # noqa: PLR0915
208224
else:
209225
trace_id = None
210226
parent_span_id = None
227+
211228
# Create Opik tags
212229
opik_tags = litellm_opik_metadata.get("tags", [])
213230
if kwargs.get("custom_llm_provider"):
214231
opik_tags.append(kwargs["custom_llm_provider"])
215-
216-
# Use standard_logging_object to create metadata and input/output data
217-
standard_logging_object = kwargs.get("standard_logging_object", None)
218-
if standard_logging_object is None:
219-
verbose_logger.debug(
220-
"OpikLogger skipping event; no standard_logging_object found"
221-
)
222-
return []
223-
232+
233+
# Get thread_id if present
234+
thread_id = litellm_opik_metadata.get("thread_id", None)
235+
236+
# Override with any opik_ headers from proxy request
237+
proxy_server_request = _litellm_params.get("proxy_server_request", {}) or {}
238+
proxy_headers = proxy_server_request.get("headers", {}) or {}
239+
for key, value in proxy_headers.items():
240+
if key.startswith("opik_"):
241+
param_key = key.replace("opik_", "", 1)
242+
if param_key == "project_name" and value:
243+
project_name = value
244+
elif param_key == "thread_id" and value:
245+
thread_id = value
246+
elif param_key == "tags" and value:
247+
try:
248+
parsed_tags = json.loads(value)
249+
if isinstance(parsed_tags, list):
250+
opik_tags.extend(parsed_tags)
251+
except (json.JSONDecodeError, TypeError):
252+
pass
253+
224254
# Create input and output data
225255
input_data = standard_logging_object.get("messages", {})
226256
output_data = standard_logging_object.get("response", {})
@@ -243,7 +273,7 @@ def _create_opik_payload( # noqa: PLR0915
243273
del metadata["current_span_data"]
244274
metadata["created_from"] = "litellm"
245275

246-
metadata.update(standard_logging_object.get("metadata", {}))
276+
metadata.update(standard_logging_metadata)
247277
if "call_type" in standard_logging_object:
248278
metadata["type"] = standard_logging_object["call_type"]
249279
if "status" in standard_logging_object:
@@ -286,20 +316,20 @@ def _create_opik_payload( # noqa: PLR0915
286316
verbose_logger.debug(
287317
f"OpikLogger creating payload for trace with id {trace_id}"
288318
)
289-
290-
payload.append(
291-
{
292-
"project_name": project_name,
293-
"id": trace_id,
294-
"name": trace_name,
295-
"start_time": start_time.astimezone(timezone.utc).isoformat().replace("+00:00", "Z"),
296-
"end_time": end_time.astimezone(timezone.utc).isoformat().replace("+00:00", "Z"),
297-
"input": input_data,
298-
"output": output_data,
299-
"metadata": metadata,
300-
"tags": opik_tags,
301-
}
302-
)
319+
payload.append(
320+
{
321+
"project_name": project_name,
322+
"id": trace_id,
323+
"name": trace_name,
324+
"start_time": start_time.astimezone(timezone.utc).isoformat().replace("+00:00", "Z"),
325+
"end_time": end_time.astimezone(timezone.utc).isoformat().replace("+00:00", "Z"),
326+
"input": input_data,
327+
"output": output_data,
328+
"metadata": metadata,
329+
"tags": opik_tags,
330+
"thread_id": thread_id,
331+
}
332+
)
303333

304334
span_id = create_uuid7()
305335
verbose_logger.debug(
@@ -319,6 +349,7 @@ def _create_opik_payload( # noqa: PLR0915
319349
"output": output_data,
320350
"metadata": metadata,
321351
"tags": opik_tags,
352+
"thread_id": thread_id,
322353
"usage": usage,
323354
}
324355
)

tests/local_testing/test_opik.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,9 @@ def test_sync_opik_logging_http_request():
120120
temperature=0.2,
121121
mock_response="This is a mock response",
122122
)
123-
124-
# Need to wait for a short amount of time as the log_success callback is called in a different thread
125-
time.sleep(1)
123+
124+
# Need to wait for a short amount of time as the log_success callback is called in a different thread. One or two seconds is often not enough.
125+
time.sleep(3)
126126

127127
# Check that 5 spans and 5 traces were sent
128128
assert mock_post.call_count == 10, f"Expected 10 HTTP requests, but got {mock_post.call_count}"

0 commit comments

Comments
 (0)