From d497750efb46bd78b910bbae20309e052d341129 Mon Sep 17 00:00:00 2001 From: Amit Prinz Setter Date: Thu, 12 Jun 2025 10:36:18 -0700 Subject: [PATCH 1/2] system store - reduce load time (part of performance effort 4.20) Signed-off-by: Amit Prinz Setter --- config.js | 4 ++-- src/server/bg_services/md_aggregator.js | 6 +++--- src/server/system_services/system_store.js | 10 +++++++--- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/config.js b/config.js index 5a270ae03c..8febed46f1 100644 --- a/config.js +++ b/config.js @@ -235,8 +235,8 @@ config.ROOT_KEY_MOUNT = '/etc/noobaa-server/root_keys'; config.DB_TYPE = /** @type {nb.DBType} */ (process.env.DB_TYPE || 'postgres'); -config.POSTGRES_DEFAULT_MAX_CLIENTS = 10; -config.POSTGRES_MD_MAX_CLIENTS = (process.env.LOCAL_MD_SERVER === 'true') ? 70 : 10; +config.POSTGRES_DEFAULT_MAX_CLIENTS = 5; +config.POSTGRES_MD_MAX_CLIENTS = (process.env.LOCAL_MD_SERVER === 'true') ? 70 : 5; /////////////////// // SYSTEM CONFIG // diff --git a/src/server/bg_services/md_aggregator.js b/src/server/bg_services/md_aggregator.js index 8958af7ef0..5d3372523c 100644 --- a/src/server/bg_services/md_aggregator.js +++ b/src/server/bg_services/md_aggregator.js @@ -68,7 +68,7 @@ async function run_md_aggregator(md_store, system_store, target_now, delay) { }); if (changes) { const update = _.omit(changes, 'more_updates'); - await system_store.make_changes({ update }); + await system_store.make_changes({ update }, false); update_range = !changes.more_updates; if (update_range) { await system_store.make_changes({ @@ -78,7 +78,7 @@ async function run_md_aggregator(md_store, system_store, target_now, delay) { global_last_update: range.till_time, }] } - }); + }, false); } await P.delay(delay); } else { @@ -206,7 +206,7 @@ function find_next_range({ }, })) } - }); + }, false); } // on normal operation the time_diff to close can be closed within a single MD_AGGREGATOR_INTERVAL diff --git a/src/server/system_services/system_store.js b/src/server/system_services/system_store.js index 12d865883a..33c8c0bbd6 100644 --- a/src/server/system_services/system_store.js +++ b/src/server/system_services/system_store.js @@ -394,17 +394,21 @@ class SystemStore extends EventEmitter { if (this.data) { load_time = this.data.time; } + let res; const since_load = Date.now() - load_time; if (since_load < this.START_REFRESH_THRESHOLD) { - return this.data; + res = this.data; } else if (since_load < this.FORCE_REFRESH_THRESHOLD) { dbg.warn(`system_store.refresh: system_store.data.time > START_REFRESH_THRESHOLD, since_load = ${since_load}, START_REFRESH_THRESHOLD = ${this.START_REFRESH_THRESHOLD}`); this.load().catch(_.noop); - return this.data; + res = this.data; } else { dbg.warn(`system_store.refresh: system_store.data.time > FORCE_REFRESH_THRESHOLD, since_load = ${since_load}, FORCE_REFRESH_THRESHOLD = ${this.FORCE_REFRESH_THRESHOLD}`); - return this.load(); + res = this.load(); } + //call refresh periodically + P.delay_unblocking(this.START_REFRESH_THRESHOLD).then(this.refresh); + return res; } async load(since) { From 768227bb55ed0e4df441dc6982594cb7451b92cf Mon Sep 17 00:00:00 2001 From: Amit Prinz Setter Date: Wed, 9 Jul 2025 10:43:55 -0700 Subject: [PATCH 2/2] system store - endpoints load from core instead of db Signed-off-by: Amit Prinz Setter --- src/api/system_api.js | 7 +++ src/endpoint/endpoint.js | 2 +- src/server/bg_services/cluster_hb.js | 2 +- src/server/system_services/system_server.js | 10 +++- src/server/system_services/system_store.js | 59 +++++++++++++++++---- src/util/postgres_client.js | 1 + 6 files changed, 69 insertions(+), 12 deletions(-) diff --git a/src/api/system_api.js b/src/api/system_api.js index c8b627b4f7..f054243f4f 100644 --- a/src/api/system_api.js +++ b/src/api/system_api.js @@ -459,6 +459,13 @@ module.exports = { auth: { system: 'admin' } + }, + + get_system_store: { + method: 'GET', + auth: { + system: false + } } }, diff --git a/src/endpoint/endpoint.js b/src/endpoint/endpoint.js index 8787a40ad7..7a169b13a6 100755 --- a/src/endpoint/endpoint.js +++ b/src/endpoint/endpoint.js @@ -171,7 +171,7 @@ async function main(options = {}) { // Load a system store instance for the current process and register for changes. // We do not wait for it in becasue the result or errors are not relevent at // this point (and should not kill the process); - system_store.get_instance().load(); + system_store.get_instance({source: system_store.SOURCE.CORE}).load(); // Register the process as an md_server. await md_server.register_rpc(); } diff --git a/src/server/bg_services/cluster_hb.js b/src/server/bg_services/cluster_hb.js index 3d1d58c582..5af0d4df41 100644 --- a/src/server/bg_services/cluster_hb.js +++ b/src/server/bg_services/cluster_hb.js @@ -93,7 +93,7 @@ function do_heartbeat({ skip_server_monitor } = {}) { update: { clusters: [update] } - }, false); + }, true); }); }) .then(() => { diff --git a/src/server/system_services/system_server.js b/src/server/system_services/system_server.js index ef46b53229..2f3524130b 100644 --- a/src/server/system_services/system_server.js +++ b/src/server/system_services/system_server.js @@ -19,7 +19,7 @@ const config = require('../../../config'); const { BucketStatsStore } = require('../analytic_services/bucket_stats_store'); const { EndpointStatsStore } = require('../analytic_services/endpoint_stats_store'); const os_utils = require('../../util/os_utils'); -const { RpcError } = require('../../rpc'); +const { RpcError, RPC_BUFFERS } = require('../../rpc'); const nb_native = require('../../util/nb_native'); const Dispatcher = require('../notifications/dispatcher'); const size_utils = require('../../util/size_utils'); @@ -298,6 +298,12 @@ function get_system_status(req) { }; } +async function get_system_store() { + return { + //[RPC_BUFFERS]: Buffer.from(JSON.stringify(await system_store.recent_db_data())), + [RPC_BUFFERS]: {data: Buffer.from(JSON.stringify(await system_store.recent_db_data()))}, + }; +} async function _update_system_state(system_id, mode) { const update = { @@ -1595,3 +1601,5 @@ exports.rotate_master_key = rotate_master_key; exports.disable_master_key = disable_master_key; exports.enable_master_key = enable_master_key; exports.upgrade_master_keys = upgrade_master_keys; + +exports.get_system_store = get_system_store; diff --git a/src/server/system_services/system_store.js b/src/server/system_services/system_store.js index 33c8c0bbd6..ab1ac157e5 100644 --- a/src/server/system_services/system_store.js +++ b/src/server/system_services/system_store.js @@ -38,8 +38,9 @@ const size_utils = require('../../util/size_utils'); const os_utils = require('../../util/os_utils'); const config = require('../../../config'); const db_client = require('../../util/db_client'); +const { decode_json } = require('../../util/postgres_client'); -const { RpcError } = require('../../rpc'); +const { RpcError, RPC_BUFFERS } = require('../../rpc'); const master_key_manager = require('./master_key_manager'); const COLLECTIONS = [{ @@ -152,6 +153,10 @@ const COLLECTIONS_BY_NAME = _.keyBy(COLLECTIONS, 'name'); const accounts_by_email_lowercase = []; +const SOURCE = Object.freeze({ + DB: 'DB', + CORE: 'CORE', +}); /** * @@ -352,6 +357,8 @@ class SystemStore extends EventEmitter { this.START_REFRESH_THRESHOLD = 10 * 60 * 1000; this.FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000; this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5; + this.source = (process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") === -1) ? SOURCE.DB : SOURCE.CORE; + dbg.log0("system store source is", this.source); this._load_serial = new semaphore.Semaphore(1, { warning_timeout: this.START_REFRESH_THRESHOLD }); for (const col of COLLECTIONS) { db_client.instance().define_collection(col); @@ -406,8 +413,6 @@ class SystemStore extends EventEmitter { dbg.warn(`system_store.refresh: system_store.data.time > FORCE_REFRESH_THRESHOLD, since_load = ${since_load}, FORCE_REFRESH_THRESHOLD = ${this.FORCE_REFRESH_THRESHOLD}`); res = this.load(); } - //call refresh periodically - P.delay_unblocking(this.START_REFRESH_THRESHOLD).then(this.refresh); return res; } @@ -418,8 +423,6 @@ class SystemStore extends EventEmitter { try { dbg.log3('SystemStore: loading ... this.last_update_time =', this.last_update_time, ", since =", since); - const new_data = new SystemStoreData(); - // If we get a load request with an timestamp older then our last update time // we ensure we load everyting from that timestamp by updating our last_update_time. if (!_.isUndefined(since) && since < this.last_update_time) { @@ -427,9 +430,16 @@ class SystemStore extends EventEmitter { this.last_update_time = since; } this.master_key_manager.load_root_key(); + const new_data = new SystemStoreData(); let millistamp = time_utils.millistamp(); await this._register_for_changes(); - await this._read_new_data_from_db(new_data); + if (this.source === SOURCE.DB) { + await this._read_new_data_from_db(new_data); + } else { + this.data = new SystemStoreData(); + await this._read_new_data_from_core(this.data); + } + const secret = await os_utils.read_server_secret(); this._server_secret = secret; if (dbg.should_log(1)) { //param should match below logs' level @@ -439,8 +449,10 @@ class SystemStore extends EventEmitter { depth: 4 })); } - this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data); - this.data = _.cloneDeep(this.old_db_data); + if (this.source === SOURCE.DB) { + this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data); + this.data = _.cloneDeep(this.old_db_data); + } millistamp = time_utils.millistamp(); this.data.rebuild(); dbg.log1('SystemStore: rebuild took', time_utils.millitook(millistamp)); @@ -462,6 +474,12 @@ class SystemStore extends EventEmitter { }); } + //return the latest copy of in-memory data + async recent_db_data() { + //return this.db_clone; + return this._load_serial.surround(async () => this.old_db_data); + } + _update_data_from_new(data, new_data) { COLLECTIONS.forEach(col => { const old_items = data[col.name]; @@ -527,6 +545,28 @@ class SystemStore extends EventEmitter { this.last_update_time = now; } + async _read_new_data_from_core(target) { + dbg.log3("_read_new_data_from_core begins"); + const res = await server_rpc.client.system.get_system_store(); + const ss = JSON.parse(res[RPC_BUFFERS].data.toString()); + dbg.log3("_read_new_data_from_core new system store", ss); + for (const key of Object.keys(ss)) { + const collection = COLLECTIONS_BY_NAME[key]; + if (collection) { + target[key] = []; + _.each(ss[key], item => { + //these two lines will transform string values into appropriately typed objects + //(SensitiveString, ObjectId) according to schema + const after = decode_json(collection.schema, item); + db_client.instance().validate(key, after); + target[key].push(after); + }); + } else { + target[key] = ss[key]; + } + } + } + _check_schema(col, item, warn) { return db_client.instance().validate(col.name, item, warn); } @@ -619,7 +659,7 @@ class SystemStore extends EventEmitter { if (any_news) { if (this.is_standalone) { await this.load(last_update); - } else if (publish) { + } else /*if (publish)*/ { // notify all the cluster (including myself) to reload await server_rpc.client.redirector.publish_to_cluster({ method_api: 'server_inter_process_api', @@ -855,3 +895,4 @@ SystemStore._instance = undefined; // EXPORTS exports.SystemStore = SystemStore; exports.get_instance = SystemStore.get_instance; +exports.SOURCE = SOURCE; diff --git a/src/util/postgres_client.js b/src/util/postgres_client.js index 6225175ea6..16ffa0359a 100644 --- a/src/util/postgres_client.js +++ b/src/util/postgres_client.js @@ -1951,3 +1951,4 @@ PostgresClient._instance = undefined; // EXPORTS exports.PostgresClient = PostgresClient; exports.instance = PostgresClient.instance; +exports.decode_json = decode_json;