
Commit 7edbae1

Refactor process_chunk
1 parent bd560f7 commit 7edbae1

File tree

  • src/langtrace_python_sdk/utils

1 file changed: +70 -29 lines changed

src/langtrace_python_sdk/utils/llm.py

Lines changed: 70 additions & 29 deletions
@@ -235,8 +235,9 @@ class StreamWrapper:
     span: Span

     def __init__(
-        self, stream, span, prompt_tokens, function_call=False, tool_calls=False
+        self, stream, span, prompt_tokens=0, function_call=False, tool_calls=False
     ):
+
         self.stream = stream
         self.span = span
         self.prompt_tokens = prompt_tokens
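Note: prompt_tokens now defaults to 0, so instrumentation that cannot precompute a prompt estimate (the Anthropic path reports input_tokens on its first stream event instead) can omit the argument. A minimal usage sketch, assuming an already-started span and a provider stream (raw_stream and span are stand-in names):

    # Hypothetical caller: no precomputed prompt-token count is passed;
    # usage and model attributes are filled in as chunks are consumed.
    wrapper = StreamWrapper(stream=raw_stream, span=span)
    for chunk in wrapper:
        ...  # chunks pass through to the caller unchanged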
@@ -245,6 +246,7 @@ def __init__(
         self.result_content = []
         self.completion_tokens = 0
         self._span_started = False
+        self._response_model = None
         self.setup()

     def setup(self):
@@ -253,6 +255,11 @@ def setup(self):

     def cleanup(self):
         if self._span_started:
+            set_span_attribute(
+                self.span,
+                SpanAttributes.LLM_RESPONSE_MODEL,
+                self._response_model,
+            )
             set_span_attribute(
                 self.span,
                 SpanAttributes.LLM_USAGE_PROMPT_TOKENS,
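cleanup() now records LLM_RESPONSE_MODEL once, at stream end, from the _response_model captured while streaming, instead of setting the attribute on every chunk. This relies on set_span_attribute tolerating an empty value when no chunk ever carried a model name; a simplified sketch of that assumed guard (not the module's exact implementation):

    # Assumed behavior, for illustration: skip empty values so a stream
    # that never reports a model simply omits the attribute.
    def set_span_attribute(span, name, value):
        if value is not None and value != "":
            span.set_attribute(name, value)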
@@ -320,21 +327,40 @@ async def __anext__(self):
             self.cleanup()
             raise StopAsyncIteration

-    def process_chunk(self, chunk):
+    def process_anthropic_chunk(self, chunk):
+        if hasattr(chunk, "message") and chunk.message is not None:
+            if self._response_model is None:
+                self._response_model = chunk.message.model
+
+            self.prompt_tokens = chunk.message.usage.input_tokens
+
+        if hasattr(chunk, "usage") and chunk.usage is not None:
+            self.completion_tokens = chunk.usage.output_tokens
+
+        if hasattr(chunk, "delta"):
+            if hasattr(chunk.delta, "text") and chunk.delta.text is not None:
+                self.build_streaming_response(chunk.delta.text)
+
+    def set_response_model(self, chunk):
+        if self._response_model:
+            return
+
+        # OpenAI response model is set on all chunks
         if hasattr(chunk, "model") and chunk.model is not None:
-            set_span_attribute(
-                self.span,
-                SpanAttributes.LLM_RESPONSE_MODEL,
-                chunk.model,
-            )
+            self._response_model = chunk.model
+
+        # Anthropic response model is set on the first chunk message
+        if hasattr(chunk, "message") and chunk.message is not None:
+            if hasattr(chunk.message, "model") and chunk.message.model is not None:
+                self._response_model = chunk.message.model

+    def build_streaming_response(self, chunk):
+        content = []
+        # OpenAI
         if hasattr(chunk, "choices") and chunk.choices is not None:
-            content = []
             if not self.function_call and not self.tool_calls:
                 for choice in chunk.choices:
                     if choice.delta and choice.delta.content is not None:
-                        token_counts = estimate_tokens(choice.delta.content)
-                        self.completion_tokens += token_counts
                         content = [choice.delta.content]
             elif self.function_call:
                 for choice in chunk.choices:
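set_response_model returns early once a model is known because the providers shape their chunks differently: OpenAI repeats model on every chunk, while Anthropic sends it once, inside the message_start event. Stand-in objects illustrating the two shapes (hypothetical values; real chunks come from the provider SDKs):

    from types import SimpleNamespace

    # OpenAI-style chunk: `model` is present on every streamed chunk.
    openai_chunk = SimpleNamespace(model="gpt-4o-mini", choices=[])

    # Anthropic-style chunk: the model arrives once, on message_start.
    anthropic_chunk = SimpleNamespace(
        type="message_start",
        message=SimpleNamespace(
            model="claude-3-opus-20240229",
            usage=SimpleNamespace(input_tokens=42),
        ),
    )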
@@ -343,10 +369,6 @@ def process_chunk(self, chunk):
                         and choice.delta.function_call is not None
                         and choice.delta.function_call.arguments is not None
                     ):
-                        token_counts = estimate_tokens(
-                            choice.delta.function_call.arguments
-                        )
-                        self.completion_tokens += token_counts
                         content = [choice.delta.function_call.arguments]
             elif self.tool_calls:
                 for choice in chunk.choices:
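As in the previous hunk, the per-chunk estimate_tokens accounting is dropped; completion tokens are now taken from provider-reported usage in set_usage_attributes below. A rough sketch of the trade-off, with hypothetical helper names:

    # Old approach (approximate): estimate tokens chunk by chunk.
    def count_tokens_estimated(text_chunks, estimate_tokens):
        return sum(estimate_tokens(t) for t in text_chunks)

    # New approach (exact where available): trust the provider's reported usage.
    def count_tokens_reported(usage):
        return usage.completion_tokens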
@@ -359,26 +381,45 @@ def process_chunk(self, chunk):
                         and tool_call.function is not None
                         and tool_call.function.arguments is not None
                     ):
-                        token_counts = estimate_tokens(
-                            tool_call.function.arguments
-                        )
-                        self.completion_tokens += token_counts
                         content.append(tool_call.function.arguments)
-        if content:
-            self.result_content.append(content[0])

-        if hasattr(chunk, "text"):
-            token_counts = estimate_tokens(chunk.text)
-            self.completion_tokens += token_counts
+        # VertexAI
+        if hasattr(chunk, "text") and chunk.text is not None:
             content = [chunk.text]
-        set_event_completion_chunk(
-            self.span,
-            "".join(content) if len(content) > 0 and content[0] is not None else "",
-        )

-        if content:
-            self.result_content.append(content[0])
+        # Anthropic
+        if hasattr(chunk, "delta") and chunk.delta is not None:
+            content = [chunk.delta.text] if hasattr(chunk.delta, "text") else []
+
+        if content:
+            self.result_content.append(content[0])
+
+    def set_usage_attributes(self, chunk):
+
+        # Anthropic & OpenAI
+        if hasattr(chunk, "type") and chunk.type == "message_start":
+            self.prompt_tokens = chunk.message.usage.input_tokens

+        if hasattr(chunk, "usage") and chunk.usage is not None:
+            if hasattr(chunk.usage, "output_tokens"):
+                self.completion_tokens = chunk.usage.output_tokens
+
+            if hasattr(chunk.usage, "prompt_tokens"):
+                self.prompt_tokens = chunk.usage.prompt_tokens
+
+            if hasattr(chunk.usage, "completion_tokens"):
+                self.completion_tokens = chunk.usage.completion_tokens
+
+        # VertexAI
         if hasattr(chunk, "usage_metadata"):
             self.completion_tokens = chunk.usage_metadata.candidates_token_count
             self.prompt_tokens = chunk.usage_metadata.prompt_token_count
+
+    def process_chunk(self, chunk):
+
+        # 2. We save the completion text from the chunk
+        # 3. We save the prompt + completions tokens from the chunk
+
+        self.set_response_model(chunk=chunk)
+        self.build_streaming_response(chunk=chunk)
+        self.set_usage_attributes(chunk=chunk)
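process_chunk is now a thin pipeline: resolve the model name, buffer the completion text, then record usage. An end-to-end sketch with a stand-in OpenAI-style chunk (invented values; assumes wrapper is a freshly started StreamWrapper):

    from types import SimpleNamespace

    delta = SimpleNamespace(content="Hello")
    chunk = SimpleNamespace(
        model="gpt-4o-mini",                     # read by set_response_model
        choices=[SimpleNamespace(delta=delta)],  # read by build_streaming_response
        usage=SimpleNamespace(prompt_tokens=12, completion_tokens=1),
    )

    wrapper.process_chunk(chunk)
    assert wrapper.result_content == ["Hello"]
    assert wrapper.prompt_tokens == 12 and wrapper.completion_tokens == 1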
