Skip to content

Commit 575ec79

Browse files
authored
Merge pull request #6838 from Countly/deletion-manager-observability
fix: restructuring
2 parents b7ff356 + 0f26a50 commit 575ec79

File tree

2 files changed

+52
-100
lines changed

2 files changed

+52
-100
lines changed

api/jobs/deletionManagerJob.js

Lines changed: 11 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -49,16 +49,6 @@ class DeletionManagerJob extends Job {
4949
return 1;
5050
}
5151

52-
/**
53-
* Check if ClickHouse is enabled
54-
* @returns {boolean} true if ClickHouse is enabled, false otherwise
55-
*/
56-
isClickhouseEnabled() {
57-
return !!(common.queryRunner
58-
&& typeof common.queryRunner.isAdapterAvailable === 'function'
59-
&& common.queryRunner.isAdapterAvailable('clickhouse'));
60-
}
61-
6252
/**
6353
* Run the job
6454
* @param {done} done callback
@@ -69,7 +59,7 @@ class DeletionManagerJob extends Job {
6959
const summary = [];
7060

7161
try {
72-
if (this.isClickhouseEnabled()) {
62+
if (deletionManager.isClickhouseEnabled()) {
7363
const pre = await this.shouldDeferDueToClickhousePressure();
7464
if (pre && pre.defer) {
7565
log.d("Run deferred due to ClickHouse pressure", pre || {});
@@ -120,7 +110,7 @@ class DeletionManagerJob extends Job {
120110
if (task.db === "countly_drill" && task.collection === "drill_events") {
121111
const mongoOk = await this.deleteMongo(task);
122112
let chScheduledOk = true;
123-
const clickhouseEnabled = this.isClickhouseEnabled();
113+
const clickhouseEnabled = deletionManager.isClickhouseEnabled();
124114
const hasClickhouse = clickhouseEnabled && !!(clickHouseRunner && clickHouseRunner.deleteGranularDataByQuery);
125115
if (hasClickhouse) {
126116
chScheduledOk = await this.deleteClickhouse(task);
@@ -167,7 +157,7 @@ class DeletionManagerJob extends Job {
167157
}
168158
}
169159

170-
if (this.isClickhouseEnabled()) {
160+
if (deletionManager.isClickhouseEnabled()) {
171161
await this.processAwaitingValidation(summary);
172162
}
173163

@@ -211,8 +201,8 @@ class DeletionManagerJob extends Job {
211201

212202
for (const task of awaiting) {
213203
try {
214-
if (task.db === "countly_drill" && task.collection === "drill_events" && clickHouseRunner && clickHouseRunner.getMutationStatus) {
215-
const status = await this.getClickhouseMutationStatus(task);
204+
if (task.db === "countly_drill" && task.collection === "drill_events" && chHealth && typeof chHealth.getMutationStatus === 'function') {
205+
const status = await chHealth.getMutationStatus({ validation_command_id: task.validation_command_id, table: task.collection, database: task.db });
216206
if (status && status.is_done) {
217207
await common.db.collection("deletion_manager").updateOne(
218208
{ _id: task._id },
@@ -355,34 +345,6 @@ class DeletionManagerJob extends Job {
355345
}
356346
}
357347

358-
/**
359-
* Retrieves ClickHouse mutation status for a task via system.mutations.
360-
* @param {object} task - Deletion task
361-
* @returns {Promise<object|null>} Object with keys: is_done:boolean, is_killed:boolean, latest_fail_reason:string|null
362-
*/
363-
async getClickhouseMutationStatus(task) {
364-
if (!common.queryRunner) {
365-
return null;
366-
}
367-
const commandId = task.validation_command_id;
368-
const queryDef = {
369-
name: 'GET_MUTATION_STATUS',
370-
adapters: {
371-
clickhouse: {
372-
handler: clickHouseRunner.getMutationStatus
373-
}
374-
}
375-
};
376-
try {
377-
const res = await common.queryRunner.executeQuery(queryDef, { validation_command_id: commandId, table: task.collection, database: task.db }, {});
378-
return res ? { is_done: !!res.is_done, is_killed: !!res.is_killed, latest_fail_reason: res.latest_fail_reason || null } : null;
379-
}
380-
catch (err) {
381-
log.e("getClickhouseMutationStatus failed", { taskId: task._id, error: err?.message || err + "" });
382-
return null;
383-
}
384-
}
385-
386348
/**
387349
* Marks a task as failed or schedules it for a retry based on the number of previous failures.
388350
* @param {Object} task - The task object to update.
@@ -428,20 +390,15 @@ class DeletionManagerJob extends Job {
428390
* @returns {Promise<Object|null>} Object with metrics or null if fetch failed
429391
*/
430392
async getClickhouseLoadMetrics() {
431-
if (!common.queryRunner || !clickHouseRunner || !clickHouseRunner.getClickhouseHealthMetrics) {
393+
if (!chHealth || typeof chHealth.getOperationalSnapshot !== 'function') {
432394
return null;
433395
}
434396
try {
435-
const queryDef = {
436-
name: 'GET_CH_HEALTH_METRICS',
437-
adapters: {
438-
clickhouse: {
439-
handler: clickHouseRunner.getClickhouseHealthMetrics
440-
}
441-
}
442-
};
443-
const res = await common.queryRunner.executeQuery(queryDef, {}, {});
444-
return res || null;
397+
const snapshot = await chHealth.getOperationalSnapshot({ database: 'countly_drill', table: 'drill_events' });
398+
return snapshot ? {
399+
max_parts_per_partition: snapshot.max_parts_per_partition,
400+
total_merge_tree_parts: snapshot.total_merge_tree_parts
401+
} : null;
445402
}
446403
catch (e) {
447404
log.e('CH health metrics fetch failed', e?.message || e + "");

api/utils/deletionManager.js

Lines changed: 41 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ const common = require('./common.js'),
33
plugins = require('../../plugins/pluginManager.js'),
44
manager = {};
55

6-
let clickHouseRunner;
6+
let chHealth = null;
77
try {
8-
clickHouseRunner = require('../../plugins/clickhouse/api/queries/clickhouseCoreQueries.js');
8+
chHealth = require('../../plugins/clickhouse/api/health.js');
99
}
10-
catch (e) {
11-
clickHouseRunner = null;
10+
catch {
11+
//
1212
}
1313

1414
(function() {
@@ -20,6 +20,16 @@ catch (e) {
2020
DELETED: "deleted"
2121
};
2222

23+
/**
24+
* Check if ClickHouse is enabled
25+
* @returns {boolean} true if ClickHouse is enabled, false otherwise
26+
*/
27+
manager.isClickhouseEnabled = function() {
28+
return !!(common.queryRunner
29+
&& typeof common.queryRunner.isAdapterAvailable === 'function'
30+
&& common.queryRunner.isAdapterAvailable('clickhouse'));
31+
};
32+
2333
plugins.register("/master", function() {
2434
common.db.collection('deletion_manager').ensureIndex({"deletion_completion_ts": 1}, {expireAfterSeconds: 3 * 24 * 60 * 60}, function() {});
2535
});
@@ -50,23 +60,27 @@ catch (e) {
5060
plugins.register('/system/observability/collect', async function() {
5161
try {
5262
const summary = await getQueueSummary();
53-
const mutations = await getPendingMutationsFromCH();
63+
64+
const metrics = {
65+
summary: {
66+
queued: summary.queued,
67+
running: summary.running,
68+
awaiting_validation: summary.awaiting_validation,
69+
failed: summary.failed,
70+
deleted: summary.deleted,
71+
oldest_wait_sec: summary.oldest_wait_sec
72+
}
73+
};
74+
75+
if (manager.isClickhouseEnabled()) {
76+
metrics.mutations = await getPendingMutationsFromCH();
77+
}
5478

5579
return {
5680
provider: 'deletion',
5781
healthy: summary.failed === 0,
5882
issues: [],
59-
metrics: {
60-
summary: {
61-
queued: summary.queued,
62-
running: summary.running,
63-
awaiting_validation: summary.awaiting_validation,
64-
failed: summary.failed,
65-
deleted: summary.deleted,
66-
oldest_wait_sec: summary.oldest_wait_sec
67-
},
68-
mutations
69-
},
83+
metrics,
7084
date: new Date().toISOString()
7185
};
7286
}
@@ -134,41 +148,22 @@ catch (e) {
134148
* @returns {Promise<Object>} - Operational snapshot
135149
*/
136150
async function getPendingMutationsFromCH() {
137-
if (!clickHouseRunner) {
138-
return { pending: 0, details: [], error: 'clickHouseRunner_not_available'};
139-
}
140-
const res = await clickHouseRunner.listPendingMutations({ database: 'countly_drill', table: 'drill_events' });
141-
const rows = res && res.data ? res.data : [];
142-
return {
143-
pending: rows.length,
144-
details: rows.map(r => ({
145-
command: r.command + '',
146-
is_killed: r.is_killed === 1 || r.is_killed === '1',
147-
latest_fail_reason: r.latest_fail_reason || null
148-
}))
149-
};
150-
}
151-
152-
manager.getDeletionClickhousePressureLimits = async function() {
153151
try {
154-
const doc = await common.db.collection('plugins').findOne(
155-
{ _id: 'plugins' },
156-
{ projection: { 'deletion_manager.ch_max_parts_per_partition': 1, 'deletion_manager.ch_max_total_mergetree_parts': 1 } }
157-
);
158-
const dmCfg = (doc && doc.deletion_manager) ? doc.deletion_manager : {};
152+
const rows = await chHealth.listPendingMutations({ database: 'countly_drill', table: 'drill_events' });
153+
const list = Array.isArray(rows) ? rows : [];
159154
return {
160-
CH_MAX_PARTS_PER_PARTITION: Number(dmCfg.ch_max_parts_per_partition) || 1000,
161-
CH_MAX_TOTAL_MERGETREE_PARTS: Number(dmCfg.ch_max_total_mergetree_parts) || 100000
155+
pending: list.length,
156+
details: list.map(r => ({
157+
command: r.command + '',
158+
is_killed: r.is_killed === 1 || r.is_killed === '1',
159+
latest_fail_reason: r.latest_fail_reason || null
160+
}))
162161
};
163162
}
164163
catch (e) {
165-
log.e('Failed to load deletion manager thresholds from DB, using defaults', e && e.message ? e.message : e);
166-
return {
167-
CH_MAX_PARTS_PER_PARTITION: 1000,
168-
CH_MAX_TOTAL_MERGETREE_PARTS: 100000
169-
};
164+
return { pending: 0, details: [], error: 'clickhouse_health_unavailable' };
170165
}
171-
};
166+
}
172167
})(manager);
173168

174-
module.exports = manager;
169+
module.exports = manager;

0 commit comments

Comments
 (0)