Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3124,12 +3124,14 @@ export const collectionsSlice = createSlice({
if (collection) {
const item = findItemInCollection(collection, itemUid);
if (data.data) {
// Avoid generating massive hexdumps for large streaming chunks (can lock up the UI).
const MAX_HEXDUMP_BYTES = 32 * 1024;
item.response.data ||= [];
item.response.data = [{
type: 'incoming',
seq,
message: data.data,
messageHexdump: hexdump(data.data),
messageHexdump: hexdump(data.data, { length: MAX_HEXDUMP_BYTES }),
timestamp: timestamp || Date.now()
}].concat(item.response.data);
}
Expand Down
86 changes: 86 additions & 0 deletions packages/bruno-electron/src/ipc/network/event-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
const { StringDecoder } = require('string_decoder');

const parseSseEventData = (eventText) => {
// https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
// We only extract `data:` lines, because that's what users care about in Bruno.
const lines = eventText.split(/\r?\n/);
const dataLines = [];
for (const line of lines) {
// comment/heartbeat
if (!line || line.startsWith(':')) {
continue;
}
if (line.startsWith('data:')) {
dataLines.push(line.slice(5).replace(/^\s/, ''));
continue;
}
if (line === 'data') {
dataLines.push('');
continue;
}
}
if (dataLines.length) {
return dataLines.join('\n');
}
// Fallback: if server doesn't follow SSE fields, still surface raw chunk.
return eventText;
};

const createEventStreamEmitter = ({ onMessage, maxBufferedChars = 256 * 1024 }) => {
const decoder = new StringDecoder('utf8');
let buffered = '';

const drain = () => {
while (true) {
const idxCrlf = buffered.indexOf('\r\n\r\n');
const idxLf = buffered.indexOf('\n\n');
let idx = -1;
let sepLen = 0;
if (idxCrlf !== -1 && (idxLf === -1 || idxCrlf < idxLf)) {
idx = idxCrlf;
sepLen = 4;
} else if (idxLf !== -1) {
idx = idxLf;
sepLen = 2;
}

if (idx === -1) {
break;
}

const frame = buffered.slice(0, idx);
buffered = buffered.slice(idx + sepLen);

const data = parseSseEventData(frame);
if (data && data.length) {
onMessage(data);
}
}

// Safety valve: if we never find a frame separator, still stream raw text.
if (buffered.length >= maxBufferedChars) {
onMessage(buffered);
buffered = '';
}
};

return {
write: (chunk) => {
buffered += decoder.write(chunk);
drain();
},
end: () => {
buffered += decoder.end();
drain();
if (buffered && buffered.length) {
onMessage(buffered);
buffered = '';
}
}
};
};

module.exports = {
createEventStreamEmitter
};

28 changes: 17 additions & 11 deletions packages/bruno-electron/src/ipc/network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const registerGrpcEventHandlers = require('./grpc-event-handlers');
const { registerWsEventHandlers } = require('./ws-event-handlers');
const { getCertsAndProxyConfig } = require('./cert-utils');
const { buildFormUrlEncodedPayload, isFormData } = require('@usebruno/common').utils;
const { createEventStreamEmitter } = require('./event-stream');

const ERROR_OCCURRED_WHILE_EXECUTING_REQUEST = 'Error occurred while executing the request!';

Expand Down Expand Up @@ -1011,23 +1012,28 @@ const registerNetworkIpc = (mainWindow) => {
const stream = response.stream;
response.stream = { running: response.status >= 200 && response.status < 300 };

stream.on('data', (newData) => {
seq += 1;

const parsed = parseDataFromResponse({ data: newData, headers: {} });
const emitter = createEventStreamEmitter({
onMessage: (message) => {
seq += 1;
mainWindow.webContents.send('main:http-stream-new-data', {
collectionUid,
itemUid: item.uid,
seq,
timestamp: Date.now(),
data: { data: message }
});
}
});

mainWindow.webContents.send('main:http-stream-new-data', {
collectionUid,
itemUid: item.uid,
seq,
timestamp: Date.now(),
data: parsed
});
stream.on('data', (newData) => {
emitter.write(newData);
});

stream.on('close', () => {
if (!cancelTokens[response.cancelTokenUid]) return;

emitter.end();

mainWindow.webContents.send('main:http-stream-end', {
collectionUid,
itemUid: item.uid,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
const { createEventStreamEmitter } = require('../../src/ipc/network/event-stream');

describe('event-stream decoding', () => {
it('does not lose utf-8 bytes across chunk boundaries', async () => {
// Force a multibyte boundary split: 😀 is 4 bytes in UTF-8.
const payload = `data: {\"text\":\"你好😀世界\"}\n\n`;
const buf = Buffer.from(payload, 'utf8');

const parts = [buf.subarray(0, buf.length - 1), buf.subarray(buf.length - 1)];
const received = [];
const emitter = createEventStreamEmitter({ onMessage: (m) => received.push(m) });

parts.forEach((p) => emitter.write(p));
emitter.end();

expect(received).toEqual(['{"text":"你好😀世界"}']);
});

it('reassembles a large SSE event split into many chunks', async () => {
const largeJson = JSON.stringify({ a: 'x'.repeat(200_000), b: 'y'.repeat(50_000) });
const payload = `data: ${largeJson}\n\n`;
const buf = Buffer.from(payload, 'utf8');

const received = [];
const emitter = createEventStreamEmitter({ onMessage: (m) => received.push(m) });

// split into small chunks to simulate arbitrary TCP chunking
for (let i = 0; i < buf.length; i += 1024) {
emitter.write(buf.subarray(i, i + 1024));
}
emitter.end();

expect(received.length).toBe(1);
expect(received[0]).toBe(largeJson);
});

it('falls back to raw streaming when no SSE separators appear', async () => {
const received = [];
const emitter = createEventStreamEmitter({ onMessage: (m) => received.push(m), maxBufferedChars: 10 });
emitter.write(Buffer.from('abcdefghij', 'utf8'));
emitter.write(Buffer.from('klmno', 'utf8'));
emitter.end();

expect(received.join('')).toBe('abcdefghijklmno');
});
});