Skip to content

Commit b901d2d

Browse files
committed
Merge branch 'feature/v4-update-integration' into feature/v4
2 parents 35a5d2c + bcd1e98 commit b901d2d

File tree

3 files changed

+150
-9
lines changed

3 files changed

+150
-9
lines changed

src/llm/src/claude/client.lua

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ function claude_client.process_stream(stream_response, callbacks)
236236
local finish_reason = nil
237237
local usage = {}
238238
local content_blocks = {}
239+
local leftover = ""
239240

240241
while true do
241242
local chunk, err = stream_response.stream:read()
@@ -253,7 +254,31 @@ function claude_client.process_stream(stream_response, callbacks)
253254
goto continue
254255
end
255256

256-
for event_type, data_json in chunk:gmatch("event: ([^\n]+)\ndata: ([^\n]+)") do
257+
if leftover ~= "" then
258+
chunk = leftover .. chunk
259+
leftover = ""
260+
end
261+
262+
local last_boundary = 0
263+
local pos = 1
264+
while true do
265+
local found = chunk:find("\n\n", pos, true)
266+
if not found then break end
267+
last_boundary = found + 1
268+
pos = found + 2
269+
end
270+
271+
if last_boundary == 0 then
272+
leftover = chunk
273+
goto continue
274+
end
275+
276+
local complete = chunk:sub(1, last_boundary)
277+
if last_boundary < #chunk then
278+
leftover = chunk:sub(last_boundary + 1)
279+
end
280+
281+
for event_type, data_json in complete:gmatch("event: ([^\n]+)\ndata: ([^\n]+)") do
257282
local data, decode_err = json.decode(tostring(data_json))
258283
if decode_err or not data then
259284
goto continue_event

src/llm/src/claude/client_test.lua

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1249,6 +1249,102 @@ local function define_tests()
12491249
test.eq(full_content, "Hello")
12501250
test.eq(#content_chunks, 1)
12511251
end)
1252+
1253+
it("should handle SSE events split across chunk boundaries", function()
1254+
local stream_chunks = {
1255+
'event: message_start\ndata: {"type":"message_start","message":{"usage":{"input_tokens":10,"output_tokens":0}}}\n\n',
1256+
'event: content_block_start\ndata: {"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"call_split","name":"run_script"}}\n\n',
1257+
'event: content_block_delta\ndata: {"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"{\\"scr"}}\n\n' ..
1258+
'event: content_block_del',
1259+
'ta\ndata: {"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"ipt\\": \\"echo"}}\n\n',
1260+
'event: content_block_delta\ndata: {"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":" hello\\"}"}}\n\nevent: content_block_stop\ndata: {"type":"content_block_stop","index":0}\n\n',
1261+
'event: message_delta\ndata: {"type":"message_delta","delta":{"stop_reason":"tool_use"},"usage":{"output_tokens":8}}\n\n',
1262+
'event: message_stop\ndata: {"type":"message_stop"}\n\n'
1263+
}
1264+
1265+
local mock_stream = {
1266+
chunks = stream_chunks,
1267+
current = 0
1268+
}
1269+
1270+
setmetatable(mock_stream, {
1271+
__index = {
1272+
read = function(self)
1273+
self.current = self.current + 1
1274+
if self.current <= #self.chunks then
1275+
return self.chunks[self.current]
1276+
end
1277+
return nil
1278+
end
1279+
}
1280+
})
1281+
1282+
local stream_response = {
1283+
stream = mock_stream,
1284+
metadata = {}
1285+
}
1286+
1287+
local tool_calls = {}
1288+
1289+
local full_content, err, result = claude_client.process_stream(stream_response, {
1290+
on_tool_call = function(tool_call)
1291+
table.insert(tool_calls, tool_call)
1292+
end
1293+
})
1294+
1295+
test.is_nil(err)
1296+
test.eq(#tool_calls, 1)
1297+
local tc = assert(tool_calls[1])
1298+
test.eq(tc.id, "call_split")
1299+
test.eq(tc.name, "run_script")
1300+
test.eq(tc.arguments.script, "echo hello")
1301+
end)
1302+
1303+
it("should handle chunk containing no complete events", function()
1304+
local stream_chunks = {
1305+
'event: message_sta',
1306+
'rt\ndata: {"type":"message_start","message":{"usage":{"input_tokens":5,"output_tokens":0}}}\n\n',
1307+
'event: content_block_start\ndata: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}\n\n',
1308+
'event: content_block_delta\ndata: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"OK"}}\n\n',
1309+
'event: content_block_stop\ndata: {"type":"content_block_stop","index":0}\n\n',
1310+
'event: message_delta\ndata: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":1}}\n\n',
1311+
'event: message_stop\ndata: {"type":"message_stop"}\n\n'
1312+
}
1313+
1314+
local mock_stream = {
1315+
chunks = stream_chunks,
1316+
current = 0
1317+
}
1318+
1319+
setmetatable(mock_stream, {
1320+
__index = {
1321+
read = function(self)
1322+
self.current = self.current + 1
1323+
if self.current <= #self.chunks then
1324+
return self.chunks[self.current]
1325+
end
1326+
return nil
1327+
end
1328+
}
1329+
})
1330+
1331+
local stream_response = {
1332+
stream = mock_stream,
1333+
metadata = {}
1334+
}
1335+
1336+
local content_chunks = {}
1337+
1338+
local full_content, err, result = claude_client.process_stream(stream_response, {
1339+
on_content = function(chunk)
1340+
table.insert(content_chunks, chunk)
1341+
end
1342+
})
1343+
1344+
test.is_nil(err)
1345+
test.eq(full_content, "OK")
1346+
test.eq(#content_chunks, 1)
1347+
end)
12521348
end)
12531349

12541350
describe("Configuration Edge Cases", function()

src/llm/src/openai/client.lua

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -286,28 +286,49 @@ function openai_client.process_stream(stream_response, callbacks)
286286
local on_error = callbacks.on_error or function(...) end
287287
local on_done = callbacks.on_done or function(...) end
288288

289-
-- Process each streamed chunk
289+
local leftover = ""
290+
290291
while true do
291292
local chunk, err = stream_response.stream:read()
292293

293-
-- Handle read errors
294294
if err then
295295
on_error(err)
296296
return nil, err
297297
end
298298

299-
-- End of stream
300299
if not chunk then
301300
break
302301
end
303302

304-
-- Skip empty chunks
305303
if chunk == "" then
306304
goto continue
307305
end
308306

309-
-- Check for errors in the chunk
310-
local error_json = chunk:match('data:%s*({.-"error":.-)%s*\n')
307+
if leftover ~= "" then
308+
chunk = leftover .. chunk
309+
leftover = ""
310+
end
311+
312+
local last_boundary = 0
313+
local pos = 1
314+
while true do
315+
local found = chunk:find("\n\n", pos, true)
316+
if not found then break end
317+
last_boundary = found + 1
318+
pos = found + 2
319+
end
320+
321+
if last_boundary == 0 then
322+
leftover = chunk
323+
goto continue
324+
end
325+
326+
local complete = chunk:sub(1, last_boundary)
327+
if last_boundary < #chunk then
328+
leftover = chunk:sub(last_boundary + 1)
329+
end
330+
331+
local error_json = complete:match('data:%s*({.-"error":.-)%s*\n')
311332
if error_json then
312333
local parsed_error, parse_err = json.decode(tostring(error_json))
313334
if not parse_err and parsed_error and parsed_error.error then
@@ -322,8 +343,7 @@ function openai_client.process_stream(stream_response, callbacks)
322343
end
323344
end
324345

325-
-- Process each data line in the chunk
326-
for data_line in chunk:gmatch('data:%s*(.-)%s*\n') do
346+
for data_line in complete:gmatch('data:%s*(.-)%s*\n') do
327347
-- Skip empty data lines
328348
if data_line == "" then
329349
goto continue_line

0 commit comments

Comments
 (0)