Skip to content

Commit 6e7a123

Browse files
aryehkleinnirga
and authored
fix(sdk): support overriding the span processor on_end hook (#2947)
Co-authored-by: Nir Gazit <[email protected]>
1 parent d7b3dcb commit 6e7a123

File tree

5 files changed

+270
-4
lines changed

5 files changed

+270
-4
lines changed
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
interactions:
2+
- request:
3+
body: '{"messages": [{"role": "user", "content": "Tell me a joke about opentelemetry"}],
4+
"model": "gpt-3.5-turbo"}'
5+
headers:
6+
accept:
7+
- application/json
8+
accept-encoding:
9+
- gzip, deflate
10+
connection:
11+
- keep-alive
12+
content-length:
13+
- '107'
14+
content-type:
15+
- application/json
16+
host:
17+
- api.openai.com
18+
user-agent:
19+
- OpenAI/Python 1.12.0
20+
x-stainless-arch:
21+
- arm64
22+
x-stainless-async:
23+
- 'false'
24+
x-stainless-lang:
25+
- python
26+
x-stainless-os:
27+
- MacOS
28+
x-stainless-package-version:
29+
- 1.12.0
30+
x-stainless-runtime:
31+
- CPython
32+
x-stainless-runtime-version:
33+
- 3.9.5
34+
method: POST
35+
uri: https://api.openai.com/v1/chat/completions
36+
response:
37+
body:
38+
string: !!binary |
39+
H4sIAAAAAAAAA1RRTUsDMRC9768Yc/HSyrbrWu1FVLQKfoKgIFLS7OxubJKJySxapP9d0tYWL3N4
40+
L+/Nm5efDEDoSoxBqFayst70j78+n267x9Hrw81DPRnd39QXd9dXz5PPb3t2KXpJQbMPVPynOlBk
41+
vUHW5Na0CigZk+tglJ/kRTksihVhqUKTZI3nfnFQ9rkLM+rng2G5UbakFUYxhrcMAOBnNVNGV+G3
42+
GEPe+0MsxigbFOPtIwARyCREyBh1ZOlY9HakIsfoVrFf2gVUugJuEXygJkhrMcAsoJxD5+FLcwvk
43+
0TEatMhhcQrnqGQXETSDos5Ubp8hIlpgAg5SIUhoiCpQ5ByqVMee2GxfbmMbanygWTrRdcZs8Vo7
44+
HdtpQBnJpYiRya/lywzgfVVP9+9i4QNZz1OmObpkOCjXdmL3ITtyONyQTCzNDi9G2SafiIvIaKe1
45+
dg0GH/S6q9pPj48G5ZE8OZS5yJbZLwAAAP//AwAUd8GRNQIAAA==
46+
headers:
47+
CF-Cache-Status:
48+
- DYNAMIC
49+
CF-RAY:
50+
- 85c044a25bdc0d6e-MXP
51+
Cache-Control:
52+
- no-cache, must-revalidate
53+
Connection:
54+
- keep-alive
55+
Content-Encoding:
56+
- gzip
57+
Content-Type:
58+
- application/json
59+
Date:
60+
- Tue, 27 Feb 2024 12:00:34 GMT
61+
Server:
62+
- cloudflare
63+
Set-Cookie:
64+
- __cf_bm=J7CW3gXyUth9bXxP62KOXJBT9fqPtQCn6rOtIOYSOms-1709035234-1.0-Af3zRwkM02ElV8pGlA2ndZpn9K5kxgF0BOjGNlpNg3Dv/qKUKDqX5KjIeb/o2pyy3ZD0WS15+EM040L1eN/yQ4s=;
65+
path=/; expires=Tue, 27-Feb-24 12:30:34 GMT; domain=.api.openai.com; HttpOnly;
66+
Secure; SameSite=None
67+
- _cfuvid=tr0R1O18cmcX2Q2PFjB12pv_1Mu4R.MQAHgSyTr7lfY-1709035234516-0.0-604800000;
68+
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
69+
Transfer-Encoding:
70+
- chunked
71+
access-control-allow-origin:
72+
- '*'
73+
alt-svc:
74+
- h3=":443"; ma=86400
75+
openai-model:
76+
- gpt-3.5-turbo-0125
77+
openai-organization:
78+
- traceloop
79+
openai-processing-ms:
80+
- '410'
81+
openai-version:
82+
- '2020-10-01'
83+
strict-transport-security:
84+
- max-age=15724800; includeSubDomains
85+
x-ratelimit-limit-requests:
86+
- '5000'
87+
x-ratelimit-limit-tokens:
88+
- '160000'
89+
x-ratelimit-remaining-requests:
90+
- '4999'
91+
x-ratelimit-remaining-tokens:
92+
- '159974'
93+
x-ratelimit-reset-requests:
94+
- 12ms
95+
x-ratelimit-reset-tokens:
96+
- 9ms
97+
x-request-id:
98+
- req_008b3333db5c78b9ded9415f11929844
99+
status:
100+
code: 200
101+
message: OK
102+
- request:
103+
body: '{"messages": [{"role": "user", "content": "Tell me a joke about opentelemetry"}],
104+
"model": "gpt-3.5-turbo"}'
105+
headers:
106+
accept:
107+
- application/json
108+
accept-encoding:
109+
- gzip, deflate
110+
connection:
111+
- keep-alive
112+
content-length:
113+
- '107'
114+
content-type:
115+
- application/json
116+
cookie:
117+
- __cf_bm=J7CW3gXyUth9bXxP62KOXJBT9fqPtQCn6rOtIOYSOms-1709035234-1.0-Af3zRwkM02ElV8pGlA2ndZpn9K5kxgF0BOjGNlpNg3Dv/qKUKDqX5KjIeb/o2pyy3ZD0WS15+EM040L1eN/yQ4s=;
118+
_cfuvid=tr0R1O18cmcX2Q2PFjB12pv_1Mu4R.MQAHgSyTr7lfY-1709035234516-0.0-604800000
119+
host:
120+
- api.openai.com
121+
user-agent:
122+
- OpenAI/Python 1.12.0
123+
x-stainless-arch:
124+
- arm64
125+
x-stainless-async:
126+
- 'false'
127+
x-stainless-lang:
128+
- python
129+
x-stainless-os:
130+
- MacOS
131+
x-stainless-package-version:
132+
- 1.12.0
133+
x-stainless-runtime:
134+
- CPython
135+
x-stainless-runtime-version:
136+
- 3.9.5
137+
method: POST
138+
uri: https://api.openai.com/v1/chat/completions
139+
response:
140+
body:
141+
string: !!binary |
142+
H4sIAAAAAAAAA1RRXWsbMRB8v1+x1bMdzj5favslEEpo6DcEUmiLkaX1nWKdVl2t45jg/15059j0
143+
RYiZnWFm97UAUM6qJSjTajFd9OP5/u+PL3d384eX5uHD/fb7x2f++s192n9uqsefapQVtH5CI2+q
144+
K0Nd9CiOwkAbRi2YXSfvy0VZ1dNq1hMdWfRZ1kQZV1f1WHa8pnE5mdYnZUvOYFJL+FUAALz2b84Y
145+
LL6oJZSjN6TDlHSDankeAlBMPiNKp+SS6CBqdCENBcHQx35sD2CdBWkRKGIQ9Nih8AEsPqOniAxr
146+
dqEBDV5biwxCsCfe3sDvcItG7xJm9QFa1GyHb0C0aPOksDY97xiSYEzv1CnH8VzAUxOZ1rls2Hl/
147+
xjcuuNSuGHWikMMmoTjIjwXAn35Ru/+6q8jURVkJbTFkw0k92KnLaS7kdHYihUT7C14tilM+lQ5J
148+
sFttXGiQI7tha5u4ml9P6mu9mOlSFcfiHwAAAP//AwB+6qFIPwIAAA==
149+
headers:
150+
CF-Cache-Status:
151+
- DYNAMIC
152+
CF-RAY:
153+
- 85c044a7edf50d6e-MXP
154+
Cache-Control:
155+
- no-cache, must-revalidate
156+
Connection:
157+
- keep-alive
158+
Content-Encoding:
159+
- gzip
160+
Content-Type:
161+
- application/json
162+
Date:
163+
- Tue, 27 Feb 2024 12:00:35 GMT
164+
Server:
165+
- cloudflare
166+
Transfer-Encoding:
167+
- chunked
168+
access-control-allow-origin:
169+
- '*'
170+
alt-svc:
171+
- h3=":443"; ma=86400
172+
openai-model:
173+
- gpt-3.5-turbo-0125
174+
openai-organization:
175+
- traceloop
176+
openai-processing-ms:
177+
- '750'
178+
openai-version:
179+
- '2020-10-01'
180+
strict-transport-security:
181+
- max-age=15724800; includeSubDomains
182+
x-ratelimit-limit-requests:
183+
- '5000'
184+
x-ratelimit-limit-tokens:
185+
- '160000'
186+
x-ratelimit-remaining-requests:
187+
- '4999'
188+
x-ratelimit-remaining-tokens:
189+
- '159974'
190+
x-ratelimit-reset-requests:
191+
- 12ms
192+
x-ratelimit-reset-tokens:
193+
- 9ms
194+
x-request-id:
195+
- req_bf1ec13b40c7a03dd17dcca80cbd9031
196+
status:
197+
code: 200
198+
message: OK
199+
version: 1

packages/traceloop-sdk/tests/conftest.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
"""Unit tests configuration module."""
22

33
import os
4+
import re
45
import pytest
56
from traceloop.sdk import Traceloop
67
from traceloop.sdk.instruments import Instruments
78
from traceloop.sdk.tracing.tracing import TracerWrapper
8-
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
9+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, BatchSpanProcessor
910
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
1011
from opentelemetry.context import attach, Context
12+
from opentelemetry.sdk.trace import ReadableSpan
13+
1114
pytest_plugins = []
1215

1316

@@ -70,6 +73,44 @@ def on_start(self, span, parent_context=None):
7073
TracerWrapper.instance = _trace_wrapper_instance
7174

7275

76+
@pytest.fixture(scope="function")
def exporter_with_custom_span_postprocess_callback(exporter):
    """Yield *exporter* after re-initializing Traceloop with a span_postprocess_callback.

    The callback redacts every ``gen_ai.prompt.N.content`` /
    ``gen_ai.completion.N.content`` string attribute on finished spans.
    The fixture saves any pre-existing TracerWrapper singleton and, on
    teardown, restores both the singleton and the span processor's original
    ``on_end`` implementation so other tests run in isolation (the span
    processor is a singleton).
    """
    # Fix: bind unconditionally. Previously this name was only assigned inside
    # the hasattr() branch, so the restore step at teardown raised
    # UnboundLocalError whenever no TracerWrapper singleton existed yet.
    _trace_wrapper_instance = None
    if hasattr(TracerWrapper, "instance"):
        _trace_wrapper_instance = TracerWrapper.instance
        del TracerWrapper.instance

    # Compile once here instead of on every span inside the callback.
    prompt_pattern = re.compile(r"gen_ai\.prompt\.\d+\.content$")
    completion_pattern = re.compile(r"gen_ai\.completion\.\d+\.content$")

    def span_postprocess_callback(span: ReadableSpan) -> None:
        """Redact matching prompt/completion content attributes in place."""
        if not hasattr(span, "_attributes"):
            return
        attributes = span._attributes if span._attributes else {}
        # Assigning to existing keys while iterating is safe (no dict resize).
        for key, value in attributes.items():
            if (prompt_pattern.match(key) or completion_pattern.match(key)) and isinstance(value, str):
                attributes[key] = "REDACTED"  # modify the attributes directly

    Traceloop.init(
        exporter=exporter,
        span_postprocess_callback=span_postprocess_callback,
    )

    yield exporter

    if hasattr(TracerWrapper, "instance"):
        # Get the span processor
        if hasattr(TracerWrapper.instance, "_TracerWrapper__spans_processor"):
            span_processor = TracerWrapper.instance._TracerWrapper__spans_processor
            # Reset on_end to its original class implementation; TracerWrapper
            # monkey-patched it with a wrapper when span_postprocess_callback
            # was passed, and the processor instance outlives this test.
            if isinstance(span_processor, SimpleSpanProcessor):
                span_processor.on_end = SimpleSpanProcessor.on_end.__get__(span_processor, SimpleSpanProcessor)
            elif isinstance(span_processor, BatchSpanProcessor):
                span_processor.on_end = BatchSpanProcessor.on_end.__get__(span_processor, BatchSpanProcessor)
    if _trace_wrapper_instance is not None:
        TracerWrapper.instance = _trace_wrapper_instance
113+
73114
@pytest.fixture
74115
def exporter_with_custom_instrumentations():
75116
# Clear singleton if existed

packages/traceloop-sdk/tests/test_sdk_initialization.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,19 @@ def run_workflow():
3333
assert workflow_span.attributes["custom_span"] == "yes"
3434

3535

36+
@pytest.mark.vcr
def test_span_postprocess_callback(exporter_with_custom_span_postprocess_callback, openai_client):
    """A custom span_postprocess_callback should redact prompt and completion content."""
    # Trigger an instrumented OpenAI call (replayed from the VCR cassette).
    openai_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Tell me a joke about opentelemetry"}],
    )

    finished_spans = exporter_with_custom_span_postprocess_callback.get_finished_spans()
    attributes = finished_spans[0].attributes
    # The callback installed by the fixture replaces both content attributes.
    assert attributes["gen_ai.prompt.0.content"] == "REDACTED"
    assert attributes["gen_ai.completion.0.content"] == "REDACTED"
47+
48+
3649
def test_instruments(exporter_with_custom_instrumentations):
3750
@workflow()
3851
def run_workflow():

packages/traceloop-sdk/traceloop/sdk/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
import sys
33
from pathlib import Path
44

5-
from typing import Optional, Set
5+
from typing import Callable, Optional, Set
66
from colorama import Fore
7-
from opentelemetry.sdk.trace import SpanProcessor
7+
from opentelemetry.sdk.trace import SpanProcessor, ReadableSpan
88
from opentelemetry.sdk.trace.export import SpanExporter
99
from opentelemetry.sdk.metrics.export import MetricExporter
1010
from opentelemetry.sdk._logs.export import LogExporter
@@ -66,6 +66,7 @@ def init(
6666
instruments: Optional[Set[Instruments]] = None,
6767
block_instruments: Optional[Set[Instruments]] = None,
6868
image_uploader: Optional[ImageUploader] = None,
69+
span_postprocess_callback: Optional[Callable[[ReadableSpan], None]] = None,
6970
) -> Optional[Client]:
7071
if not enabled:
7172
TracerWrapper.set_disabled(True)
@@ -147,6 +148,7 @@ def init(
147148
image_uploader=image_uploader or ImageUploader(api_endpoint, api_key),
148149
instruments=instruments,
149150
block_instruments=block_instruments,
151+
span_postprocess_callback=span_postprocess_callback,
150152
)
151153

152154
if not is_metrics_enabled() or not metrics_exporter and exporter:

packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
OTLPSpanExporter as GRPCExporter,
1313
)
1414
from opentelemetry.sdk.resources import Resource
15-
from opentelemetry.sdk.trace import TracerProvider, SpanProcessor
15+
from opentelemetry.sdk.trace import TracerProvider, SpanProcessor, ReadableSpan
1616
from opentelemetry.propagators.textmap import TextMapPropagator
1717
from opentelemetry.propagate import set_global_textmap
1818
from opentelemetry.sdk.trace.export import (
@@ -73,6 +73,7 @@ def __new__(
7373
instruments: Optional[Set[Instruments]] = None,
7474
block_instruments: Optional[Set[Instruments]] = None,
7575
image_uploader: ImageUploader = None,
76+
span_postprocess_callback: Optional[Callable[[ReadableSpan], None]] = None,
7677
) -> "TracerWrapper":
7778
if not hasattr(cls, "instance"):
7879
obj = cls.instance = super(TracerWrapper, cls).__new__(cls)
@@ -120,6 +121,16 @@ def __new__(
120121
obj.__spans_exporter
121122
)
122123
obj.__spans_processor_original_on_start = None
124+
if span_postprocess_callback:
125+
# Create a wrapper that calls both the custom and original methods
126+
original_on_end = obj.__spans_processor.on_end
127+
128+
def wrapped_on_end(span):
129+
# Call the custom on_end first
130+
span_postprocess_callback(span)
131+
# Then call the original to ensure normal processing
132+
original_on_end(span)
133+
obj.__spans_processor.on_end = wrapped_on_end
123134

124135
obj.__spans_processor.on_start = obj._span_processor_on_start
125136
obj.__tracer_provider.add_span_processor(obj.__spans_processor)

0 commit comments

Comments
 (0)