Draft pull request: changes from all commits
4 changes: 2 additions & 2 deletions config.js
@@ -235,8 +235,8 @@ config.ROOT_KEY_MOUNT = '/etc/noobaa-server/root_keys';

config.DB_TYPE = /** @type {nb.DBType} */ (process.env.DB_TYPE || 'postgres');

-config.POSTGRES_DEFAULT_MAX_CLIENTS = 10;
-config.POSTGRES_MD_MAX_CLIENTS = (process.env.LOCAL_MD_SERVER === 'true') ? 70 : 10;
+config.POSTGRES_DEFAULT_MAX_CLIENTS = 5;
+config.POSTGRES_MD_MAX_CLIENTS = (process.env.LOCAL_MD_SERVER === 'true') ? 70 : 5;
Reviewer comment (Member) on lines +238 to +239:
Why are you changing this?


///////////////////
// SYSTEM CONFIG //
7 changes: 7 additions & 0 deletions src/api/system_api.js
@@ -459,6 +459,13 @@ module.exports = {
auth: {
system: 'admin'
}
},

get_system_store: {
method: 'GET',
auth: {
system: false
}
}
},

2 changes: 1 addition & 1 deletion src/endpoint/endpoint.js
@@ -171,7 +171,7 @@ async function main(options = {}) {
// Load a system store instance for the current process and register for changes.
// We do not wait for it because the result or errors are not relevant at
// this point (and should not kill the process);
-system_store.get_instance().load();
+system_store.get_instance({source: system_store.SOURCE.CORE}).load();
// Register the process as an md_server.
await md_server.register_rpc();
}
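With this change the endpoint process asks for a system store instance that is fed from the core server rather than from the database. For orientation, a sketch of the two call shapes after this PR; the require path and the assumption that the core/MD server keeps using the parameterless form are mine and are not shown in this diff:

    // Illustrative sketch only; the parameterless form falls back to the
    // HOSTNAME heuristic added to the SystemStore constructor later in this diff.
    const system_store = require('../server/system_services/system_store');

    system_store.get_instance().load();                                       // core/MD server: loads from the DB
    system_store.get_instance({ source: system_store.SOURCE.CORE }).load();   // endpoint: pulls a snapshot from core over RPC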
2 changes: 1 addition & 1 deletion src/server/bg_services/cluster_hb.js
@@ -93,7 +93,7 @@ function do_heartbeat({ skip_server_monitor } = {}) {
update: {
clusters: [update]
}
-}, false);
+}, true);
});
})
.then(() => {
6 changes: 3 additions & 3 deletions src/server/bg_services/md_aggregator.js
@@ -68,7 +68,7 @@ async function run_md_aggregator(md_store, system_store, target_now, delay) {
});
if (changes) {
const update = _.omit(changes, 'more_updates');
-await system_store.make_changes({ update });
+await system_store.make_changes({ update }, false);
update_range = !changes.more_updates;
if (update_range) {
await system_store.make_changes({
Expand All @@ -78,7 +78,7 @@ async function run_md_aggregator(md_store, system_store, target_now, delay) {
global_last_update: range.till_time,
}]
}
-});
+}, false);
}
await P.delay(delay);
} else {
@@ -206,7 +206,7 @@ function find_next_range({
},
}))
}
-});
+}, false);
}

// on normal operation the time_diff to close can be closed within a single MD_AGGREGATOR_INTERVAL
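The second argument threaded through these make_changes calls appears to be the publish flag consumed by the /*if (publish)*/ branch changed later in this diff: when truthy, make_changes publishes a reload notification to the whole cluster through the redirector and server_inter_process_api. Under that reading, the heartbeat above now opts in while the frequent md_aggregator bookkeeping updates opt out. A minimal sketch of the two call shapes, with that assumption spelled out (both calls sit inside the async functions shown above):

    // Assumption: the second parameter of make_changes is the publish flag read by
    // the '} else /*if (publish)*/ {' branch in system_store.js further down.
    await system_store.make_changes({ update: { clusters: [update] } }, true);   // heartbeat: request a cluster-wide reload
    await system_store.make_changes({ update }, false);                          // md_aggregator: skip the broadcast
    // Note: with the publish condition commented out in this PR, clustered
    // deployments broadcast regardless of the flag's value.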
10 changes: 9 additions & 1 deletion src/server/system_services/system_server.js
@@ -19,7 +19,7 @@ const config = require('../../../config');
const { BucketStatsStore } = require('../analytic_services/bucket_stats_store');
const { EndpointStatsStore } = require('../analytic_services/endpoint_stats_store');
const os_utils = require('../../util/os_utils');
-const { RpcError } = require('../../rpc');
+const { RpcError, RPC_BUFFERS } = require('../../rpc');
const nb_native = require('../../util/nb_native');
const Dispatcher = require('../notifications/dispatcher');
const size_utils = require('../../util/size_utils');
@@ -298,6 +298,12 @@ function get_system_status(req) {
};
}

async function get_system_store() {
return {
//[RPC_BUFFERS]: Buffer.from(JSON.stringify(await system_store.recent_db_data())),
[RPC_BUFFERS]: {data: Buffer.from(JSON.stringify(await system_store.recent_db_data()))},
};
}

async function _update_system_state(system_id, mode) {
const update = {
@@ -1595,3 +1601,5 @@ exports.rotate_master_key = rotate_master_key;
exports.disable_master_key = disable_master_key;
exports.enable_master_key = enable_master_key;
exports.upgrade_master_keys = upgrade_master_keys;

exports.get_system_store = get_system_store;
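get_system_store ships the snapshot returned by system_store.recent_db_data() as one opaque buffer under RPC_BUFFERS instead of as a structured reply, presumably to keep the potentially large payload out of normal reply validation and encoding. The producer/consumer contract with _read_new_data_from_core (added to system_store.js below) is, in sketch form:

    // Illustrative pairing of the two sides of the new RPC call; the snapshot
    // object is a made-up stand-in for the real recent_db_data() result.
    const { RPC_BUFFERS } = require('../../rpc');                 // same import as this file
    const db_data = { systems: [], buckets: [] };                 // hypothetical snapshot
    // core side (get_system_store above):
    const reply = { [RPC_BUFFERS]: { data: Buffer.from(JSON.stringify(db_data)) } };
    // endpoint side (_read_new_data_from_core below):
    const parsed = JSON.parse(reply[RPC_BUFFERS].data.toString());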
65 changes: 55 additions & 10 deletions src/server/system_services/system_store.js
@@ -38,8 +38,9 @@ const size_utils = require('../../util/size_utils');
const os_utils = require('../../util/os_utils');
const config = require('../../../config');
const db_client = require('../../util/db_client');
+const { decode_json } = require('../../util/postgres_client');

-const { RpcError } = require('../../rpc');
+const { RpcError, RPC_BUFFERS } = require('../../rpc');
const master_key_manager = require('./master_key_manager');

const COLLECTIONS = [{
@@ -152,6 +153,10 @@ const COLLECTIONS_BY_NAME = _.keyBy(COLLECTIONS, 'name');

const accounts_by_email_lowercase = [];

const SOURCE = Object.freeze({
DB: 'DB',
CORE: 'CORE',
});

/**
*
@@ -352,6 +357,8 @@ class SystemStore extends EventEmitter {
this.START_REFRESH_THRESHOLD = 10 * 60 * 1000;
this.FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000;
this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5;
this.source = (process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") === -1) ? SOURCE.DB : SOURCE.CORE;
dbg.log0("system store source is", this.source);
this._load_serial = new semaphore.Semaphore(1, { warning_timeout: this.START_REFRESH_THRESHOLD });
for (const col of COLLECTIONS) {
db_client.instance().define_collection(col);
@@ -394,17 +401,19 @@ class SystemStore extends EventEmitter {
if (this.data) {
load_time = this.data.time;
}
let res;
const since_load = Date.now() - load_time;
if (since_load < this.START_REFRESH_THRESHOLD) {
-return this.data;
+res = this.data;
} else if (since_load < this.FORCE_REFRESH_THRESHOLD) {
dbg.warn(`system_store.refresh: system_store.data.time > START_REFRESH_THRESHOLD, since_load = ${since_load}, START_REFRESH_THRESHOLD = ${this.START_REFRESH_THRESHOLD}`);
this.load().catch(_.noop);
-return this.data;
+res = this.data;
} else {
dbg.warn(`system_store.refresh: system_store.data.time > FORCE_REFRESH_THRESHOLD, since_load = ${since_load}, FORCE_REFRESH_THRESHOLD = ${this.FORCE_REFRESH_THRESHOLD}`);
-return this.load();
+res = this.load();
}
return res;
}

async load(since) {
@@ -414,18 +423,23 @@
try {
dbg.log3('SystemStore: loading ... this.last_update_time =', this.last_update_time, ", since =", since);

-const new_data = new SystemStoreData();

// If we get a load request with a timestamp older than our last update time
// we ensure we load everything from that timestamp by updating our last_update_time.
if (!_.isUndefined(since) && since < this.last_update_time) {
dbg.log0('SystemStore.load: Got load request with a timestamp', since, 'older than my last update time', this.last_update_time);
this.last_update_time = since;
}
this.master_key_manager.load_root_key();
+const new_data = new SystemStoreData();
let millistamp = time_utils.millistamp();
await this._register_for_changes();
-await this._read_new_data_from_db(new_data);
+if (this.source === SOURCE.DB) {
+    await this._read_new_data_from_db(new_data);
+} else {
+    this.data = new SystemStoreData();
+    await this._read_new_data_from_core(this.data);
+}
Comment on lines +436 to +441
🛠️ Refactor suggestion

Verify the data initialization logic for CORE source.

The conditional loading logic has a potential issue: for CORE source, this.data is reassigned to a new SystemStoreData() instance, but the original new_data instance is discarded. This could cause inconsistencies.

                if (this.source === SOURCE.DB) {
                    await this._read_new_data_from_db(new_data);
                } else {
-                   this.data = new SystemStoreData();
-                   await this._read_new_data_from_core(this.data);
+                   await this._read_new_data_from_core(new_data);
                }


const secret = await os_utils.read_server_secret();
this._server_secret = secret;
if (dbg.should_log(1)) { //param should match below logs' level
@@ -435,8 +449,10 @@
depth: 4
}));
}
-this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data);
-this.data = _.cloneDeep(this.old_db_data);
+if (this.source === SOURCE.DB) {
+    this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data);
+    this.data = _.cloneDeep(this.old_db_data);
+}
millistamp = time_utils.millistamp();
this.data.rebuild();
dbg.log1('SystemStore: rebuild took', time_utils.millitook(millistamp));
@@ -458,6 +474,12 @@
});
}

//return the latest copy of in-memory data
async recent_db_data() {
//return this.db_clone;
return this._load_serial.surround(async () => this.old_db_data);
}

_update_data_from_new(data, new_data) {
COLLECTIONS.forEach(col => {
const old_items = data[col.name];
@@ -523,6 +545,28 @@
this.last_update_time = now;
}

async _read_new_data_from_core(target) {
dbg.log3("_read_new_data_from_core begins");
const res = await server_rpc.client.system.get_system_store();
const ss = JSON.parse(res[RPC_BUFFERS].data.toString());
dbg.log3("_read_new_data_from_core new system store", ss);
for (const key of Object.keys(ss)) {
const collection = COLLECTIONS_BY_NAME[key];
if (collection) {
target[key] = [];
_.each(ss[key], item => {
//these two lines will transform string values into appropriately typed objects
//(SensitiveString, ObjectId) according to schema
const after = decode_json(collection.schema, item);
db_client.instance().validate(key, after);
target[key].push(after);
});
} else {
target[key] = ss[key];
}
}
}

_check_schema(col, item, warn) {
return db_client.instance().validate(col.name, item, warn);
}
@@ -615,7 +659,7 @@
if (any_news) {
if (this.is_standalone) {
await this.load(last_update);
-} else if (publish) {
+} else /*if (publish)*/ {
// notify all the cluster (including myself) to reload
await server_rpc.client.redirector.publish_to_cluster({
method_api: 'server_inter_process_api',
@@ -851,3 +895,4 @@ SystemStore._instance = undefined;
// EXPORTS
exports.SystemStore = SystemStore;
exports.get_instance = SystemStore.get_instance;
exports.SOURCE = SOURCE;
1 change: 1 addition & 0 deletions src/util/postgres_client.js
@@ -1951,3 +1951,4 @@ PostgresClient._instance = undefined;
// EXPORTS
exports.PostgresClient = PostgresClient;
exports.instance = PostgresClient.instance;
exports.decode_json = decode_json;
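Exporting decode_json closes the loop for _read_new_data_from_core above: JSON.stringify on the core side flattens typed values such as ObjectId and SensitiveString into plain strings, and the endpoint rebuilds them against each collection's schema before validating. A hypothetical single-item walk-through (collection name, id, and fields are invented for illustration):

    // Sketch only; assumes 'buckets' is a registered collection whose schema
    // marks _id as an objectid. COLLECTIONS_BY_NAME lives in system_store.js and
    // is shown here only to mirror _read_new_data_from_core.
    const { decode_json } = require('../../util/postgres_client');
    const db_client = require('../../util/db_client');

    const wire_item = { _id: '507f1f77bcf86cd799439011', name: 'bucket1' };   // as received over RPC
    const collection = COLLECTIONS_BY_NAME.buckets;
    const typed_item = decode_json(collection.schema, wire_item);             // _id becomes an ObjectId again
    db_client.instance().validate('buckets', typed_item);                     // re-check against the schema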