Skip to content

Commit 5cc71b2

Browse files
committed
system store - load from core - two steps publish
Signed-off-by: Amit Prinz Setter <[email protected]>
1 parent d25b8d5 commit 5cc71b2

File tree

3 files changed

+33
-4
lines changed

3 files changed

+33
-4
lines changed

src/api/server_inter_process_api.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ module.exports = {
1717
params: {
1818
type: 'object',
1919
properties: {
20-
since: { idate: true }
20+
since: { idate: true },
21+
load_from_core_step: {
22+
type: 'string'
23+
}
2124
}
2225
},
2326
auth: {

src/server/common_services/server_inter_process.js

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,20 @@ const dbg = require('../../util/debug_module')(__filename);
1313
const system_store = require('../system_services/system_store').get_instance();
1414
const server_rpc = require('../server_rpc');
1515

16-
1716
/**
1817
*
1918
*/
2019
async function load_system_store(req) {
20+
//if endpoints load from core, and this load is for core
21+
//(ie, the first load_system_store() out of two),
22+
//then endpoints skip it.
23+
//endpoints will be updated in the next load_system_store()
24+
//once core's in memory system store is updated.
25+
const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") !== -1;
26+
if (is_endpoint && req?.rpc_params?.load_from_core_step === 'core') {
27+
return;
28+
}
29+
2130
await system_store.load(
2231
req && req.rpc_params && req.rpc_params.since
2332
);

src/server/system_services/system_store.js

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,9 @@ class SystemStore extends EventEmitter {
357357
this.START_REFRESH_THRESHOLD = 10 * 60 * 1000;
358358
this.FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000;
359359
this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5;
360-
this.source = config.SYSTEM_STORE_SOURCE.toLocaleLowerCase() === 'core' ? SOURCE.CORE : SOURCE.DB;
360+
//load from core if enabled and this is an endpoint
361+
const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") !== -1;
362+
this.source = (config.SYSTEM_STORE_SOURCE.toLowerCase() === 'core' && is_endpoint) ? SOURCE.CORE : SOURCE.DB;
361363
dbg.log0("system store source is", this.source);
362364
this._load_serial = new semaphore.Semaphore(1, { warning_timeout: this.START_REFRESH_THRESHOLD });
363365
for (const col of COLLECTIONS) {
@@ -666,13 +668,28 @@ class SystemStore extends EventEmitter {
666668
if (this.is_standalone) {
667669
await this.load(last_update);
668670
} else if (publish) {
671+
dbg.log2("first phase publish");
669672
// notify all the cluster (including myself) to reload
670673
await server_rpc.client.redirector.publish_to_cluster({
671674
method_api: 'server_inter_process_api',
672675
method_name: 'load_system_store',
673676
target: '',
674-
request_params: { since: last_update }
677+
request_params: { since: last_update, load_from_core_step: 'core' }
675678
});
679+
680+
//if endpoints are loading system store from core, we need to wait until
681+
//above publish_to_cluster() will update core's in-memory db.
682+
//the next publist_to_cluster() will make endpoints load the updated
683+
//system store from core
684+
if (config.SYSTEM_STORE_SOURCE.toLowerCase() === 'core') {
685+
dbg.log2("second phase publish");
686+
await server_rpc.client.redirector.publish_to_cluster({
687+
method_api: 'server_inter_process_api',
688+
method_name: 'load_system_store',
689+
target: '',
690+
request_params: { since: last_update, load_from_core_step: 'endpoint' }
691+
});
692+
}
676693
}
677694
}
678695
}

0 commit comments

Comments
 (0)