Skip to content

Commit d8f14ac

Browse files
committed
[backend] SSE message backpressure mechanism
1 parent e6863b5 commit d8f14ac

File tree

1 file changed

+23
-23
lines changed

1 file changed

+23
-23
lines changed

opencti-platform/opencti-graphql/src/graphql/sseMiddleware.js

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { once } from 'events';
12
import * as jsonpatch from 'fast-json-patch';
23
import { Promise } from 'bluebird';
34
import { LRUCache } from 'lru-cache';
@@ -74,12 +75,8 @@ const createBroadcastClient = (channel) => {
7475
setChannelDelay: (d) => channel.setDelay(d),
7576
setLastEventId: (id) => channel.setLastEventId(id),
7677
close: () => channel.close(),
77-
sendEvent: (eventId, topic, event) => {
78-
channel.sendEvent(eventId, topic, event);
79-
},
80-
sendConnected: (streamInfo) => {
81-
channel.sendEvent(undefined, 'connected', streamInfo);
82-
},
78+
sendEvent: async (eventId, topic, event) => channel.sendEvent(eventId, topic, event),
79+
sendConnected: async (streamInfo) => channel.sendEvent(undefined, 'connected', streamInfo),
8380
};
8481
};
8582

@@ -235,20 +232,22 @@ const createSseMiddleware = () => {
235232

236233
const initBroadcasting = async (req, res, client, processor) => {
237234
const broadcasterInfo = processor ? await processor.info() : {};
238-
req.on('close', () => {
235+
const close = () => {
239236
client.close();
240237
delete broadcastClients[client.id];
241238
logApp.info(`[STREAM] Closing stream processor for ${client.id}`);
242239
processor.shutdown();
243-
});
240+
};
241+
req.on('close', close);
242+
res.on('close', close);
244243
res.writeHead(200, {
245244
Connection: 'keep-alive',
246245
'Content-Type': 'text/event-stream; charset=utf-8',
247246
'Access-Control-Allow-Origin': '*',
248247
'Cache-Control': 'no-cache, no-transform', // no-transform is required for dev proxy
249248
});
250-
client.sendConnected({ ...broadcasterInfo, connectionId: client.id });
251249
broadcastClients[client.id] = client;
250+
await client.sendConnected({ ...broadcasterInfo, connectionId: client.id });
252251
};
253252
const createSseChannel = (req, res, startId) => {
254253
let lastEventId = startId;
@@ -266,8 +265,8 @@ const createSseMiddleware = () => {
266265
setLastEventId: (id) => {
267266
lastEventId = id;
268267
},
269-
connected: () => !res.finished,
270-
sendEvent: (eventId, topic, event) => {
268+
connected: () => !res.finished && res.writable,
269+
sendEvent: async (eventId, topic, event) => {
271270
// Write on an already terminated response
272271
if (res.finished || !res.writable) {
273272
return;
@@ -293,7 +292,9 @@ const createSseMiddleware = () => {
293292
message += '\n';
294293
}
295294
message += '\n';
296-
res.write(message);
295+
if (!res.write(message)) {
296+
await once(res, 'drain');
297+
}
297298
res.flush();
298299
},
299300
close: () => {
@@ -309,11 +310,11 @@ const createSseMiddleware = () => {
309310
}
310311
},
311312
};
312-
const heartTimer = () => {
313+
const heartTimer = async () => {
313314
if (lastEventId) {
314315
const [idTime] = lastEventId.split('-');
315316
const idDate = utcDate(parseInt(idTime, 10)).toISOString();
316-
channel.sendEvent(lastEventId, 'heartbeat', idDate);
317+
await channel.sendEvent(lastEventId, 'heartbeat', idDate);
317318
}
318319
};
319320
const heartbeatInterval = setInterval(heartTimer, HEARTBEAT_PERIOD);
@@ -338,7 +339,7 @@ const createSseMiddleware = () => {
338339
const { id: eventId, event, data } = elements[index];
339340
const instanceAccessible = await isUserCanAccessStixElement(context, user, data.data);
340341
if (instanceAccessible) {
341-
client.sendEvent(eventId, event, data);
342+
await client.sendEvent(eventId, event, data);
342343
}
343344
}
344345
client.setLastEventId(lastEventId);
@@ -447,7 +448,6 @@ const createSseMiddleware = () => {
447448
const entityTypeFilters = findFiltersFromKey(filters.filters, 'entity_type', 'eq');
448449
const entityTypeFilter = entityTypeFilters.length > 0 ? entityTypeFilters[0] : undefined;
449450
const entityTypeFilterValues = entityTypeFilter?.values ?? [];
450-
// eslint-disable-next-line no-restricted-syntax
451451
for (const id of entityTypeFilterValues) {
452452
// consider the operator
453453
if (entityTypeFilter.operator === 'not_eq') {
@@ -600,18 +600,18 @@ const createSseMiddleware = () => {
600600
const { newDocument: previous } = jsonpatch.applyPatch(structuredClone(stix), evenContext.reverse_patch);
601601
const isPreviouslyVisible = await isStixMatchFilterGroup(context, user, previous, streamFilters);
602602
if (isPreviouslyVisible && !isCurrentlyVisible && publishDeletion) { // No longer visible
603-
client.sendEvent(eventId, EVENT_TYPE_DELETE, eventData);
603+
await client.sendEvent(eventId, EVENT_TYPE_DELETE, eventData);
604604
cache.set(stix.id, 'hit');
605605
} else if (!isPreviouslyVisible && isCurrentlyVisible) { // Newly visible
606606
const isValidResolution = await resolveAndPublishDependencies(context, noDependencies, cache, channel, req, eventId, stix);
607607
if (isValidResolution) {
608-
client.sendEvent(eventId, EVENT_TYPE_CREATE, eventData);
608+
await client.sendEvent(eventId, EVENT_TYPE_CREATE, eventData);
609609
cache.set(stix.id, 'hit');
610610
}
611611
} else if (isCurrentlyVisible) { // Just an update
612612
const isValidResolution = await resolveAndPublishDependencies(context, noDependencies, cache, channel, req, eventId, stix);
613613
if (isValidResolution) {
614-
client.sendEvent(eventId, event, eventData);
614+
await client.sendEvent(eventId, event, eventData);
615615
cache.set(stix.id, 'hit');
616616
}
617617
} else if (isRelation && publishDependencies) { // Update but not visible - relation type
@@ -630,20 +630,20 @@ const createSseMiddleware = () => {
630630
// At least one container is matching the filter, so publishing the event
631631
if (countRelatedContainers > 0) {
632632
await resolveAndPublishMissingRefs(context, cache, channel, req, eventId, stix);
633-
client.sendEvent(eventId, event, eventData);
633+
await client.sendEvent(eventId, event, eventData);
634634
cache.set(stix.id, 'hit');
635635
}
636636
}
637637
} else if (isCurrentlyVisible) {
638638
if (type === EVENT_TYPE_DELETE) {
639639
if (publishDeletion) {
640-
client.sendEvent(eventId, event, eventData);
640+
await client.sendEvent(eventId, event, eventData);
641641
cache.set(stix.id, 'hit');
642642
}
643643
} else { // Create and merge
644644
const isValidResolution = await resolveAndPublishDependencies(context, noDependencies, cache, channel, req, eventId, stix);
645645
if (isValidResolution) {
646-
client.sendEvent(eventId, event, eventData);
646+
await client.sendEvent(eventId, event, eventData);
647647
cache.set(stix.id, 'hit');
648648
}
649649
}
@@ -689,7 +689,7 @@ const createSseMiddleware = () => {
689689
const message = generateCreateMessage(instance);
690690
const origin = { referer: EVENT_TYPE_INIT };
691691
const eventData = { data: stixData, message, origin, version: EVENT_CURRENT_VERSION };
692-
channel.sendEvent(eventId, EVENT_TYPE_CREATE, eventData);
692+
await channel.sendEvent(eventId, EVENT_TYPE_CREATE, eventData);
693693
cache.set(stixData.id, 'hit');
694694
}
695695
} else {

0 commit comments

Comments
 (0)