Skip to content

Commit 30dbe34

Browse files
SW881sid-bruno
andauthored
fix: add a hard sequence in SSE and WS requests (#6569)
* fix: sse sequence in ipc layer * fix: remove tick rate and flushing * fix: added sequence logic for websockets * fix: added sequence logic for websockets per request based * fix: correct the order for how the messages are added. `WSMessagesList` already handles a lot of the ordering for us, don't modify the order the messages are added since redirect and connection are internal states, it changes the execution trail * chore: reduce whitespace diffs * fix: a possible null case exception Though we always create an empty data buffer at source so shouldn't happen unless that is modified * fix: implement sequence logic for WebSocket messages * fix: remove unused sequenceState property from WsClient * fix: update message sorting logic to handle missing sequence numbers * fix: remove unused lodash import * fix: add clean method to sequencer for better sequence management * fix: don't show dropdown when streaming --------- Co-authored-by: Sid <siddharth@usebruno.com>
1 parent c83c055 commit 30dbe34

File tree

5 files changed

+86
-20
lines changed

5 files changed

+86
-20
lines changed

packages/bruno-app/src/components/ResponsePane/WsResponsePane/WSMessagesList/index.js

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import CodeEditor from 'components/CodeEditor/index';
66
import { useTheme } from 'providers/Theme';
77
import { useState } from 'react';
88
import { useSelector } from 'react-redux';
9-
import _ from 'lodash';
109
import { useRef } from 'react';
1110
import { useEffect } from 'react';
1211

@@ -183,12 +182,16 @@ const WSMessagesList = ({ order = -1, messages = [] }) => {
183182
if (!messages.length) {
184183
return <StyledWrapper><div className="empty-state">No messages yet.</div></StyledWrapper>;
185184
}
186-
const ordered = order === -1 ? messages : messages.slice().reverse();
185+
186+
// sort based on order, seq was newly added and might be missing in some cases and when missing,
187+
// the timestamp will be used instead
188+
const ordered = messages.toSorted((x, y) => ((x.seq ?? x.timestamp) - (y.seq ?? y.timestamp)) * order);
189+
187190
return (
188191
<StyledWrapper className="ws-messages-list flex flex-col">
189192
{ordered.map((msg, idx, src) => {
190193
const inFocus = order === -1 ? src.length - 1 === idx : idx === 0;
191-
return <WSMessageItem key={msg.timestamp} inFocus={inFocus} id={idx} message={msg} />;
194+
return <WSMessageItem key={msg.seq ? msg.seq : msg.timestamp} inFocus={inFocus} id={idx} message={msg} />;
192195
})}
193196
</StyledWrapper>
194197
);

packages/bruno-app/src/components/ResponsePane/index.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ const ResponsePane = ({ item, collection }) => {
225225
onClick={() => setShowScriptErrorCard(true)}
226226
/>
227227
)}
228-
{focusedTab?.responsePaneTab === 'response' && item?.response ? (
228+
{focusedTab?.responsePaneTab === 'response' && item?.response && !(item.response?.stream ?? false) ? (
229229
<>
230230
{/* Result View Tabs (Visualizations + Response Format) */}
231231
<div className="result-view-tabs">

packages/bruno-app/src/providers/ReduxStore/slices/collections/index.js

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ export const collectionsSlice = createSlice({
383383
}
384384
},
385385
requestCancelled: (state, action) => {
386-
const { itemUid, collectionUid } = action.payload;
386+
const { itemUid, collectionUid, seq, timestamp } = action.payload;
387387
const collection = findCollectionByUid(state.collections, collectionUid);
388388

389389
if (collection) {
@@ -394,7 +394,7 @@ export const collectionsSlice = createSlice({
394394

395395
const startTimestamp = item.requestSent.timestamp;
396396
item.response.duration = startTimestamp ? Date.now() - startTimestamp : item.response.duration;
397-
item.response.data = [{ type: 'info', timestamp: Date.now(), message: 'Connection Closed' }].concat(item.response.data);
397+
item.response.data = [{ type: 'info', timestamp: Date.now(), seq: seq, message: 'Connection Closed' }].concat(item.response.data);
398398
} else {
399399
item.response = null;
400400
item.requestUid = null;
@@ -3118,7 +3118,7 @@ export const collectionsSlice = createSlice({
31183118
}
31193119
},
31203120
streamDataReceived: (state, action) => {
3121-
const { itemUid, collectionUid, data } = action.payload;
3121+
const { itemUid, collectionUid, seq, timestamp, data } = action.payload;
31223122
const collection = findCollectionByUid(state.collections, collectionUid);
31233123

31243124
if (collection) {
@@ -3127,12 +3127,16 @@ export const collectionsSlice = createSlice({
31273127
item.response.data ||= [];
31283128
item.response.data = [{
31293129
type: 'incoming',
3130+
seq,
31303131
message: data.data,
31313132
messageHexdump: hexdump(data.data),
3132-
timestamp: Date.now()
3133+
timestamp: timestamp || Date.now()
31333134
}].concat(item.response.data);
31343135
}
3135-
item.response.dataBuffer = Buffer.concat([Buffer.from(item.response.dataBuffer), Buffer.from(data.dataBuffer)]);
3136+
if (item.response.dataBuffer && item.response.dataBuffer.length && data.dataBuffer) {
3137+
item.response.dataBuffer = Buffer.concat([Buffer.from(item.response.dataBuffer), Buffer.from(data.dataBuffer)]);
3138+
}
3139+
31363140
item.response.size = data.data?.length + (item.response.size || 0);
31373141
}
31383142
},
@@ -3255,7 +3259,8 @@ export const collectionsSlice = createSlice({
32553259
updatedResponse.responses.push({
32563260
message: eventData.message,
32573261
type: eventData.type,
3258-
timestamp: eventData.timestamp
3262+
timestamp: eventData.timestamp,
3263+
seq: eventData.seq
32593264
});
32603265
break;
32613266

@@ -3271,7 +3276,8 @@ export const collectionsSlice = createSlice({
32713276
updatedResponse.responses.push({
32723277
message: `Connected to ${eventData.url}`,
32733278
type: 'info',
3274-
timestamp: eventData.timestamp
3279+
timestamp: eventData.timestamp,
3280+
seq: eventData.seq
32753281
});
32763282
break;
32773283

@@ -3287,7 +3293,8 @@ export const collectionsSlice = createSlice({
32873293
updatedResponse.responses.push({
32883294
type: code !== 1000 ? 'info' : 'error',
32893295
message: reason.trim().length ? ['Closed:', reason.trim()].join(' ') : 'Closed',
3290-
timestamp
3296+
timestamp: eventData.timestamp,
3297+
seq: eventData.seq
32913298
});
32923299
break;
32933300

@@ -3302,7 +3309,8 @@ export const collectionsSlice = createSlice({
33023309
updatedResponse.responses.push({
33033310
type: 'error',
33043311
message: errorDetails || 'WebSocket error occurred',
3305-
timestamp
3312+
timestamp: eventData.timestamp,
3313+
seq: eventData.seq
33063314
});
33073315

33083316
break;

packages/bruno-electron/src/ipc/network/index.js

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1003,6 +1003,7 @@ const registerNetworkIpc = (mainWindow) => {
10031003

10041004
// handler for sending http request
10051005
ipcMain.handle('send-http-request', async (event, item, collection, environment, runtimeVariables) => {
1006+
let seq = 0;
10061007
const collectionUid = collection.uid;
10071008
const envVars = getEnvVars(environment);
10081009
const processEnvVars = getProcessEnvVars(collectionUid);
@@ -1012,16 +1013,29 @@ const registerNetworkIpc = (mainWindow) => {
10121013
response.stream = { running: response.status >= 200 && response.status < 300 };
10131014

10141015
stream.on('data', (newData) => {
1016+
seq += 1;
1017+
10151018
const parsed = parseDataFromResponse({ data: newData, headers: {} });
1016-
mainWindow.webContents.send('main:http-stream-new-data', { collectionUid, itemUid: item.uid, data: parsed });
1019+
1020+
mainWindow.webContents.send('main:http-stream-new-data', {
1021+
collectionUid,
1022+
itemUid: item.uid,
1023+
seq,
1024+
timestamp: Date.now(),
1025+
data: parsed
1026+
});
10171027
});
10181028

10191029
stream.on('close', () => {
1020-
if (!cancelTokens[response.cancelTokenUid]) {
1021-
return;
1022-
}
1030+
if (!cancelTokens[response.cancelTokenUid]) return;
1031+
1032+
mainWindow.webContents.send('main:http-stream-end', {
1033+
collectionUid,
1034+
itemUid: item.uid,
1035+
seq: seq + 1,
1036+
timestamp: Date.now()
1037+
});
10231038

1024-
mainWindow.webContents.send('main:http-stream-end', { collectionUid, itemUid: item.uid });
10251039
deleteCancelToken(response.cancelTokenUid);
10261040
});
10271041
}

packages/bruno-requests/src/ws/ws-client.js

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,36 @@ const normalizeMessageByFormat = (message, format) => {
4949
}
5050
};
5151

52+
const createSequencer = () => {
53+
const seq = {};
54+
55+
const nextSeq = (requestId, collectionId) => {
56+
seq[requestId] ||= {};
57+
seq[requestId][collectionId] ||= 0;
58+
return ++seq[requestId][collectionId];
59+
};
60+
61+
/**
62+
* @param {string} requestId
63+
* @param {string} [collectionId]
64+
*/
65+
const clean = (requestId, collectionId = undefined) => {
66+
if (collectionId) {
67+
delete seq[requestId][collectionId];
68+
}
69+
if (!Object.keys(seq[requestId]).length) {
70+
delete seq[requestId];
71+
}
72+
};
73+
74+
return {
75+
next: nextSeq,
76+
clean
77+
};
78+
};
79+
80+
const seq = createSequencer();
81+
5282
class WsClient {
5383
messageQueues = {};
5484
activeConnections = new Map();
@@ -181,6 +211,7 @@ class WsClient {
181211
message: payload,
182212
messageHexdump: hexdump(payload),
183213
type: 'outgoing',
214+
seq: seq.next(requestId, collectionUid),
184215
timestamp: Date.now()
185216
});
186217
}
@@ -204,6 +235,7 @@ class WsClient {
204235
if (connectionMeta?.connection) {
205236
connectionMeta.connection.close(code, reason);
206237
this.#removeConnection(requestId);
238+
seq.clean(requestId);
207239
}
208240
}
209241

@@ -283,7 +315,8 @@ class WsClient {
283315

284316
this.eventCallback('main:ws:open', requestId, collectionUid, {
285317
timestamp: Date.now(),
286-
url: ws.url
318+
url: ws.url,
319+
seq: seq.next(requestId, collectionUid)
287320
});
288321
});
289322

@@ -294,14 +327,16 @@ class WsClient {
294327
message: `Redirected to ${url}`,
295328
type: 'info',
296329
timestamp: Date.now(),
297-
headers: headers
330+
headers: headers,
331+
seq: seq.next(requestId, collectionUid)
298332
});
299333
});
300334

301335
ws.on('upgrade', (response) => {
302336
this.eventCallback('main:ws:upgrade', requestId, collectionUid, {
303337
type: 'info',
304338
timestamp: Date.now(),
339+
seq: seq.next(requestId, collectionUid),
305340
headers: { ...response.headers }
306341
});
307342
});
@@ -313,6 +348,7 @@ class WsClient {
313348
message,
314349
messageHexdump: hexdump(Buffer.from(data)),
315350
type: 'incoming',
351+
seq: seq.next(requestId, collectionUid),
316352
timestamp: Date.now()
317353
});
318354
} catch (error) {
@@ -321,6 +357,7 @@ class WsClient {
321357
message: data.toString(),
322358
messageHexdump: hexdump(data),
323359
type: 'incoming',
360+
seq: seq.next(requestId, collectionUid),
324361
timestamp: Date.now()
325362
});
326363
}
@@ -330,14 +367,17 @@ class WsClient {
330367
this.eventCallback('main:ws:close', requestId, collectionUid, {
331368
code,
332369
reason: Buffer.from(reason).toString(),
370+
seq: seq.next(requestId, collectionUid),
333371
timestamp: Date.now()
334372
});
373+
seq.clean(requestId, collectionUid);
335374
this.#removeConnection(requestId);
336375
});
337376

338377
ws.on('error', (error) => {
339378
this.eventCallback('main:ws:error', requestId, collectionUid, {
340379
error: error.message,
380+
seq: seq.next(requestId, collectionUid),
341381
timestamp: Date.now()
342382
});
343383
});
@@ -356,6 +396,7 @@ class WsClient {
356396
this.eventCallback('main:ws:connections-changed', {
357397
type: 'added',
358398
requestId,
399+
seq: seq.next(requestId, collectionUid),
359400
activeConnectionIds: this.getActiveConnectionIds()
360401
});
361402
}

0 commit comments

Comments
 (0)