4 changes: 3 additions & 1 deletion config.js
@@ -255,7 +255,9 @@ config.INTERNAL_STORAGE_POOL_NAME = 'system-internal-storage-pool';
config.ALLOW_BUCKET_CREATE_ON_INTERNAL = true;
config.BUCKET_AUTOCONF_TIER2_ENABLED = false;
config.SYSTEM_STORE_LOAD_CONCURRENCY = parseInt(process.env.SYSTEM_STORE_LOAD_CONCURRENCY, 10) || 5;

// SYSTEM_STORE_SOURCE determines the preferred source for loading system_store data
// This can be either "DB" to load from the DB or "CORE" to load from the system_server in noobaa-core
config.SYSTEM_STORE_SOURCE = process.env.SYSTEM_STORE_SOURCE?.toUpperCase() || "DB";
//////////////////////////
// MD AGGREGATOR CONFIG //
//////////////////////////
6 changes: 5 additions & 1 deletion src/api/server_inter_process_api.js
@@ -17,7 +17,11 @@ module.exports = {
params: {
type: 'object',
properties: {
since: { idate: true }
since: { idate: true },
load_source: {
type: 'string',
enum: ['DB', 'CORE']
}
}
},
auth: {
13 changes: 13 additions & 0 deletions src/api/system_api.js
@@ -459,6 +459,19 @@ module.exports = {
auth: {
system: 'admin'
}
},

get_system_store: {
method: 'GET',
reply: {
type: 'object',
properties: {
// [RPC_BUFFERS].data
},
},
auth: {
system: false
}
}
},

4 changes: 2 additions & 2 deletions src/server/common_services/server_inter_process.js
@@ -13,13 +13,13 @@ const dbg = require('../../util/debug_module')(__filename);
const system_store = require('../system_services/system_store').get_instance();
const server_rpc = require('../server_rpc');


/**
*
*/
async function load_system_store(req) {
await system_store.load(
req && req.rpc_params && req.rpc_params.since
req?.rpc_params?.since,
req?.rpc_params?.load_source?.toUpperCase()
);
}

13 changes: 12 additions & 1 deletion src/server/system_services/system_server.js
@@ -19,7 +19,7 @@ const config = require('../../../config');
const { BucketStatsStore } = require('../analytic_services/bucket_stats_store');
const { EndpointStatsStore } = require('../analytic_services/endpoint_stats_store');
const os_utils = require('../../util/os_utils');
const { RpcError } = require('../../rpc');
const { RpcError, RPC_BUFFERS } = require('../../rpc');
const nb_native = require('../../util/nb_native');
const Dispatcher = require('../notifications/dispatcher');
const size_utils = require('../../util/size_utils');
@@ -298,6 +298,15 @@ function get_system_status(req) {
};
}

async function get_system_store() {
try {
return {
[RPC_BUFFERS]: {data: Buffer.from(JSON.stringify(await system_store.recent_db_data()))},
};
} catch (e) {
dbg.error("Failed getting system store", e);
}
}

async function _update_system_state(system_id, mode) {
const update = {
@@ -1595,3 +1604,5 @@ exports.rotate_master_key = rotate_master_key;
exports.disable_master_key = disable_master_key;
exports.enable_master_key = enable_master_key;
exports.upgrade_master_keys = upgrade_master_keys;

exports.get_system_store = get_system_store;
101 changes: 90 additions & 11 deletions src/server/system_services/system_store.js
@@ -38,8 +38,9 @@ const size_utils = require('../../util/size_utils');
const os_utils = require('../../util/os_utils');
const config = require('../../../config');
const db_client = require('../../util/db_client');
const { decode_json } = require('../../util/postgres_client');

const { RpcError } = require('../../rpc');
const { RpcError, RPC_BUFFERS } = require('../../rpc');
const master_key_manager = require('./master_key_manager');

const COLLECTIONS = [{
@@ -152,6 +153,10 @@ const COLLECTIONS_BY_NAME = _.keyBy(COLLECTIONS, 'name');

const accounts_by_email_lowercase = [];

const SOURCE = Object.freeze({
DB: 'DB',
CORE: 'CORE',
});

/**
*
@@ -352,9 +357,14 @@ class SystemStore extends EventEmitter {
this.START_REFRESH_THRESHOLD = 10 * 60 * 1000;
this.FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000;
this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5;
this.source = options.source || config.SYSTEM_STORE_SOURCE;
this.source = this.source.toUpperCase();
dbg.log0("system store source is", this.source);
this._load_serial = new semaphore.Semaphore(1, { warning_timeout: this.START_REFRESH_THRESHOLD });
for (const col of COLLECTIONS) {
db_client.instance().define_collection(col);
if (options.skip_define_for_tests !== true) {
for (const col of COLLECTIONS) {
db_client.instance().define_collection(col);
}
}
js_utils.deep_freeze(COLLECTIONS);
js_utils.deep_freeze(COLLECTIONS_BY_NAME);
@@ -407,14 +417,21 @@ class SystemStore extends EventEmitter {
}
}

async load(since) {
async load(since, load_source) {
//if endpoints load from core, and this load is meant for core itself
//(ie, the first load_system_store() out of two, the one with load_source === 'DB'),
//then endpoints skip it.
//endpoints will be updated by the next load_system_store(),
//once core's in-memory system store is updated.
if (load_source && (this.source !== load_source)) {
return;
}

// serializing load requests since we have to run a fresh load after the previous one will finish
// because it might not see the latest changes if we don't reload right after make_changes.
return this._load_serial.surround(async () => {
try {
dbg.log3('SystemStore: loading ... this.last_update_time =', this.last_update_time, ", since =", since);

const new_data = new SystemStoreData();
dbg.log3('SystemStore: loading ... this.last_update_time =', this.last_update_time, ", since =", since, "load_source =", load_source);

// If we get a load request with a timestamp older than our last update time
// we ensure we load everything from that timestamp by updating our last_update_time.
@@ -423,9 +440,25 @@
this.last_update_time = since;
}
this.master_key_manager.load_root_key();
const new_data = new SystemStoreData();
let millistamp = time_utils.millistamp();
await this._register_for_changes();
await this._read_new_data_from_db(new_data);
let from_core_failure = false;

if (this.source === SOURCE.CORE) {
try {
this.data = new SystemStoreData();
await this._read_new_data_from_core(this.data);
} catch (e) {
dbg.error("Failed to load system store from core. Will load from db.", e);
from_core_failure = true;
}
}

if (this.source === SOURCE.DB || from_core_failure) {
await this._read_new_data_from_db(new_data);
}

const secret = await os_utils.read_server_secret();
this._server_secret = secret;
if (dbg.should_log(1)) { //param should match below logs' level
@@ -435,8 +468,10 @@
depth: 4
}));
}
this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data);
this.data = _.cloneDeep(this.old_db_data);
if (this.source === SOURCE.DB || from_core_failure) {
this.old_db_data = this._update_data_from_new(this.old_db_data || {}, new_data);
this.data = _.cloneDeep(this.old_db_data);
}
millistamp = time_utils.millistamp();
this.data.rebuild();
dbg.log1('SystemStore: rebuild took', time_utils.millitook(millistamp));
@@ -458,6 +493,14 @@
});
}

//return the latest copy of in-memory data
async recent_db_data() {
if (this.source === SOURCE.CORE) {
throw new RpcError('BAD_REQUEST', 'recent_db_data is not available for CORE source');
}
return this._load_serial.surround(async () => this.old_db_data);
}

_update_data_from_new(data, new_data) {
COLLECTIONS.forEach(col => {
const old_items = data[col.name];
@@ -523,6 +566,28 @@ class SystemStore extends EventEmitter {
this.last_update_time = now;
}

async _read_new_data_from_core(target) {
dbg.log3("_read_new_data_from_core begins");
const res = await server_rpc.client.system.get_system_store();
const ss = JSON.parse(res[RPC_BUFFERS].data.toString());
dbg.log3("_read_new_data_from_core new system store", ss);
for (const key of Object.keys(ss)) {
const collection = COLLECTIONS_BY_NAME[key];
if (collection) {
target[key] = [];
_.each(ss[key], item => {
//these two lines will transform string values into appropriately typed objects
//(SensitiveString, ObjectId) according to schema
const after = decode_json(collection.schema, item);
db_client.instance().validate(key, after);
target[key].push(after);
});
} else {
target[key] = ss[key];
}
}
}
Comment on lines +569 to +589
💡 Verification agent

❓ Verification inconclusive

Monitor RPC data transfer and implement operational safeguards

Since endpoints will now fetch system store data via RPC from core, consider implementing:

  1. Metrics to monitor the frequency and size of these RPC calls
  2. Rate limiting to prevent endpoints from overwhelming the core
  3. Circuit breaker pattern for RPC failures
  4. Alerts for unusually large data transfers

Let me verify if there are any existing rate limiting or monitoring mechanisms:


🏁 Script executed:

#!/bin/bash
# Search for rate limiting or monitoring in RPC client code
rg -A 5 "rate.*limit|throttle|circuit.*breaker" --glob "src/rpc/**/*.js"

# Check if there are any metrics collection for RPC calls
rg -A 5 "metrics|monitor|stats.*rpc" --glob "src/rpc/**/*.js"

Length of output: 138


Implement RPC Monitoring, Rate Limiting, and Resilience Patterns

No existing rate-limit, metrics, or circuit-breaker logic was found around your RPC client (verification via rg returned no matches), so let’s add operational safeguards:

• In src/server/system_services/system_store.js (around the _read_new_data_from_core call at lines 545–565):
– Instrument a counter and histogram (e.g. Prometheus) for calls to server_rpc.client.system.get_system_store() (call count, payload size, latency).
– Emit alerts if payload size or latency exceeds thresholds.

• In your RPC client layer (e.g. src/rpc/client/*.js):
– Implement rate limiting or token bucket to cap calls per second.
– Wrap the get_system_store invocation in a circuit-breaker (e.g. with a library like opossum) to fail fast on repeated errors.

• Add configuration knobs for thresholds and integrate with your observability stack (alerts, dashboards).

These changes will help you detect abnormal patterns, protect the core from overload, and degrade gracefully on failures.
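
A minimal sketch of what such a guard could look like, assuming the opossum and prom-client packages are available; the factory name, metric names, and thresholds below are illustrative assumptions, not code from this PR:

const { RPC_BUFFERS } = require('../../rpc');
const client = require('prom-client');
const CircuitBreaker = require('opossum');

// Illustrative metrics: call count and payload size of core fetches.
const fetch_count = new client.Counter({
    name: 'system_store_core_fetch_total',
    help: 'Calls to system.get_system_store over RPC',
});
const fetch_bytes = new client.Histogram({
    name: 'system_store_core_fetch_bytes',
    help: 'Payload size of get_system_store responses',
    buckets: [1e4, 1e5, 1e6, 1e7],
});

// Hypothetical factory wrapping the RPC call in a circuit breaker.
function make_guarded_get_system_store(server_rpc, dbg) {
    const breaker = new CircuitBreaker(async () => {
        fetch_count.inc();
        const res = await server_rpc.client.system.get_system_store();
        fetch_bytes.observe(res[RPC_BUFFERS].data.length);
        return res;
    }, {
        timeout: 30000,               // fail calls that core does not answer within 30s
        errorThresholdPercentage: 50, // open once half the recent calls fail
        resetTimeout: 60000,          // probe core again after one minute
    });
    breaker.on('open', () => dbg.error('get_system_store circuit opened, endpoints should fall back to DB loads'));
    return () => breaker.fire();
}

A rejected fire() could then be handled exactly like from_core_failure in load(), so endpoints degrade to a DB load instead of retrying an unhealthy core.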


⚠️ Potential issue

Add comprehensive error handling and improve validation

The method needs error handling for RPC calls and JSON parsing. Also, validation should happen before data transformation.

 async _read_new_data_from_core(target) {
     dbg.log3("_read_new_data_from_core begins");
+    try {
         const res = await server_rpc.client.system.get_system_store();
+        if (!res || !res[RPC_BUFFERS] || !res[RPC_BUFFERS].data) {
+            throw new Error('Invalid response from core system store');
+        }
         const ss = JSON.parse(res[RPC_BUFFERS].data.toString());
         dbg.log3("_read_new_data_from_core new system store", ss);
         for (const key of Object.keys(ss)) {
             const collection = COLLECTIONS_BY_NAME[key];
             if (collection) {
                 target[key] = [];
                 _.each(ss[key], item => {
+                    // Validate before transformation
+                    db_client.instance().validate(key, item, 'warn');
                     //these two lines will transform string values into appropriately typed objects
                     //(SensitiveString, ObjectId) according to schema
                     const after = decode_json(collection.schema, item);
-                    db_client.instance().validate(key, after);
+                    // Validate again after transformation to ensure correctness
+                    db_client.instance().validate(key, after, 'error');
                     target[key].push(after);
                 });
             } else {
                 target[key] = ss[key];
             }
         }
+    } catch (err) {
+        dbg.error('_read_new_data_from_core: Failed to read data from core', err);
+        throw new RpcError('INTERNAL_ERROR', `Failed to load system store from core: ${err.message}`);
+    }
 }


_check_schema(col, item, warn) {
return db_client.instance().validate(col.name, item, warn);
}
@@ -616,12 +681,25 @@
if (this.is_standalone) {
await this.load(last_update);
} else if (publish) {
dbg.log2("first phase publish");
// notify all the cluster (including myself) to reload
await server_rpc.client.redirector.publish_to_cluster({
method_api: 'server_inter_process_api',
method_name: 'load_system_store',
target: '',
request_params: { since: last_update }
request_params: { since: last_update, load_source: SOURCE.DB }
});

//if endpoints are loading the system store from core, we need to wait until
//the publish_to_cluster() above updates core's in-memory db.
//the next publish_to_cluster() will make endpoints load the updated
//system store from core
dbg.log2("second phase publish");
await server_rpc.client.redirector.publish_to_cluster({
method_api: 'server_inter_process_api',
method_name: 'load_system_store',
target: '',
request_params: { since: last_update, load_source: SOURCE.CORE }
});
}
}
@@ -851,3 +929,4 @@ SystemStore._instance = undefined;
// EXPORTS
exports.SystemStore = SystemStore;
exports.get_instance = SystemStore.get_instance;
exports.SOURCE = SOURCE;
15 changes: 15 additions & 0 deletions src/test/integration_tests/db/test_system_store.js
@@ -5,6 +5,7 @@ const mocha = require('mocha');
const assert = require('assert');
const coretest = require('../../utils/coretest/coretest');
const db_client = require('../../../util/db_client');
const { SystemStore, SOURCE } = require('../../../server/system_services/system_store');

// setup coretest first to prepare the env
coretest.setup();
@@ -138,4 +139,18 @@ mocha.describe('system_store', function() {
assert.strictEqual(data2.systems[0].name, 'new_name');
});

mocha.it("Load from core", async function() {

const system_store_from_core = new SystemStore({
source: SOURCE.CORE,
skip_define_for_tests: true
});

const from_db = await system_store.load();
const from_core = await system_store_from_core.load(undefined, SOURCE.CORE);

assert.deepStrictEqual(from_db.data, from_core.data);

});

});
1 change: 1 addition & 0 deletions src/util/postgres_client.js
@@ -1951,3 +1951,4 @@ PostgresClient._instance = undefined;
// EXPORTS
exports.PostgresClient = PostgresClient;
exports.instance = PostgresClient.instance;
exports.decode_json = decode_json;