Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 38 additions & 30 deletions opencti-platform/opencti-graphql/src/graphql/sseMiddleware.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { once } from 'events';
import * as jsonpatch from 'fast-json-patch';
import { Promise } from 'bluebird';
import { LRUCache } from 'lru-cache';
import { now } from 'moment';
import conf, { basePath, logApp } from '../config/conf';
Expand Down Expand Up @@ -74,12 +74,8 @@ const createBroadcastClient = (channel) => {
setChannelDelay: (d) => channel.setDelay(d),
setLastEventId: (id) => channel.setLastEventId(id),
close: () => channel.close(),
sendEvent: (eventId, topic, event) => {
channel.sendEvent(eventId, topic, event);
},
sendConnected: (streamInfo) => {
channel.sendEvent(undefined, 'connected', streamInfo);
},
sendEvent: async (eventId, topic, event) => channel.sendEvent(eventId, topic, event),
sendConnected: async (streamInfo) => channel.sendEvent(undefined, 'connected', streamInfo),
};
};

Expand Down Expand Up @@ -235,20 +231,27 @@ const createSseMiddleware = () => {

const initBroadcasting = async (req, res, client, processor) => {
const broadcasterInfo = processor ? await processor.info() : {};
req.on('close', () => {
let closed = false;
const close = () => {
if (closed) {
return;
}
client.close();
delete broadcastClients[client.id];
logApp.info(`[STREAM] Closing stream processor for ${client.id}`);
processor.shutdown();
});
closed = true;
};
req.on('close', close);
res.on('close', close);
res.writeHead(200, {
Connection: 'keep-alive',
'Content-Type': 'text/event-stream; charset=utf-8',
'Access-Control-Allow-Origin': '*',
'Cache-Control': 'no-cache, no-transform', // no-transform is required for dev proxy
});
client.sendConnected({ ...broadcasterInfo, connectionId: client.id });
broadcastClients[client.id] = client;
await client.sendConnected({ ...broadcasterInfo, connectionId: client.id });
};
const createSseChannel = (req, res, startId) => {
let lastEventId = startId;
Expand All @@ -266,8 +269,8 @@ const createSseMiddleware = () => {
setLastEventId: (id) => {
lastEventId = id;
},
connected: () => !res.finished,
sendEvent: (eventId, topic, event) => {
connected: () => !res.finished && res.writable,
sendEvent: async (eventId, topic, event) => {
// Write on an already terminated response
if (res.finished || !res.writable) {
return;
Expand All @@ -293,7 +296,9 @@ const createSseMiddleware = () => {
message += '\n';
}
message += '\n';
res.write(message);
if (!res.write(message)) {
await once(res, 'drain');
}
res.flush();
},
close: () => {
Expand All @@ -309,11 +314,15 @@ const createSseMiddleware = () => {
}
},
};
const heartTimer = () => {
if (lastEventId) {
const [idTime] = lastEventId.split('-');
const idDate = utcDate(parseInt(idTime, 10)).toISOString();
channel.sendEvent(lastEventId, 'heartbeat', idDate);
const heartTimer = async () => {
try {
if (lastEventId) {
const [idTime] = lastEventId.split('-');
const idDate = utcDate(parseInt(idTime, 10)).toISOString();
await channel.sendEvent(lastEventId, 'heartbeat', idDate);
}
} catch {
// ignore
}
};
const heartbeatInterval = setInterval(heartTimer, HEARTBEAT_PERIOD);
Expand All @@ -338,7 +347,7 @@ const createSseMiddleware = () => {
const { id: eventId, event, data } = elements[index];
const instanceAccessible = await isUserCanAccessStixElement(context, user, data.data);
if (instanceAccessible) {
client.sendEvent(eventId, event, data);
await client.sendEvent(eventId, event, data);
}
}
client.setLastEventId(lastEventId);
Expand Down Expand Up @@ -395,7 +404,7 @@ const createSseMiddleware = () => {
const message = generateCreateMessage(missingInstance);
const origin = { referer: EVENT_TYPE_DEPENDENCIES };
const content = { data: missingData, message, origin, version: EVENT_CURRENT_VERSION };
channel.sendEvent(eventId, EVENT_TYPE_CREATE, content);
await channel.sendEvent(eventId, EVENT_TYPE_CREATE, content);
cache.set(missingData.id, 'hit');
await wait(channel.delay);
}
Expand All @@ -421,7 +430,7 @@ const createSseMiddleware = () => {
const message = generateCreateMessage(missingRelation);
const origin = { referer: EVENT_TYPE_DEPENDENCIES };
const content = { data: stixRelation, message, origin, version: EVENT_CURRENT_VERSION };
channel.sendEvent(eventId, EVENT_TYPE_CREATE, content);
await channel.sendEvent(eventId, EVENT_TYPE_CREATE, content);
cache.set(stixRelation.id, 'hit');
}
}
Expand All @@ -447,7 +456,6 @@ const createSseMiddleware = () => {
const entityTypeFilters = findFiltersFromKey(filters.filters, 'entity_type', 'eq');
const entityTypeFilter = entityTypeFilters.length > 0 ? entityTypeFilters[0] : undefined;
const entityTypeFilterValues = entityTypeFilter?.values ?? [];
// eslint-disable-next-line no-restricted-syntax
for (const id of entityTypeFilterValues) {
// consider the operator
if (entityTypeFilter.operator === 'not_eq') {
Expand Down Expand Up @@ -501,7 +509,7 @@ const createSseMiddleware = () => {
// From or to are visible, consider it as a dependency
const origin = { referer: EVENT_TYPE_DEPENDENCIES };
const content = { data: stix, message, origin, version: EVENT_CURRENT_VERSION };
channel.sendEvent(eventId, type, content);
await channel.sendEvent(eventId, type, content);
}
}
};
Expand Down Expand Up @@ -600,18 +608,18 @@ const createSseMiddleware = () => {
const { newDocument: previous } = jsonpatch.applyPatch(structuredClone(stix), evenContext.reverse_patch);
const isPreviouslyVisible = await isStixMatchFilterGroup(context, user, previous, streamFilters);
if (isPreviouslyVisible && !isCurrentlyVisible && publishDeletion) { // No longer visible
client.sendEvent(eventId, EVENT_TYPE_DELETE, eventData);
await client.sendEvent(eventId, EVENT_TYPE_DELETE, eventData);
cache.set(stix.id, 'hit');
} else if (!isPreviouslyVisible && isCurrentlyVisible) { // Newly visible
const isValidResolution = await resolveAndPublishDependencies(context, noDependencies, cache, channel, req, eventId, stix);
if (isValidResolution) {
client.sendEvent(eventId, EVENT_TYPE_CREATE, eventData);
await client.sendEvent(eventId, EVENT_TYPE_CREATE, eventData);
cache.set(stix.id, 'hit');
}
} else if (isCurrentlyVisible) { // Just an update
const isValidResolution = await resolveAndPublishDependencies(context, noDependencies, cache, channel, req, eventId, stix);
if (isValidResolution) {
client.sendEvent(eventId, event, eventData);
await client.sendEvent(eventId, event, eventData);
cache.set(stix.id, 'hit');
}
} else if (isRelation && publishDependencies) { // Update but not visible - relation type
Expand All @@ -630,20 +638,20 @@ const createSseMiddleware = () => {
// At least one container is matching the filter, so publishing the event
if (countRelatedContainers > 0) {
await resolveAndPublishMissingRefs(context, cache, channel, req, eventId, stix);
client.sendEvent(eventId, event, eventData);
await client.sendEvent(eventId, event, eventData);
cache.set(stix.id, 'hit');
}
}
} else if (isCurrentlyVisible) {
if (type === EVENT_TYPE_DELETE) {
if (publishDeletion) {
client.sendEvent(eventId, event, eventData);
await client.sendEvent(eventId, event, eventData);
cache.set(stix.id, 'hit');
}
} else { // Create and merge
const isValidResolution = await resolveAndPublishDependencies(context, noDependencies, cache, channel, req, eventId, stix);
if (isValidResolution) {
client.sendEvent(eventId, event, eventData);
await client.sendEvent(eventId, event, eventData);
cache.set(stix.id, 'hit');
}
}
Expand Down Expand Up @@ -689,7 +697,7 @@ const createSseMiddleware = () => {
const message = generateCreateMessage(instance);
const origin = { referer: EVENT_TYPE_INIT };
const eventData = { data: stixData, message, origin, version: EVENT_CURRENT_VERSION };
channel.sendEvent(eventId, EVENT_TYPE_CREATE, eventData);
await channel.sendEvent(eventId, EVENT_TYPE_CREATE, eventData);
cache.set(stixData.id, 'hit');
}
} else {
Expand Down