Skip to content

Commit 50b9ce7

Browse files
Merge pull request #7109 from Countly/mutation-manager-db-validation
fix: db validation in mutation manager job
2 parents 1343d74 + 07e77de commit 50b9ce7

File tree

1 file changed

+56
-8
lines changed

1 file changed

+56
-8
lines changed

api/jobs/mutationManagerJob.js

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,21 @@ const DEFAULT_JOB_CONFIG = {
1414
};
1515
let jobConfigState = { ...DEFAULT_JOB_CONFIG };
1616

17+
const MONGO_DATABASES = {
18+
countly: () => common.db,
19+
countly_drill: () => common.drillDb,
20+
countly_out: () => common.outDb
21+
};
22+
23+
/**
24+
* Get MongoDB database instance by name
25+
* @param {string} name - Database name
26+
* @returns {Db|null} MongoDB database instance
27+
*/
28+
function getMongoDbInstance(name) {
29+
return name && typeof MONGO_DATABASES[name] !== 'undefined' ? MONGO_DATABASES[name]() : null;
30+
}
31+
1732
let clickHouseRunner;
1833
try {
1934
clickHouseRunner = require('../../plugins/clickhouse/api/queries/clickhouseCoreQueries.js');
@@ -169,11 +184,20 @@ class MutationManagerJob extends Job {
169184
return;
170185
}
171186

187+
const mongoDb = getMongoDbInstance(task.db);
172188
const clickhouseEnabled = mutationManager.isClickhouseEnabled();
173189
const hasClickhouseDelete = clickhouseEnabled && !!(clickHouseRunner && clickHouseRunner.deleteGranularDataByQuery);
174190
const hasClickhouseUpdate = clickhouseEnabled && !!(clickHouseRunner && clickHouseRunner.updateGranularDataByQuery);
175191
const hasClickhouse = (type === 'update' ? hasClickhouseUpdate : hasClickhouseDelete);
176192

193+
if (!mongoDb && !hasClickhouse) {
194+
const reason = `mongo_db_unavailable:${task.db || 'missing'}`;
195+
log.e("Mutation task failed; Mongo database unavailable and ClickHouse disabled", { taskId: task._id, db: task.db });
196+
await this.markFailedOrRetry(task, reason);
197+
summary.push({ query: task.query, status: "failed", error: reason });
198+
return;
199+
}
200+
177201
if (hasClickhouse) {
178202
try {
179203
const pre = await this.shouldDeferDueToClickhousePressure({ database: task.db, table: task.collection });
@@ -192,12 +216,17 @@ class MutationManagerJob extends Job {
192216
}
193217
}
194218

195-
let mongoOk = false;
196-
if (type === 'update') {
197-
mongoOk = await this.updateMongo(task);
219+
let mongoOk = true;
220+
if (mongoDb) {
221+
if (type === 'update') {
222+
mongoOk = await this.updateMongo(task, mongoDb);
223+
}
224+
else {
225+
mongoOk = await this.deleteMongo(task, mongoDb);
226+
}
198227
}
199228
else {
200-
mongoOk = await this.deleteMongo(task);
229+
log.i("Mongo mutation skipped (unavailable database); continuing with ClickHouse only", { taskId: task._id, db: task.db });
201230
}
202231

203232
let chScheduledOk = true;
@@ -353,17 +382,26 @@ class MutationManagerJob extends Job {
353382
/**
354383
* Delete documents from MongoDB
355384
* @param {Object} task - The deletion task
385+
* @param {Object} mongoDb - MongoDB database instance
356386
*/
357-
async deleteMongo(task) {
387+
async deleteMongo(task, mongoDb) {
358388
if (!task.query || !Object.keys(task.query).length) {
359389
await this.markFailedOrRetry(task, "empty_mongo_query");
360390
return false;
361391
}
362392

393+
const targetDb = mongoDb || getMongoDbInstance(task.db);
394+
if (!targetDb) {
395+
const reason = `mongo_db_unavailable:${task.db || 'missing'}`;
396+
log.e("Mongo deletion skipped (database unavailable)", { taskId: task._id, db: task.db });
397+
await this.markFailedOrRetry(task, reason);
398+
return false;
399+
}
400+
363401
let res;
364402
const start = Date.now();
365403
try {
366-
res = await common.drillDb.collection(task.collection).deleteMany(task.query || {});
404+
res = await targetDb.collection(task.collection).deleteMany(task.query || {});
367405
}
368406
catch (err) {
369407
const duration = Date.now() - start;
@@ -380,8 +418,9 @@ class MutationManagerJob extends Job {
380418
/**
381419
* Update documents in MongoDB
382420
* @param {Object} task - The mutation task
421+
* @param {Object} mongoDb - MongoDB database instance
383422
*/
384-
async updateMongo(task) {
423+
async updateMongo(task, mongoDb) {
385424
if (!task.query || !Object.keys(task.query).length) {
386425
await this.markFailedOrRetry(task, "empty_mongo_query");
387426
return false;
@@ -390,10 +429,19 @@ class MutationManagerJob extends Job {
390429
await this.markFailedOrRetry(task, "empty_mongo_update");
391430
return false;
392431
}
432+
433+
const targetDb = mongoDb || getMongoDbInstance(task.db);
434+
if (!targetDb) {
435+
const reason = `mongo_db_unavailable:${task.db || 'missing'}`;
436+
log.e("Mongo update skipped (database unavailable)", { taskId: task._id, db: task.db });
437+
await this.markFailedOrRetry(task, reason);
438+
return false;
439+
}
440+
393441
let res;
394442
const start = Date.now();
395443
try {
396-
res = await common.drillDb.collection(task.collection).updateMany(task.query, task.update || {});
444+
res = await targetDb.collection(task.collection).updateMany(task.query, task.update || {});
397445
}
398446
catch (err) {
399447
const duration = Date.now() - start;

0 commit comments

Comments
 (0)