Skip to content

Commit 19801e7

Browse files
authored
[backend] Introduce doYield method (#12627) (#12628)
1 parent 18b8b6e commit 19801e7

File tree

7 files changed

+54
-172
lines changed

7 files changed

+54
-172
lines changed

opencti-platform/opencti-graphql/src/database/engine.js

Lines changed: 16 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import {
2424
isEmptyField,
2525
isInferredIndex,
2626
isNotEmptyField,
27-
MAX_EVENT_LOOP_PROCESSING_TIME,
2827
offsetToCursor,
2928
pascalize,
3029
READ_DATA_INDICES,
@@ -222,6 +221,7 @@ import { getPirWithAccessCheck } from '../modules/pir/pir-checkPirAccess';
222221
import { asyncFilter, asyncMap, uniqAsyncMap } from '../utils/data-processing';
223222
import { ENTITY_TYPE_PIR } from '../modules/pir/pir-types';
224223
import { isMetricsName } from '../modules/metrics/metrics-utils';
224+
import { doYield } from '../utils/eventloop-utils';
225225

226226
const ELK_ENGINE = 'elk';
227227
const OPENSEARCH_ENGINE = 'opensearch';
@@ -1575,8 +1575,8 @@ const elDataConverter = (esHit) => {
15751575
export const elConvertHitsToMap = async (elements, opts) => {
15761576
const { mapWithAllIds = false } = opts;
15771577
const convertedHitsMap = {};
1578-
let startProcessingTime = new Date().getTime();
15791578
for (let n = 0; n < elements.length; n += 1) {
1579+
await doYield();
15801580
const element = elements[n];
15811581
convertedHitsMap[element.internal_id] = element;
15821582
if (mapWithAllIds) {
@@ -1589,13 +1589,6 @@ export const elConvertHitsToMap = async (elements, opts) => {
15891589
convertedHitsMap[id] = element;
15901590
});
15911591
}
1592-
// Prevent event loop locking more than MAX_EVENT_LOOP_PROCESSING_TIME
1593-
if (new Date().getTime() - startProcessingTime > MAX_EVENT_LOOP_PROCESSING_TIME) {
1594-
startProcessingTime = new Date().getTime();
1595-
await new Promise((resolve) => {
1596-
setImmediate(resolve);
1597-
});
1598-
}
15991592
}
16001593
return convertedHitsMap;
16011594
};
@@ -3841,19 +3834,12 @@ const buildRegardingOfFilter = async (context, user, elements, filters) => {
38413834
sideIdManualInferred.set(sideId, toTypes);
38423835
}
38433836
};
3844-
let startProcessingTime = new Date().getTime();
38453837
for (let relIndex = 0; relIndex < relationships.length; relIndex += 1) {
3838+
await doYield();
38463839
const relation = relationships[relIndex];
38473840
const relType = isInferredIndex(relation._index) ? 'inferred' : 'manual';
38483841
addTypeSide(relation.fromId, relType);
38493842
addTypeSide(relation.toId, relType);
3850-
// Prevent event loop locking more than MAX_EVENT_LOOP_PROCESSING_TIME
3851-
if (new Date().getTime() - startProcessingTime > MAX_EVENT_LOOP_PROCESSING_TIME) {
3852-
startProcessingTime = new Date().getTime();
3853-
await new Promise((resolve) => {
3854-
setImmediate(resolve);
3855-
});
3856-
}
38573843
}
38583844
}
38593845
return (element) => {
@@ -4182,24 +4168,17 @@ export const elRemoveRelationConnection = async (context, user, elementsImpact,
41824168
const elIdsCache = {};
41834169
const indexCache = {};
41844170
const pirInformationCache = {};
4185-
let startProcessingTime = new Date().getTime();
41864171
for (let idIndex = 0; idIndex < dataIds.length; idIndex += 1) {
4172+
await doYield();
41874173
const element = dataIds[idIndex];
41884174
elIdsCache[element.internal_id] = element._id;
41894175
indexCache[element.internal_id] = element._index;
41904176
pirInformationCache[element.internal_id] = element.pir_information;
4191-
// Prevent event loop locking more than MAX_EVENT_LOOP_PROCESSING_TIME
4192-
if (new Date().getTime() - startProcessingTime > MAX_EVENT_LOOP_PROCESSING_TIME) {
4193-
startProcessingTime = new Date().getTime();
4194-
await new Promise((resolve) => {
4195-
setImmediate(resolve);
4196-
});
4197-
}
41984177
}
41994178
// Split by max operations, create the bulk
42004179
const groupsOfImpacts = R.splitEvery(MAX_BULK_OPERATIONS, impacts);
4201-
startProcessingTime = new Date().getTime();
42024180
for (let i = 0; i < groupsOfImpacts.length; i += 1) {
4181+
await doYield();
42034182
const impactsBulk = groupsOfImpacts[i];
42044183
const bodyUpdateRaw = impactsBulk.map(([impactId, elementMeta]) => {
42054184
return Object.entries(elementMeta).map(([typeAndIndex, cleanupIds]) => {
@@ -4256,22 +4235,15 @@ export const elRemoveRelationConnection = async (context, user, elementsImpact,
42564235
if (bodyUpdate.length > 0) {
42574236
await elBulk({ refresh: forceRefresh, timeout: BULK_TIMEOUT, body: bodyUpdate });
42584237
}
4259-
// Prevent event loop locking more than MAX_EVENT_LOOP_PROCESSING_TIME
4260-
if (new Date().getTime() - startProcessingTime > MAX_EVENT_LOOP_PROCESSING_TIME) {
4261-
startProcessingTime = new Date().getTime();
4262-
await new Promise((resolve) => {
4263-
setImmediate(resolve);
4264-
});
4265-
}
42664238
}
42674239
}
42684240
};
42694241

42704242
export const computeDeleteElementsImpacts = async (cleanupRelations, toBeRemovedIds, relationsToRemoveMap) => {
42714243
// Update all rel connections that will remain
42724244
const elementsImpact = {};
4273-
let startProcessingTime = new Date().getTime();
42744245
for (let i = 0; i < cleanupRelations.length; i += 1) {
4246+
await doYield();
42754247
const relation = cleanupRelations[i];
42764248
const fromWillNotBeRemoved = !relationsToRemoveMap.has(relation.fromId) && !toBeRemovedIds.includes(relation.fromId);
42774249
const isFromCleanup = fromWillNotBeRemoved && isImpactedTypeAndSide(relation.entity_type, relation.fromType, relation.toType, ROLE_FROM);
@@ -4303,13 +4275,6 @@ export const computeDeleteElementsImpacts = async (cleanupRelations, toBeRemoved
43034275
}
43044276
}
43054277
}
4306-
// Prevent event loop locking more than MAX_EVENT_LOOP_PROCESSING_TIME
4307-
if (new Date().getTime() - startProcessingTime > MAX_EVENT_LOOP_PROCESSING_TIME) {
4308-
startProcessingTime = new Date().getTime();
4309-
await new Promise((resolve) => {
4310-
setImmediate(resolve);
4311-
});
4312-
}
43134278
}
43144279
return elementsImpact;
43154280
};
@@ -4528,15 +4493,22 @@ const createDeleteOperationElement = async (context, user, mainElement, deletedE
45284493
export const prepareElementForIndexing = async (element) => {
45294494
const thing = {};
45304495
const keyItems = Object.keys(element);
4531-
let startProcessingTime = new Date().getTime();
45324496
for (let index = 0; index < keyItems.length; index += 1) {
4497+
await doYield();
45334498
const key = keyItems[index];
45344499
const value = element[key];
45354500
if (Array.isArray(value)) { // Array of Date, objects, string or number
45364501
const preparedArray = [];
4537-
let innerProcessingTime = new Date().getTime();
4538-
let extendLoopSplit = 0;
4502+
let yieldCount = 0;
45394503
for (let valueIndex = 0; valueIndex < value.length; valueIndex += 1) {
4504+
if (await doYield()) {
4505+
// If we extend the preparation 5 times, log a warn
4506+
// It will help to understand what kind of key have so many elements
4507+
if (yieldCount === 5) {
4508+
logApp.warn('[ENGINE] Element preparation too many values', { id: element.id, key, size: value.length });
4509+
}
4510+
yieldCount += 1;
4511+
}
45404512
const valueElement = value[valueIndex];
45414513
if (valueElement) {
45424514
if (isDateAttribute(key)) { // Date is an object but natively supported
@@ -4551,19 +4523,6 @@ export const prepareElementForIndexing = async (element) => {
45514523
preparedArray.push(valueElement);
45524524
}
45534525
}
4554-
// Prevent event loop locking more than MAX_EVENT_LOOP_PROCESSING_TIME
4555-
if (new Date().getTime() - innerProcessingTime > MAX_EVENT_LOOP_PROCESSING_TIME) {
4556-
// If we extends the preparation 5 times, log a warn
4557-
// It will help to understand what kind of key have so much elements
4558-
if (extendLoopSplit === 5) {
4559-
logApp.warn('[ENGINE] Element preparation too many values', { id: element.id, key, size: value.length });
4560-
}
4561-
extendLoopSplit += 1;
4562-
innerProcessingTime = new Date().getTime();
4563-
await new Promise((resolve) => {
4564-
setImmediate(resolve);
4565-
});
4566-
}
45674526
}
45684527
thing[key] = preparedArray;
45694528
} else if (isDateAttribute(key)) { // Date is an object but natively supported
@@ -4579,13 +4538,6 @@ export const prepareElementForIndexing = async (element) => {
45794538
} else { // For all other types (numeric, ...), no transform
45804539
thing[key] = value;
45814540
}
4582-
// Prevent event loop locking more than MAX_EVENT_LOOP_PROCESSING_TIME
4583-
if (new Date().getTime() - startProcessingTime > MAX_EVENT_LOOP_PROCESSING_TIME) {
4584-
startProcessingTime = new Date().getTime();
4585-
await new Promise((resolve) => {
4586-
setImmediate(resolve);
4587-
});
4588-
}
45894541
}
45904542
return thing;
45914543
};

opencti-platform/opencti-graphql/src/database/middleware.js

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import {
3232
isInferredIndex,
3333
isNotEmptyField,
3434
isObjectPathTargetMultipleAttribute,
35-
MAX_EVENT_LOOP_PROCESSING_TIME,
3635
READ_DATA_INDICES,
3736
READ_DATA_INDICES_INFERRED,
3837
READ_INDEX_HISTORY,
@@ -237,6 +236,7 @@ import { RELATION_ACCESSES_TO } from '../schema/internalRelationship';
237236
import { generateVulnerabilitiesUpdates } from '../utils/vulnerabilities';
238237
import { idLabel } from '../schema/schema-labels';
239238
import { pirExplanation } from '../modules/attributes/internalRelationship-registrationAttributes';
239+
import { doYield } from '../utils/eventloop-utils';
240240

241241
// region global variables
242242
const MAX_BATCH_SIZE = nconf.get('elasticsearch:batch_loader_max_size') ?? 300;
@@ -369,22 +369,15 @@ const loadElementMetaDependencies = async (context, user, elements, args = {}) =
369369
const [key, values] = entries[index];
370370
const invalidRelations = [];
371371
const resolvedElementsWithRelation = [];
372-
let startProcessingTime = new Date().getTime();
373372
for (let valueIndex = 0; valueIndex < values.length; valueIndex += 1) {
373+
await doYield();
374374
const v = values[valueIndex];
375375
const resolvedElement = toResolvedElements[v.toId];
376376
if (resolvedElement) {
377377
resolvedElementsWithRelation.push({ ...resolvedElement, i_relation: v });
378378
} else {
379379
invalidRelations.push({ relation_id: v.id, target_id: v.toId });
380380
}
381-
// Prevent event loop locking more than MAX_EVENT_LOOP_PROCESSING_TIME
382-
if (new Date().getTime() - startProcessingTime > MAX_EVENT_LOOP_PROCESSING_TIME) {
383-
startProcessingTime = new Date().getTime();
384-
await new Promise((resolve) => {
385-
setImmediate(resolve);
386-
});
387-
}
388381
}
389382
if (invalidRelations.length > 0) {
390383
// Some targets can be unresolved in case of potential inconsistency between relation and target
@@ -440,8 +433,8 @@ export const loadElementsWithDependencies = async (context, user, elements, opts
440433
}
441434
const [fromAndToMap, depsElementsMap, fileMarkingsMap] = await Promise.all([fromAndToPromise, depsPromise, fileMarkingsPromise]);
442435
const loadedElements = [];
443-
let startProcessingTime = new Date().getTime();
444436
for (let i = 0; i < elements.length; i += 1) {
437+
await doYield();
445438
const element = elements[i];
446439
const files = [];
447440
if (isNotEmptyField(element.x_opencti_files) && isNotEmptyField(fileMarkingsMap)) {
@@ -482,13 +475,6 @@ export const loadElementsWithDependencies = async (context, user, elements, opts
482475
} else {
483476
loadedElements.push(R.mergeRight(element, { ...deps }));
484477
}
485-
// Prevent event loop locking more than MAX_EVENT_LOOP_PROCESSING_TIME
486-
if (new Date().getTime() - startProcessingTime > MAX_EVENT_LOOP_PROCESSING_TIME) {
487-
startProcessingTime = new Date().getTime();
488-
await new Promise((resolve) => {
489-
setImmediate(resolve);
490-
});
491-
}
492478
}
493479
return loadedElements;
494480
};
@@ -867,7 +853,6 @@ const inputResolveRefs = async (context, user, input, type, entitySetting) => {
867853
}
868854
const resolutionsMap = new Map();
869855
const resolvedIds = new Set();
870-
let startProcessingTime = new Date().getTime();
871856
for (let i = 0; i < resolvedElements.length; i += 1) {
872857
const resolvedElement = resolvedElements[i];
873858
const instanceIds = getInstanceIds(resolvedElement);
@@ -879,17 +864,11 @@ const inputResolveRefs = async (context, user, input, type, entitySetting) => {
879864
}
880865
});
881866
for (let configIndex = 0; configIndex < matchingConfigs.length; configIndex += 1) {
867+
await doYield();
882868
const c = matchingConfigs[configIndex];
883869
const data = { ...resolvedElement, i_group: c };
884870
const dataKey = `${resolvedElement.internal_id}|${c.destKey}`;
885871
resolutionsMap.set(dataKey, data);
886-
// Prevent event loop locking more than MAX_EVENT_LOOP_PROCESSING_TIME
887-
if (new Date().getTime() - startProcessingTime > MAX_EVENT_LOOP_PROCESSING_TIME) {
888-
startProcessingTime = new Date().getTime();
889-
await new Promise((resolve) => {
890-
setImmediate(resolve);
891-
});
892-
}
893872
}
894873
}
895874
const groupByTypeElements = R.groupBy((e) => e.i_group.destKey, resolutionsMap.values());

opencti-platform/opencti-graphql/src/database/utils.js

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@ import { isStixRefRelationship, RELATION_OBJECT_MARKING } from '../schema/stixRe
1515
import { schemaAttributesDefinition } from '../schema/schema-attributes';
1616
import { getDraftContext } from '../utils/draftContext';
1717
import { INPUT_OBJECTS } from '../schema/general';
18+
import { doYield } from '../utils/eventloop-utils';
1819

1920
export const ES_INDEX_PREFIX = conf.get('elasticsearch:index_prefix') || 'opencti';
2021
const rabbitmqPrefix = conf.get('rabbitmq:queue_prefix');
2122
export const RABBIT_QUEUE_PREFIX = rabbitmqPrefix ? `${rabbitmqPrefix}_` : '';
2223

23-
export const MAX_EVENT_LOOP_PROCESSING_TIME = 50;
24-
2524
export const REDACTED_INFORMATION = '*** Redacted ***';
2625
export const RESTRICTED_INFORMATION = 'Restricted';
2726

@@ -390,18 +389,11 @@ export const isObjectPathTargetMultipleAttribute = (instance, object_path) => {
390389

391390
export const asyncListTransformation = async (elements, preparatoryFunction, opts = {}) => {
392391
const preparedElements = [];
393-
let startProcessingTime = new Date().getTime();
394392
for (let n = 0; n < elements.length; n += 1) {
393+
await doYield();
395394
const element = elements[n];
396395
const preparedElement = await preparatoryFunction(element, opts);
397396
preparedElements.push(preparedElement);
398-
// Prevent event loop locking more than MAX_EVENT_LOOP_PROCESSING_TIME
399-
if (new Date().getTime() - startProcessingTime > MAX_EVENT_LOOP_PROCESSING_TIME) {
400-
startProcessingTime = new Date().getTime();
401-
await new Promise((resolve) => {
402-
setImmediate(resolve);
403-
});
404-
}
405397
}
406398
return preparedElements;
407399
};

opencti-platform/opencti-graphql/src/domain/user.js

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,7 @@ import {
3232
} from '../database/middleware-loader';
3333
import { delEditContext, notify, setEditContext } from '../database/redis';
3434
import { killUserSessions } from '../database/session';
35-
import {
36-
buildPagination,
37-
isEmptyField,
38-
isNotEmptyField,
39-
MAX_EVENT_LOOP_PROCESSING_TIME,
40-
READ_INDEX_INTERNAL_OBJECTS,
41-
READ_INDEX_STIX_DOMAIN_OBJECTS,
42-
READ_RELATIONSHIPS_INDICES
43-
} from '../database/utils';
35+
import { buildPagination, isEmptyField, isNotEmptyField, READ_INDEX_INTERNAL_OBJECTS, READ_INDEX_STIX_DOMAIN_OBJECTS, READ_RELATIONSHIPS_INDICES } from '../database/utils';
4436
import { extractEntityRepresentativeName } from '../database/entity-representative';
4537
import { publishUserAction } from '../listener/UserActionListener';
4638
import { authorizedMembers } from '../schema/attribute-definition';
@@ -90,6 +82,7 @@ import { addServiceAccountIntoUserCount, addUserEmailSendCount, addUserIntoServi
9082
import { sendMail } from '../database/smtp';
9183
import { checkEnterpriseEdition } from '../enterprise-edition/ee';
9284
import { ENTITY_TYPE_EMAIL_TEMPLATE } from '../modules/emailTemplate/emailTemplate-types';
85+
import { doYield } from '../utils/eventloop-utils';
9386

9487
const BEARER = 'Bearer ';
9588
const BASIC = 'Basic ';
@@ -1473,8 +1466,8 @@ export const buildCompleteUsers = async (context, clients) => {
14731466
const groupsRoles = new Map();
14741467
const groupsMarkings = new Map();
14751468
const rolesCapabilities = new Map();
1476-
let startProcessingTime = new Date().getTime();
14771469
for (let index = 0; index < relations.length; index += 1) {
1470+
await doYield();
14781471
const { fromId, entity_type, toId } = relations[index];
14791472
// group <- RELATION_ACCESSES_TO -> marking
14801473
if (entity_type === RELATION_ACCESSES_TO) {
@@ -1537,13 +1530,6 @@ export const buildCompleteUsers = async (context, clients) => {
15371530
groupsRoles.set(fromId, [toId]);
15381531
}
15391532
}
1540-
// Prevent event loop locking more than MAX_EVENT_LOOP_PROCESSING_TIME
1541-
if (new Date().getTime() - startProcessingTime > MAX_EVENT_LOOP_PROCESSING_TIME) {
1542-
startProcessingTime = new Date().getTime();
1543-
await new Promise((resolve) => {
1544-
setImmediate(resolve);
1545-
});
1546-
}
15471533
}
15481534
const ids = [...Array.from(groupIds), ...Array.from(roleIds), ...Array.from(organizationIds), ...Array.from(capabilityIds)];
15491535
const resolvedObject = await internalFindByIds(context, SYSTEM_USER, ids, { toMap: true });

0 commit comments

Comments
 (0)