Commit 248b3d2

fix(openai): propagate span IDs properly to events (#3243)
1 parent 3926524 · commit 248b3d2

File tree: 4 files changed, +380 −301 lines

packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py

Lines changed: 133 additions & 128 deletions
@@ -48,6 +48,7 @@
     SpanAttributes,
 )
 from opentelemetry.trace import SpanKind, Tracer
+from opentelemetry import trace
 from opentelemetry.trace.status import Status, StatusCode
 from wrapt import ObjectProxy
@@ -86,75 +87,77 @@ def chat_wrapper(
         attributes={SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value},
     )

-    run_async(_handle_request(span, kwargs, instance))
-    try:
-        start_time = time.time()
-        response = wrapped(*args, **kwargs)
-        end_time = time.time()
-    except Exception as e:  # pylint: disable=broad-except
-        end_time = time.time()
-        duration = end_time - start_time if "start_time" in locals() else 0
-
-        attributes = {
-            "error.type": e.__class__.__name__,
-        }
-
-        if duration > 0 and duration_histogram:
-            duration_histogram.record(duration, attributes=attributes)
-        if exception_counter:
-            exception_counter.add(1, attributes=attributes)
-
-        span.set_attribute(ERROR_TYPE, e.__class__.__name__)
-        span.record_exception(e)
-        span.set_status(Status(StatusCode.ERROR, str(e)))
-        span.end()
+    # Use the span as current context to ensure events get proper trace context
+    with trace.use_span(span, end_on_exit=False):
+        run_async(_handle_request(span, kwargs, instance))
+        try:
+            start_time = time.time()
+            response = wrapped(*args, **kwargs)
+            end_time = time.time()
+        except Exception as e:  # pylint: disable=broad-except
+            end_time = time.time()
+            duration = end_time - start_time if "start_time" in locals() else 0
+
+            attributes = {
+                "error.type": e.__class__.__name__,
+            }

-        raise
+            if duration > 0 and duration_histogram:
+                duration_histogram.record(duration, attributes=attributes)
+            if exception_counter:
+                exception_counter.add(1, attributes=attributes)

-    if is_streaming_response(response):
-        # span will be closed after the generator is done
-        if is_openai_v1():
-            return ChatStream(
-                span,
-                response,
-                instance,
-                token_counter,
-                choice_counter,
-                duration_histogram,
-                streaming_time_to_first_token,
-                streaming_time_to_generate,
-                start_time,
-                kwargs,
-            )
-        else:
-            return _build_from_streaming_response(
-                span,
-                response,
-                instance,
-                token_counter,
-                choice_counter,
-                duration_histogram,
-                streaming_time_to_first_token,
-                streaming_time_to_generate,
-                start_time,
-                kwargs,
-            )
+            span.set_attribute(ERROR_TYPE, e.__class__.__name__)
+            span.record_exception(e)
+            span.set_status(Status(StatusCode.ERROR, str(e)))
+            span.end()

-    duration = end_time - start_time
+            raise

-    _handle_response(
-        response,
-        span,
-        instance,
-        token_counter,
-        choice_counter,
-        duration_histogram,
-        duration,
-    )
+        if is_streaming_response(response):
+            # span will be closed after the generator is done
+            if is_openai_v1():
+                return ChatStream(
+                    span,
+                    response,
+                    instance,
+                    token_counter,
+                    choice_counter,
+                    duration_histogram,
+                    streaming_time_to_first_token,
+                    streaming_time_to_generate,
+                    start_time,
+                    kwargs,
+                )
+            else:
+                return _build_from_streaming_response(
+                    span,
+                    response,
+                    instance,
+                    token_counter,
+                    choice_counter,
+                    duration_histogram,
+                    streaming_time_to_first_token,
+                    streaming_time_to_generate,
+                    start_time,
+                    kwargs,
+                )

-    span.end()
+        duration = end_time - start_time

-    return response
+        _handle_response(
+            response,
+            span,
+            instance,
+            token_counter,
+            choice_counter,
+            duration_histogram,
+            duration,
+        )
+
+        span.end()
+
+        return response


 @_with_chat_telemetry_wrapper
@@ -182,78 +185,80 @@ async def achat_wrapper(
         attributes={SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value},
     )

-    await _handle_request(span, kwargs, instance)
+    # Use the span as current context to ensure events get proper trace context
+    with trace.use_span(span, end_on_exit=False):
+        await _handle_request(span, kwargs, instance)

-    try:
-        start_time = time.time()
-        response = await wrapped(*args, **kwargs)
-        end_time = time.time()
-    except Exception as e:  # pylint: disable=broad-except
-        end_time = time.time()
-        duration = end_time - start_time if "start_time" in locals() else 0
-
-        common_attributes = Config.get_common_metrics_attributes()
-        attributes = {
-            **common_attributes,
-            "error.type": e.__class__.__name__,
-        }
-
-        if duration > 0 and duration_histogram:
-            duration_histogram.record(duration, attributes=attributes)
-        if exception_counter:
-            exception_counter.add(1, attributes=attributes)
-
-        span.set_attribute(ERROR_TYPE, e.__class__.__name__)
-        span.record_exception(e)
-        span.set_status(Status(StatusCode.ERROR, str(e)))
-        span.end()
+        try:
+            start_time = time.time()
+            response = await wrapped(*args, **kwargs)
+            end_time = time.time()
+        except Exception as e:  # pylint: disable=broad-except
+            end_time = time.time()
+            duration = end_time - start_time if "start_time" in locals() else 0
+
+            common_attributes = Config.get_common_metrics_attributes()
+            attributes = {
+                **common_attributes,
+                "error.type": e.__class__.__name__,
+            }

-        raise
+            if duration > 0 and duration_histogram:
+                duration_histogram.record(duration, attributes=attributes)
+            if exception_counter:
+                exception_counter.add(1, attributes=attributes)

-    if is_streaming_response(response):
-        # span will be closed after the generator is done
-        if is_openai_v1():
-            return ChatStream(
-                span,
-                response,
-                instance,
-                token_counter,
-                choice_counter,
-                duration_histogram,
-                streaming_time_to_first_token,
-                streaming_time_to_generate,
-                start_time,
-                kwargs,
-            )
-        else:
-            return _abuild_from_streaming_response(
-                span,
-                response,
-                instance,
-                token_counter,
-                choice_counter,
-                duration_histogram,
-                streaming_time_to_first_token,
-                streaming_time_to_generate,
-                start_time,
-                kwargs,
-            )
+            span.set_attribute(ERROR_TYPE, e.__class__.__name__)
+            span.record_exception(e)
+            span.set_status(Status(StatusCode.ERROR, str(e)))
+            span.end()

-    duration = end_time - start_time
+            raise

-    _handle_response(
-        response,
-        span,
-        instance,
-        token_counter,
-        choice_counter,
-        duration_histogram,
-        duration,
-    )
+        if is_streaming_response(response):
+            # span will be closed after the generator is done
+            if is_openai_v1():
+                return ChatStream(
+                    span,
+                    response,
+                    instance,
+                    token_counter,
+                    choice_counter,
+                    duration_histogram,
+                    streaming_time_to_first_token,
+                    streaming_time_to_generate,
+                    start_time,
+                    kwargs,
+                )
+            else:
+                return _abuild_from_streaming_response(
+                    span,
+                    response,
+                    instance,
+                    token_counter,
+                    choice_counter,
+                    duration_histogram,
+                    streaming_time_to_first_token,
+                    streaming_time_to_generate,
+                    start_time,
+                    kwargs,
+                )

-    span.end()
+        duration = end_time - start_time

-    return response
+        _handle_response(
+            response,
+            span,
+            instance,
+            token_counter,
+            choice_counter,
+            duration_histogram,
+            duration,
+        )
+
+        span.end()
+
+        return response


 @dont_throw
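What the change does: tracer.start_span() creates a span but does not make it the current span in the OpenTelemetry context, so any emitter that stamps telemetry from the ambient context (the events this commit targets) saw an invalid trace/span ID. Wrapping the wrapper body in trace.use_span(span, end_on_exit=False) activates the span while the request and response are handled; end_on_exit=False keeps the span's lifetime manual, since streaming responses hand the still-open span to ChatStream or the stream builders, which close it when the generator is exhausted. A minimal sketch of that context behavior, assuming the opentelemetry-sdk package is installed (not code from this repo):

    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider

    trace.set_tracer_provider(TracerProvider())
    tracer = trace.get_tracer(__name__)

    span = tracer.start_span("openai.chat")

    # Before use_span: the ambient context is empty, so anything emitted
    # here would carry an invalid (all-zero) trace/span ID.
    assert not trace.get_current_span().get_span_context().is_valid

    with trace.use_span(span, end_on_exit=False):
        # Inside use_span: get_current_span() returns our span, so any
        # emitter that reads the current context links itself to this span.
        ctx = trace.get_current_span().get_span_context()
        print(f"trace_id={ctx.trace_id:032x} span_id={ctx.span_id:016x}")

    # end_on_exit=False means the span is still open here; end it manually,
    # mirroring the explicit span.end() calls in the wrappers above.
    span.end()

Running the sketch prints non-zero IDs inside the with-block, which is exactly the context the events were previously missing.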

packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py

Lines changed: 39 additions & 34 deletions
@@ -1,6 +1,7 @@
 import logging

 from opentelemetry import context as context_api
+from opentelemetry import trace
 from opentelemetry.instrumentation.openai.shared import (
     _set_client_attributes,
     _set_functions_attributes,
@@ -55,25 +56,27 @@ def completion_wrapper(tracer, wrapped, instance, args, kwargs):
         attributes={SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value},
     )

-    _handle_request(span, kwargs, instance)
-
-    try:
-        response = wrapped(*args, **kwargs)
-    except Exception as e:
-        span.set_attribute(ERROR_TYPE, e.__class__.__name__)
-        span.record_exception(e)
-        span.set_status(Status(StatusCode.ERROR, str(e)))
-        span.end()
-        raise
-
-    if is_streaming_response(response):
-        # span will be closed after the generator is done
-        return _build_from_streaming_response(span, kwargs, response)
-    else:
-        _handle_response(response, span, instance)
-
-    span.end()
-    return response
+    # Use the span as current context to ensure events get proper trace context
+    with trace.use_span(span, end_on_exit=False):
+        _handle_request(span, kwargs, instance)
+
+        try:
+            response = wrapped(*args, **kwargs)
+        except Exception as e:
+            span.set_attribute(ERROR_TYPE, e.__class__.__name__)
+            span.record_exception(e)
+            span.set_status(Status(StatusCode.ERROR, str(e)))
+            span.end()
+            raise
+
+        if is_streaming_response(response):
+            # span will be closed after the generator is done
+            return _build_from_streaming_response(span, kwargs, response)
+        else:
+            _handle_response(response, span, instance)
+
+        span.end()
+        return response


 @_with_tracer_wrapper
@@ -89,25 +92,27 @@ async def acompletion_wrapper(tracer, wrapped, instance, args, kwargs):
         attributes={SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value},
     )

-    _handle_request(span, kwargs, instance)
-
-    try:
-        response = await wrapped(*args, **kwargs)
-    except Exception as e:
-        span.set_attribute(ERROR_TYPE, e.__class__.__name__)
-        span.record_exception(e)
-        span.set_status(Status(StatusCode.ERROR, str(e)))
-        span.end()
-        raise
-
-    if is_streaming_response(response):
-        # span will be closed after the generator is done
-        return _abuild_from_streaming_response(span, kwargs, response)
-    else:
-        _handle_response(response, span, instance)
-
-    span.end()
-    return response
+    # Use the span as current context to ensure events get proper trace context
+    with trace.use_span(span, end_on_exit=False):
+        _handle_request(span, kwargs, instance)
+
+        try:
+            response = await wrapped(*args, **kwargs)
+        except Exception as e:
+            span.set_attribute(ERROR_TYPE, e.__class__.__name__)
+            span.record_exception(e)
+            span.set_status(Status(StatusCode.ERROR, str(e)))
+            span.end()
+            raise
+
+        if is_streaming_response(response):
+            # span will be closed after the generator is done
+            return _abuild_from_streaming_response(span, kwargs, response)
+        else:
+            _handle_response(response, span, instance)
+
+        span.end()
+        return response


 @dont_throw
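The same shape repeats in all four wrappers: enter trace.use_span() so events carry the span's IDs, but end the span by hand because a streaming response must keep it open past the wrapper's return. A condensed sketch of the pattern with hypothetical helper names (is_streaming and wrap_stream stand in for the real is_streaming_response and ChatStream / stream builders; the real wrappers also record metrics):

    from opentelemetry import trace

    tracer = trace.get_tracer(__name__)

    def traced_call(wrapped, is_streaming, wrap_stream, *args, **kwargs):
        span = tracer.start_span("openai.completion")
        with trace.use_span(span, end_on_exit=False):
            try:
                response = wrapped(*args, **kwargs)
            except Exception:
                span.end()  # error path: close the span before re-raising
                raise
            if is_streaming(response):
                # Hand the still-open span to the stream wrapper (like
                # ChatStream); it calls span.end() once the generator is done.
                return wrap_stream(span, response)
            span.end()  # non-streaming path: handling is done; close the span
            return response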
