diff --git a/packages/bruno-app/src/providers/ReduxStore/slices/collections/index.js b/packages/bruno-app/src/providers/ReduxStore/slices/collections/index.js index 3665661924d..bfe158685f1 100644 --- a/packages/bruno-app/src/providers/ReduxStore/slices/collections/index.js +++ b/packages/bruno-app/src/providers/ReduxStore/slices/collections/index.js @@ -3124,12 +3124,14 @@ export const collectionsSlice = createSlice({ if (collection) { const item = findItemInCollection(collection, itemUid); if (data.data) { + // Avoid generating massive hexdumps for large streaming chunks (can lock up the UI). + const MAX_HEXDUMP_BYTES = 32 * 1024; item.response.data ||= []; item.response.data = [{ type: 'incoming', seq, message: data.data, - messageHexdump: hexdump(data.data), + messageHexdump: hexdump(data.data, { length: MAX_HEXDUMP_BYTES }), timestamp: timestamp || Date.now() }].concat(item.response.data); } diff --git a/packages/bruno-electron/src/ipc/network/event-stream.js b/packages/bruno-electron/src/ipc/network/event-stream.js new file mode 100644 index 00000000000..7ac01fba462 --- /dev/null +++ b/packages/bruno-electron/src/ipc/network/event-stream.js @@ -0,0 +1,86 @@ +const { StringDecoder } = require('string_decoder'); + +const parseSseEventData = (eventText) => { + // https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream + // We only extract `data:` lines, because that's what users care about in Bruno. + const lines = eventText.split(/\r?\n/); + const dataLines = []; + for (const line of lines) { + // comment/heartbeat + if (!line || line.startsWith(':')) { + continue; + } + if (line.startsWith('data:')) { + dataLines.push(line.slice(5).replace(/^\s/, '')); + continue; + } + if (line === 'data') { + dataLines.push(''); + continue; + } + } + if (dataLines.length) { + return dataLines.join('\n'); + } + // Fallback: if server doesn't follow SSE fields, still surface raw chunk. + return eventText; +}; + +const createEventStreamEmitter = ({ onMessage, maxBufferedChars = 256 * 1024 }) => { + const decoder = new StringDecoder('utf8'); + let buffered = ''; + + const drain = () => { + while (true) { + const idxCrlf = buffered.indexOf('\r\n\r\n'); + const idxLf = buffered.indexOf('\n\n'); + let idx = -1; + let sepLen = 0; + if (idxCrlf !== -1 && (idxLf === -1 || idxCrlf < idxLf)) { + idx = idxCrlf; + sepLen = 4; + } else if (idxLf !== -1) { + idx = idxLf; + sepLen = 2; + } + + if (idx === -1) { + break; + } + + const frame = buffered.slice(0, idx); + buffered = buffered.slice(idx + sepLen); + + const data = parseSseEventData(frame); + if (data && data.length) { + onMessage(data); + } + } + + // Safety valve: if we never find a frame separator, still stream raw text. + if (buffered.length >= maxBufferedChars) { + onMessage(buffered); + buffered = ''; + } + }; + + return { + write: (chunk) => { + buffered += decoder.write(chunk); + drain(); + }, + end: () => { + buffered += decoder.end(); + drain(); + if (buffered && buffered.length) { + onMessage(buffered); + buffered = ''; + } + } + }; +}; + +module.exports = { + createEventStreamEmitter +}; + diff --git a/packages/bruno-electron/src/ipc/network/index.js b/packages/bruno-electron/src/ipc/network/index.js index e17fe4ae9f7..09033d08805 100644 --- a/packages/bruno-electron/src/ipc/network/index.js +++ b/packages/bruno-electron/src/ipc/network/index.js @@ -36,6 +36,7 @@ const registerGrpcEventHandlers = require('./grpc-event-handlers'); const { registerWsEventHandlers } = require('./ws-event-handlers'); const { getCertsAndProxyConfig } = require('./cert-utils'); const { buildFormUrlEncodedPayload, isFormData } = require('@usebruno/common').utils; +const { createEventStreamEmitter } = require('./event-stream'); const ERROR_OCCURRED_WHILE_EXECUTING_REQUEST = 'Error occurred while executing the request!'; @@ -1011,23 +1012,28 @@ const registerNetworkIpc = (mainWindow) => { const stream = response.stream; response.stream = { running: response.status >= 200 && response.status < 300 }; - stream.on('data', (newData) => { - seq += 1; - - const parsed = parseDataFromResponse({ data: newData, headers: {} }); + const emitter = createEventStreamEmitter({ + onMessage: (message) => { + seq += 1; + mainWindow.webContents.send('main:http-stream-new-data', { + collectionUid, + itemUid: item.uid, + seq, + timestamp: Date.now(), + data: { data: message } + }); + } + }); - mainWindow.webContents.send('main:http-stream-new-data', { - collectionUid, - itemUid: item.uid, - seq, - timestamp: Date.now(), - data: parsed - }); + stream.on('data', (newData) => { + emitter.write(newData); }); stream.on('close', () => { if (!cancelTokens[response.cancelTokenUid]) return; + emitter.end(); + mainWindow.webContents.send('main:http-stream-end', { collectionUid, itemUid: item.uid, diff --git a/packages/bruno-electron/tests/network/event-stream-decoding.spec.js b/packages/bruno-electron/tests/network/event-stream-decoding.spec.js new file mode 100644 index 00000000000..52df040e803 --- /dev/null +++ b/packages/bruno-electron/tests/network/event-stream-decoding.spec.js @@ -0,0 +1,46 @@ +const { createEventStreamEmitter } = require('../../src/ipc/network/event-stream'); + +describe('event-stream decoding', () => { + it('does not lose utf-8 bytes across chunk boundaries', async () => { + // Force a multibyte boundary split: πŸ˜€ is 4 bytes in UTF-8. + const payload = `data: {\"text\":\"δ½ ε₯½πŸ˜€δΈ–η•Œ\"}\n\n`; + const buf = Buffer.from(payload, 'utf8'); + + const parts = [buf.subarray(0, buf.length - 1), buf.subarray(buf.length - 1)]; + const received = []; + const emitter = createEventStreamEmitter({ onMessage: (m) => received.push(m) }); + + parts.forEach((p) => emitter.write(p)); + emitter.end(); + + expect(received).toEqual(['{"text":"δ½ ε₯½πŸ˜€δΈ–η•Œ"}']); + }); + + it('reassembles a large SSE event split into many chunks', async () => { + const largeJson = JSON.stringify({ a: 'x'.repeat(200_000), b: 'y'.repeat(50_000) }); + const payload = `data: ${largeJson}\n\n`; + const buf = Buffer.from(payload, 'utf8'); + + const received = []; + const emitter = createEventStreamEmitter({ onMessage: (m) => received.push(m) }); + + // split into small chunks to simulate arbitrary TCP chunking + for (let i = 0; i < buf.length; i += 1024) { + emitter.write(buf.subarray(i, i + 1024)); + } + emitter.end(); + + expect(received.length).toBe(1); + expect(received[0]).toBe(largeJson); + }); + + it('falls back to raw streaming when no SSE separators appear', async () => { + const received = []; + const emitter = createEventStreamEmitter({ onMessage: (m) => received.push(m), maxBufferedChars: 10 }); + emitter.write(Buffer.from('abcdefghij', 'utf8')); + emitter.write(Buffer.from('klmno', 'utf8')); + emitter.end(); + + expect(received.join('')).toBe('abcdefghijklmno'); + }); +});