@@ -28,17 +28,14 @@ class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
28
28
TextBlock ,
29
29
)
30
30
31
- def __init__ (self , completion_stream : Any , model : str ):
32
- super ().__init__ (completion_stream )
33
- self .model = model
34
-
35
31
sent_first_chunk : bool = False
36
32
sent_content_block_start : bool = False
37
33
sent_content_block_finish : bool = False
38
34
current_content_block_type : Literal ["text" , "tool_use" ] = "text"
39
35
sent_last_message : bool = False
40
36
holding_chunk : Optional [Any ] = None
41
37
holding_stop_reason_chunk : Optional [Any ] = None
38
+ queued_usage_chunk : bool = False
42
39
current_content_block_index : int = 0
43
40
current_content_block_start : ContentBlockContentBlockDict = TextBlock (
44
41
type = "text" ,
@@ -47,6 +44,10 @@ def __init__(self, completion_stream: Any, model: str):
47
44
pending_new_content_block : bool = False
48
45
chunk_queue : deque = deque () # Queue for buffering multiple chunks
49
46
47
+ def __init__ (self , completion_stream : Any , model : str ):
48
+ super ().__init__ (completion_stream )
49
+ self .model = model
50
+
50
51
def __next__ (self ):
51
52
from .transformation import LiteLLMAnthropicMessagesAdapter
52
53
@@ -217,77 +218,83 @@ async def __anext__(self): # noqa: PLR0915
217
218
218
219
# Queue the merged chunk and reset
219
220
self .chunk_queue .append (merged_chunk )
221
+ self .queued_usage_chunk = True
220
222
self .holding_stop_reason_chunk = None
221
223
return self .chunk_queue .popleft ()
222
224
223
225
# Check if this processed chunk has a stop_reason - hold it for next chunk
224
226
225
- if should_start_new_block and not self .sent_content_block_finish :
226
- # Queue the sequence: content_block_stop -> content_block_start -> current_chunk
227
-
228
- # 1. Stop current content block
229
- self .chunk_queue .append (
230
- {
231
- "type" : "content_block_stop" ,
232
- "index" : max (self .current_content_block_index - 1 , 0 ),
233
- }
234
- )
235
-
236
- # 2. Start new content block
237
- self .chunk_queue .append (
238
- {
239
- "type" : "content_block_start" ,
240
- "index" : self .current_content_block_index ,
241
- "content_block" : self .current_content_block_start ,
242
- }
243
- )
244
-
245
- # 3. Queue the current chunk (don't lose it!)
246
- self .chunk_queue .append (processed_chunk )
247
-
248
- # Reset state for new block
249
- self .sent_content_block_finish = False
250
-
251
- # Return the first queued item
252
- return self .chunk_queue .popleft ()
253
-
254
- if (
255
- processed_chunk ["type" ] == "message_delta"
256
- and self .sent_content_block_finish is False
257
- ):
258
- # Queue both the content_block_stop and the holding chunk
259
- self .chunk_queue .append (
260
- {
261
- "type" : "content_block_stop" ,
262
- "index" : self .current_content_block_index ,
263
- }
264
- )
265
- self .sent_content_block_finish = True
266
- if processed_chunk .get ("delta" , {}).get ("stop_reason" ) is not None :
227
+ if not self .queued_usage_chunk :
228
+ if should_start_new_block and not self .sent_content_block_finish :
229
+ # Queue the sequence: content_block_stop -> content_block_start -> current_chunk
230
+
231
+ # 1. Stop current content block
232
+ self .chunk_queue .append (
233
+ {
234
+ "type" : "content_block_stop" ,
235
+ "index" : max (self .current_content_block_index - 1 , 0 ),
236
+ }
237
+ )
238
+
239
+ # 2. Start new content block
240
+ self .chunk_queue .append (
241
+ {
242
+ "type" : "content_block_start" ,
243
+ "index" : self .current_content_block_index ,
244
+ "content_block" : self .current_content_block_start ,
245
+ }
246
+ )
247
+
248
+ # 3. Queue the current chunk (don't lose it!)
249
+ self .chunk_queue .append (processed_chunk )
267
250
268
- self .holding_stop_reason_chunk = processed_chunk
251
+ # Reset state for new block
252
+ self .sent_content_block_finish = False
253
+
254
+ # Return the first queued item
255
+ return self .chunk_queue .popleft ()
256
+
257
+ if (
258
+ processed_chunk ["type" ] == "message_delta"
259
+ and self .sent_content_block_finish is False
260
+ ):
261
+ # Queue both the content_block_stop and the holding chunk
262
+ self .chunk_queue .append (
263
+ {
264
+ "type" : "content_block_stop" ,
265
+ "index" : self .current_content_block_index ,
266
+ }
267
+ )
268
+ self .sent_content_block_finish = True
269
+ if (
270
+ processed_chunk .get ("delta" , {}).get ("stop_reason" )
271
+ is not None
272
+ ):
273
+
274
+ self .holding_stop_reason_chunk = processed_chunk
275
+ else :
276
+ self .chunk_queue .append (processed_chunk )
277
+ return self .chunk_queue .popleft ()
278
+ elif self .holding_chunk is not None :
279
+ # Queue both chunks
280
+ self .chunk_queue .append (self .holding_chunk )
281
+ self .chunk_queue .append (processed_chunk )
282
+ self .holding_chunk = None
283
+ return self .chunk_queue .popleft ()
269
284
else :
285
+ # Queue the current chunk
270
286
self .chunk_queue .append (processed_chunk )
271
- return self .chunk_queue .popleft ()
272
- elif self .holding_chunk is not None :
273
- # Queue both chunks
274
- self .chunk_queue .append (self .holding_chunk )
275
- self .chunk_queue .append (processed_chunk )
276
- self .holding_chunk = None
277
- return self .chunk_queue .popleft ()
278
- else :
279
- # Queue the current chunk
280
- self .chunk_queue .append (processed_chunk )
281
- return self .chunk_queue .popleft ()
287
+ return self .chunk_queue .popleft ()
282
288
283
289
# Handle any remaining held chunks after stream ends
284
- if self .holding_stop_reason_chunk is not None :
285
- self .chunk_queue .append (self .holding_stop_reason_chunk )
286
- self .holding_stop_reason_chunk = None
290
+ if not self .queued_usage_chunk :
291
+ if self .holding_stop_reason_chunk is not None :
292
+ self .chunk_queue .append (self .holding_stop_reason_chunk )
293
+ self .holding_stop_reason_chunk = None
287
294
288
- if self .holding_chunk is not None :
289
- self .chunk_queue .append (self .holding_chunk )
290
- self .holding_chunk = None
295
+ if self .holding_chunk is not None :
296
+ self .chunk_queue .append (self .holding_chunk )
297
+ self .holding_chunk = None
291
298
292
299
if not self .sent_last_message :
293
300
self .sent_last_message = True
0 commit comments