Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 68 additions & 50 deletions opencti-platform/opencti-graphql/src/graphql/sseMiddleware.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { once } from 'events';
import * as jsonpatch from 'fast-json-patch';
import { Promise } from 'bluebird';
import { LRUCache } from 'lru-cache';
import { now } from 'moment';
import conf, { basePath, logApp } from '../config/conf';
Expand Down Expand Up @@ -74,12 +74,8 @@ const createBroadcastClient = (channel) => {
setChannelDelay: (d) => channel.setDelay(d),
setLastEventId: (id) => channel.setLastEventId(id),
close: () => channel.close(),
sendEvent: (eventId, topic, event) => {
channel.sendEvent(eventId, topic, event);
},
sendConnected: (streamInfo) => {
channel.sendEvent(undefined, 'connected', streamInfo);
},
sendEvent: async (eventId, topic, event) => channel.sendEvent(eventId, topic, event),
sendConnected: async (streamInfo) => channel.sendEvent(undefined, 'connected', streamInfo),
};
};

Expand Down Expand Up @@ -235,23 +231,56 @@ const createSseMiddleware = () => {

const initBroadcasting = async (req, res, client, processor) => {
const broadcasterInfo = processor ? await processor.info() : {};
req.on('close', () => {
let closed = false;
const close = () => {
if (closed) {
return;
}
client.close();
delete broadcastClients[client.id];
logApp.info(`[STREAM] Closing stream processor for ${client.id}`);
processor.shutdown();
});
closed = true;
};
req.on('close', close); // On closing the request
res.on('close', close); // On closing the response
res.writeHead(200, {
Connection: 'keep-alive',
'Content-Type': 'text/event-stream; charset=utf-8',
'Access-Control-Allow-Origin': '*',
'Cache-Control': 'no-cache, no-transform', // no-transform is required for dev proxy
});
client.sendConnected({ ...broadcasterInfo, connectionId: client.id });
broadcastClients[client.id] = client;
await client.sendConnected({ ...broadcasterInfo, connectionId: client.id });
};

const createSseChannel = (req, res, startId) => {
let lastEventId = startId;

const buildMessage = (eventId, topic, event) => {
let message = '';
if (eventId) {
message += `id: ${eventId}\n`;
}
if (topic) {
message += `event: ${topic}\n`;
}
if (event) {
message += 'data: ';
const isDataTopic = eventId && topic !== 'heartbeat';
if (isDataTopic && req.user && !isUserHasCapability(req.user, KNOWLEDGE_ORGANIZATION_RESTRICT)) {
const filtered = { ...event };
delete filtered.data.extensions[STIX_EXT_OCTI].granted_refs;
message += JSON.stringify(filtered);
} else {
message += JSON.stringify(event);
}
message += '\n';
}
message += '\n';
return message;
};

const channel = {
id: generateInternalId(),
delay: parseInt(extractQueryParameter(req, 'delay') || req.headers['event-delay'] || 0, 10),
Expand All @@ -266,34 +295,18 @@ const createSseMiddleware = () => {
setLastEventId: (id) => {
lastEventId = id;
},
connected: () => !res.finished,
sendEvent: (eventId, topic, event) => {
connected: () => !res.finished && res.writable,
sendEvent: async (eventId, topic, event) => {
// Write on an already terminated response
if (res.finished || !res.writable) {
return;
}
let message = '';
if (eventId) {
lastEventId = eventId;
message += `id: ${eventId}\n`;
lastEventId = eventId || lastEventId;
const message = buildMessage(eventId, topic, event);
if (!res.write(message)) {
logApp.debug('[STREAM] Buffer draining', { buffer: res.writableLength, limit: res.writableHighWaterMark });
await once(res, 'drain');
}
if (topic) {
message += `event: ${topic}\n`;
}
if (event) {
message += 'data: ';
const isDataTopic = eventId && topic !== 'heartbeat';
if (isDataTopic && req.user && !isUserHasCapability(req.user, KNOWLEDGE_ORGANIZATION_RESTRICT)) {
const filtered = { ...event };
delete filtered.data.extensions[STIX_EXT_OCTI].granted_refs;
message += JSON.stringify(filtered);
} else {
message += JSON.stringify(event);
}
message += '\n';
}
message += '\n';
res.write(message);
res.flush();
},
close: () => {
Expand All @@ -309,11 +322,17 @@ const createSseMiddleware = () => {
}
},
};
const heartTimer = () => {
if (lastEventId) {
const [idTime] = lastEventId.split('-');
const idDate = utcDate(parseInt(idTime, 10)).toISOString();
channel.sendEvent(lastEventId, 'heartbeat', idDate);
const heartTimer = async () => {
try {
// heartbeat must be sent to maintain the connection
// Only when the last event is accessible and nothing is currently in the socket.
if (lastEventId && res.writableLength === 0) {
const [idTime] = lastEventId.split('-');
const idDate = utcDate(parseInt(idTime, 10)).toISOString();
await channel.sendEvent(lastEventId, 'heartbeat', idDate);
}
} catch {
// ignore
}
};
const heartbeatInterval = setInterval(heartTimer, HEARTBEAT_PERIOD);
Expand All @@ -338,7 +357,7 @@ const createSseMiddleware = () => {
const { id: eventId, event, data } = elements[index];
const instanceAccessible = await isUserCanAccessStixElement(context, user, data.data);
if (instanceAccessible) {
client.sendEvent(eventId, event, data);
await client.sendEvent(eventId, event, data);
}
}
client.setLastEventId(lastEventId);
Expand Down Expand Up @@ -395,7 +414,7 @@ const createSseMiddleware = () => {
const message = generateCreateMessage(missingInstance);
const origin = { referer: EVENT_TYPE_DEPENDENCIES };
const content = { data: missingData, message, origin, version: EVENT_CURRENT_VERSION };
channel.sendEvent(eventId, EVENT_TYPE_CREATE, content);
await channel.sendEvent(eventId, EVENT_TYPE_CREATE, content);
cache.set(missingData.id, 'hit');
await wait(channel.delay);
}
Expand All @@ -421,7 +440,7 @@ const createSseMiddleware = () => {
const message = generateCreateMessage(missingRelation);
const origin = { referer: EVENT_TYPE_DEPENDENCIES };
const content = { data: stixRelation, message, origin, version: EVENT_CURRENT_VERSION };
channel.sendEvent(eventId, EVENT_TYPE_CREATE, content);
await channel.sendEvent(eventId, EVENT_TYPE_CREATE, content);
cache.set(stixRelation.id, 'hit');
}
}
Expand All @@ -447,7 +466,6 @@ const createSseMiddleware = () => {
const entityTypeFilters = findFiltersFromKey(filters.filters, 'entity_type', 'eq');
const entityTypeFilter = entityTypeFilters.length > 0 ? entityTypeFilters[0] : undefined;
const entityTypeFilterValues = entityTypeFilter?.values ?? [];
// eslint-disable-next-line no-restricted-syntax
for (const id of entityTypeFilterValues) {
// consider the operator
if (entityTypeFilter.operator === 'not_eq') {
Expand Down Expand Up @@ -501,7 +519,7 @@ const createSseMiddleware = () => {
// From or to are visible, consider it as a dependency
const origin = { referer: EVENT_TYPE_DEPENDENCIES };
const content = { data: stix, message, origin, version: EVENT_CURRENT_VERSION };
channel.sendEvent(eventId, type, content);
await channel.sendEvent(eventId, type, content);
}
}
};
Expand Down Expand Up @@ -600,18 +618,18 @@ const createSseMiddleware = () => {
const { newDocument: previous } = jsonpatch.applyPatch(structuredClone(stix), evenContext.reverse_patch);
const isPreviouslyVisible = await isStixMatchFilterGroup(context, user, previous, streamFilters);
if (isPreviouslyVisible && !isCurrentlyVisible && publishDeletion) { // No longer visible
client.sendEvent(eventId, EVENT_TYPE_DELETE, eventData);
await client.sendEvent(eventId, EVENT_TYPE_DELETE, eventData);
cache.set(stix.id, 'hit');
} else if (!isPreviouslyVisible && isCurrentlyVisible) { // Newly visible
const isValidResolution = await resolveAndPublishDependencies(context, noDependencies, cache, channel, req, eventId, stix);
if (isValidResolution) {
client.sendEvent(eventId, EVENT_TYPE_CREATE, eventData);
await client.sendEvent(eventId, EVENT_TYPE_CREATE, eventData);
cache.set(stix.id, 'hit');
}
} else if (isCurrentlyVisible) { // Just an update
const isValidResolution = await resolveAndPublishDependencies(context, noDependencies, cache, channel, req, eventId, stix);
if (isValidResolution) {
client.sendEvent(eventId, event, eventData);
await client.sendEvent(eventId, event, eventData);
cache.set(stix.id, 'hit');
}
} else if (isRelation && publishDependencies) { // Update but not visible - relation type
Expand All @@ -630,20 +648,20 @@ const createSseMiddleware = () => {
// At least one container is matching the filter, so publishing the event
if (countRelatedContainers > 0) {
await resolveAndPublishMissingRefs(context, cache, channel, req, eventId, stix);
client.sendEvent(eventId, event, eventData);
await client.sendEvent(eventId, event, eventData);
cache.set(stix.id, 'hit');
}
}
} else if (isCurrentlyVisible) {
if (type === EVENT_TYPE_DELETE) {
if (publishDeletion) {
client.sendEvent(eventId, event, eventData);
await client.sendEvent(eventId, event, eventData);
cache.set(stix.id, 'hit');
}
} else { // Create and merge
const isValidResolution = await resolveAndPublishDependencies(context, noDependencies, cache, channel, req, eventId, stix);
if (isValidResolution) {
client.sendEvent(eventId, event, eventData);
await client.sendEvent(eventId, event, eventData);
cache.set(stix.id, 'hit');
}
}
Expand Down Expand Up @@ -689,7 +707,7 @@ const createSseMiddleware = () => {
const message = generateCreateMessage(instance);
const origin = { referer: EVENT_TYPE_INIT };
const eventData = { data: stixData, message, origin, version: EVENT_CURRENT_VERSION };
channel.sendEvent(eventId, EVENT_TYPE_CREATE, eventData);
await channel.sendEvent(eventId, EVENT_TYPE_CREATE, eventData);
cache.set(stixData.id, 'hit');
}
} else {
Expand Down