Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 55 additions & 31 deletions opencti-platform/opencti-graphql/src/graphql/sseMiddleware.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { once } from 'events';
import * as jsonpatch from 'fast-json-patch';
import { Promise } from 'bluebird';
import { LRUCache } from 'lru-cache';
import { now } from 'moment';
import conf, { basePath, logApp } from '../config/conf';
Expand Down Expand Up @@ -53,6 +53,7 @@ const broadcastClients = {};
const queryIndices = [...READ_STIX_INDICES, READ_INDEX_STIX_META_OBJECTS];
const DEFAULT_LIVE_STREAM = 'live';
const ONE_HOUR = 1000 * 60 * 60;
const DEFAULT_SOCKET_BUFFER = 65536;
const MAX_CACHE_TIME = (conf.get('app:live_stream:cache_max_time') ?? 1) * ONE_HOUR;
const MAX_CACHE_SIZE = conf.get('app:live_stream:cache_max_size') ?? 5000;
const HEARTBEAT_PERIOD = conf.get('app:live_stream:heartbeat_period') ?? 5000;
Expand All @@ -74,12 +75,8 @@ const createBroadcastClient = (channel) => {
setChannelDelay: (d) => channel.setDelay(d),
setLastEventId: (id) => channel.setLastEventId(id),
close: () => channel.close(),
sendEvent: (eventId, topic, event) => {
channel.sendEvent(eventId, topic, event);
},
sendConnected: (streamInfo) => {
channel.sendEvent(undefined, 'connected', streamInfo);
},
sendEvent: async (eventId, topic, event) => channel.sendEvent(eventId, topic, event),
sendConnected: async (streamInfo) => channel.sendEvent(undefined, 'connected', streamInfo),
};
};

Expand Down Expand Up @@ -235,20 +232,26 @@ const createSseMiddleware = () => {

/**
 * Prepare an SSE response for a broadcast client and register it globally.
 * Writes the SSE headers, registers the client in `broadcastClients`, wires an
 * idempotent cleanup on connection close, and emits the initial `connected` event.
 * @param {object} req - incoming HTTP request (listened for 'close').
 * @param {object} res - HTTP response turned into an SSE stream.
 * @param {object} client - broadcast client (id, close, sendConnected).
 * @param {object|undefined} processor - optional stream processor backing this client.
 */
const initBroadcasting = async (req, res, client, processor) => {
const broadcasterInfo = processor ? await processor.info() : {};
let closed = false;
const close = () => {
// Guard makes the handler safe to call more than once.
if (closed) {
return;
}
// Mark first so cleanup stays idempotent even if a step below throws.
closed = true;
client.close();
delete broadcastClients[client.id];
logApp.info(`[STREAM] Closing stream processor for ${client.id}`);
// processor may be absent — mirror the guard used for info() above.
processor?.shutdown();
};
req.on('close', close);
res.writeHead(200, {
Connection: 'keep-alive',
'Content-Type': 'text/event-stream; charset=utf-8',
'Access-Control-Allow-Origin': '*',
'Cache-Control': 'no-cache, no-transform', // no-transform is required for dev proxy
});
broadcastClients[client.id] = client;
await client.sendConnected({ ...broadcasterInfo, connectionId: client.id });
};
const createSseChannel = (req, res, startId) => {
let lastEventId = startId;
Expand All @@ -266,12 +269,13 @@ const createSseMiddleware = () => {
setLastEventId: (id) => {
lastEventId = id;
},
connected: () => !res.finished,
sendEvent: (eventId, topic, event) => {
connected: () => !res.finished && res.writable,
sendEvent: async (eventId, topic, event) => {
// Write on an already terminated response
if (res.finished || !res.writable) {
return;
}
// region build message
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to use a function to group this code.

let message = '';
if (eventId) {
lastEventId = eventId;
Expand All @@ -293,8 +297,23 @@ const createSseMiddleware = () => {
message += '\n';
}
message += '\n';
res.write(message);
res.flush();
// endregion
// Send a message to socket by chunk
// It's necessary to chunk to prevent any oversized buffer
const messageBuffer = Buffer.from(message, 'utf-8');
const chunkSize = res.writableHighWaterMark || DEFAULT_SOCKET_BUFFER;
let offset = 0;
while (offset < messageBuffer.length) {
const end = Math.min(offset + chunkSize, messageBuffer.length);
const chunk = messageBuffer.subarray(offset, end);
if (!res.write(chunk)) {
logApp.debug('[STREAM] Buffer draining', { buffer: res.writableLength, limit: res.writableHighWaterMark });
await once(res, 'drain');
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the chunk is no longer necessary, I prefer to remove it, since we can easily break the SSE message due to race conditions caused by this await. This race condition may already exist with heartbeats, which are issued in parallel.

}
res.flush();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One flush per chunk is not necessary; it should be done only once, after all chunks have been written.

offset += chunkSize;
}
// endregion
},
close: () => {
logApp.info('[STREAM] Closing SSE channel', { clientId: channel.userId });
Expand All @@ -309,11 +328,17 @@ const createSseMiddleware = () => {
}
},
};
const heartTimer = () => {
if (lastEventId) {
const [idTime] = lastEventId.split('-');
const idDate = utcDate(parseInt(idTime, 10)).toISOString();
channel.sendEvent(lastEventId, 'heartbeat', idDate);
// Periodic keep-alive: emits a 'heartbeat' SSE event so proxies/clients do not
// drop the idle connection. Runs on an interval (see HEARTBEAT_PERIOD below).
// Reads `lastEventId`, `res` and `channel` from the enclosing SSE channel closure.
const heartTimer = async () => {
try {
// heartbeat must be sent to maintain the connection
// Only when the last event is accessible and nothing is currently in the socket.
// (writableLength === 0 avoids interleaving the heartbeat with a partially
// flushed event still buffered on the response.)
if (lastEventId && res.writableLength === 0) {
// Event ids carry a millisecond-timestamp prefix before the '-' separator
// (presumably a redis stream id — TODO confirm against the event source).
const [idTime] = lastEventId.split('-');
const idDate = utcDate(parseInt(idTime, 10)).toISOString();
await channel.sendEvent(lastEventId, 'heartbeat', idDate);
}
} catch {
// ignore — a failed heartbeat is best-effort; the close handler deals with
// genuinely dead connections.
}
};
const heartbeatInterval = setInterval(heartTimer, HEARTBEAT_PERIOD);
Expand All @@ -338,7 +363,7 @@ const createSseMiddleware = () => {
const { id: eventId, event, data } = elements[index];
const instanceAccessible = await isUserCanAccessStixElement(context, user, data.data);
if (instanceAccessible) {
client.sendEvent(eventId, event, data);
await client.sendEvent(eventId, event, data);
}
}
client.setLastEventId(lastEventId);
Expand Down Expand Up @@ -395,7 +420,7 @@ const createSseMiddleware = () => {
const message = generateCreateMessage(missingInstance);
const origin = { referer: EVENT_TYPE_DEPENDENCIES };
const content = { data: missingData, message, origin, version: EVENT_CURRENT_VERSION };
channel.sendEvent(eventId, EVENT_TYPE_CREATE, content);
await channel.sendEvent(eventId, EVENT_TYPE_CREATE, content);
cache.set(missingData.id, 'hit');
await wait(channel.delay);
}
Expand All @@ -421,7 +446,7 @@ const createSseMiddleware = () => {
const message = generateCreateMessage(missingRelation);
const origin = { referer: EVENT_TYPE_DEPENDENCIES };
const content = { data: stixRelation, message, origin, version: EVENT_CURRENT_VERSION };
channel.sendEvent(eventId, EVENT_TYPE_CREATE, content);
await channel.sendEvent(eventId, EVENT_TYPE_CREATE, content);
cache.set(stixRelation.id, 'hit');
}
}
Expand All @@ -447,7 +472,6 @@ const createSseMiddleware = () => {
const entityTypeFilters = findFiltersFromKey(filters.filters, 'entity_type', 'eq');
const entityTypeFilter = entityTypeFilters.length > 0 ? entityTypeFilters[0] : undefined;
const entityTypeFilterValues = entityTypeFilter?.values ?? [];
// eslint-disable-next-line no-restricted-syntax
for (const id of entityTypeFilterValues) {
// consider the operator
if (entityTypeFilter.operator === 'not_eq') {
Expand Down Expand Up @@ -501,7 +525,7 @@ const createSseMiddleware = () => {
// From or to are visible, consider it as a dependency
const origin = { referer: EVENT_TYPE_DEPENDENCIES };
const content = { data: stix, message, origin, version: EVENT_CURRENT_VERSION };
channel.sendEvent(eventId, type, content);
await channel.sendEvent(eventId, type, content);
}
}
};
Expand Down Expand Up @@ -600,18 +624,18 @@ const createSseMiddleware = () => {
const { newDocument: previous } = jsonpatch.applyPatch(structuredClone(stix), evenContext.reverse_patch);
const isPreviouslyVisible = await isStixMatchFilterGroup(context, user, previous, streamFilters);
if (isPreviouslyVisible && !isCurrentlyVisible && publishDeletion) { // No longer visible
client.sendEvent(eventId, EVENT_TYPE_DELETE, eventData);
await client.sendEvent(eventId, EVENT_TYPE_DELETE, eventData);
cache.set(stix.id, 'hit');
} else if (!isPreviouslyVisible && isCurrentlyVisible) { // Newly visible
const isValidResolution = await resolveAndPublishDependencies(context, noDependencies, cache, channel, req, eventId, stix);
if (isValidResolution) {
client.sendEvent(eventId, EVENT_TYPE_CREATE, eventData);
await client.sendEvent(eventId, EVENT_TYPE_CREATE, eventData);
cache.set(stix.id, 'hit');
}
} else if (isCurrentlyVisible) { // Just an update
const isValidResolution = await resolveAndPublishDependencies(context, noDependencies, cache, channel, req, eventId, stix);
if (isValidResolution) {
client.sendEvent(eventId, event, eventData);
await client.sendEvent(eventId, event, eventData);
cache.set(stix.id, 'hit');
}
} else if (isRelation && publishDependencies) { // Update but not visible - relation type
Expand All @@ -630,20 +654,20 @@ const createSseMiddleware = () => {
// At least one container is matching the filter, so publishing the event
if (countRelatedContainers > 0) {
await resolveAndPublishMissingRefs(context, cache, channel, req, eventId, stix);
client.sendEvent(eventId, event, eventData);
await client.sendEvent(eventId, event, eventData);
cache.set(stix.id, 'hit');
}
}
} else if (isCurrentlyVisible) {
if (type === EVENT_TYPE_DELETE) {
if (publishDeletion) {
client.sendEvent(eventId, event, eventData);
await client.sendEvent(eventId, event, eventData);
cache.set(stix.id, 'hit');
}
} else { // Create and merge
const isValidResolution = await resolveAndPublishDependencies(context, noDependencies, cache, channel, req, eventId, stix);
if (isValidResolution) {
client.sendEvent(eventId, event, eventData);
await client.sendEvent(eventId, event, eventData);
cache.set(stix.id, 'hit');
}
}
Expand Down Expand Up @@ -689,7 +713,7 @@ const createSseMiddleware = () => {
const message = generateCreateMessage(instance);
const origin = { referer: EVENT_TYPE_INIT };
const eventData = { data: stixData, message, origin, version: EVENT_CURRENT_VERSION };
channel.sendEvent(eventId, EVENT_TYPE_CREATE, eventData);
await channel.sendEvent(eventId, EVENT_TYPE_CREATE, eventData);
cache.set(stixData.id, 'hit');
}
} else {
Expand Down