|
17 | 17 | import json |
18 | 18 |
|
19 | 19 | from langtrace.trace_attributes import Event, LLMSpanAttributes |
20 | | -from langtrace_python_sdk.utils import set_span_attribute, silently_fail |
| 20 | +from langtrace_python_sdk.utils import set_span_attribute |
| 21 | +from langtrace_python_sdk.utils.silently_fail import silently_fail |
| 22 | + |
21 | 23 | from langtrace_python_sdk.utils.llm import ( |
| 24 | + StreamWrapper, |
22 | 25 | get_extra_attributes, |
23 | 26 | get_langtrace_attributes, |
24 | 27 | get_llm_request_attributes, |
25 | 28 | get_llm_url, |
26 | 29 | get_span_name, |
27 | 30 | is_streaming, |
28 | 31 | set_event_completion, |
29 | | - set_event_completion_chunk, |
30 | 32 | set_usage_attributes, |
31 | 33 | ) |
32 | 34 | from opentelemetry.trace import SpanKind |
@@ -83,61 +85,7 @@ def traced_method(wrapped, instance, args, kwargs): |
83 | 85 | span.end() |
84 | 86 | raise |
85 | 87 |
|
86 | | - def handle_streaming_response(result, span): |
87 | | - """Process and yield streaming response chunks.""" |
88 | | - result_content = [] |
89 | | - span.add_event(Event.STREAM_START.value) |
90 | | - input_tokens = 0 |
91 | | - output_tokens = 0 |
92 | | - try: |
93 | | - for chunk in result: |
94 | | - if ( |
95 | | - hasattr(chunk, "message") |
96 | | - and chunk.message is not None |
97 | | - and hasattr(chunk.message, "model") |
98 | | - and chunk.message.model is not None |
99 | | - ): |
100 | | - span.set_attribute( |
101 | | - SpanAttributes.LLM_RESPONSE_MODEL, chunk.message.model |
102 | | - ) |
103 | | - content = "" |
104 | | - if hasattr(chunk, "delta") and chunk.delta is not None: |
105 | | - content = chunk.delta.text if hasattr(chunk.delta, "text") else "" |
106 | | - # Assuming content needs to be aggregated before processing |
107 | | - result_content.append(content if len(content) > 0 else "") |
108 | | - |
109 | | - if hasattr(chunk, "message") and hasattr(chunk.message, "usage"): |
110 | | - input_tokens += ( |
111 | | - chunk.message.usage.input_tokens |
112 | | - if hasattr(chunk.message.usage, "input_tokens") |
113 | | - else 0 |
114 | | - ) |
115 | | - output_tokens += ( |
116 | | - chunk.message.usage.output_tokens |
117 | | - if hasattr(chunk.message.usage, "output_tokens") |
118 | | - else 0 |
119 | | - ) |
120 | | - |
121 | | - # Assuming span.add_event is part of a larger logging or event system |
122 | | - # Add event for each chunk of content |
123 | | - if content: |
124 | | - set_event_completion_chunk(span, "".join(content)) |
125 | | - |
126 | | - # Assuming this is part of a generator, yield chunk or aggregated content |
127 | | - yield content |
128 | | - finally: |
129 | | - |
130 | | - # Finalize span after processing all chunks |
131 | | - span.add_event(Event.STREAM_END.value) |
132 | | - set_usage_attributes( |
133 | | - span, {"input_tokens": input_tokens, "output_tokens": output_tokens} |
134 | | - ) |
135 | | - completion = [{"role": "assistant", "content": "".join(result_content)}] |
136 | | - set_event_completion(span, completion) |
137 | | - |
138 | | - span.set_status(StatusCode.OK) |
139 | | - span.end() |
140 | | - |
| 88 | + @silently_fail |
141 | 89 | def set_response_attributes(result, span, kwargs): |
142 | 90 | if not is_streaming(kwargs): |
143 | 91 | if hasattr(result, "content") and result.content is not None: |
@@ -174,7 +122,7 @@ def set_response_attributes(result, span, kwargs): |
174 | 122 | span.end() |
175 | 123 | return result |
176 | 124 | else: |
177 | | - return handle_streaming_response(result, span) |
| 125 | + return StreamWrapper(result, span) |
178 | 126 |
|
179 | 127 | # return the wrapped method |
180 | 128 | return traced_method |
0 commit comments