Skip to content

Commit 3828879

Browse files
committed
system store - load from core - two steps publish - PR notes
Signed-off-by: Amit Prinz Setter <[email protected]>
1 parent 4df1a31 commit 3828879

File tree

4 files changed

+19
-19
lines changed

4 files changed

+19
-19
lines changed

config.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ config.INTERNAL_STORAGE_POOL_NAME = 'system-internal-storage-pool';
250250
config.ALLOW_BUCKET_CREATE_ON_INTERNAL = true;
251251
config.BUCKET_AUTOCONF_TIER2_ENABLED = false;
252252
config.SYSTEM_STORE_LOAD_CONCURRENCY = parseInt(process.env.SYSTEM_STORE_LOAD_CONCURRENCY, 10) || 5;
253-
config.SYSTEM_STORE_SOURCE = process.env.SYSTEM_STORE_SOURCE || "db";
253+
config.SYSTEM_STORE_SOURCE = process.env.SYSTEM_STORE_SOURCE || "DB";
254254
//////////////////////////
255255
// MD AGGREGATOR CONFIG //
256256
//////////////////////////

src/api/server_inter_process_api.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ module.exports = {
1919
properties: {
2020
since: { idate: true },
2121
load_from_core_step: {
22-
type: 'string'
22+
type: 'string',
23+
enum: ['CORE', 'ENDPOINT']
2324
}
2425
}
2526
},

src/server/common_services/server_inter_process.js

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,8 @@ const server_rpc = require('../server_rpc');
1717
*
1818
*/
1919
async function load_system_store(req) {
20-
//if endpoints load from core, and this load is for core
21-
//(ie, the first load_system_store() out of two),
22-
//then endpoints skip it.
23-
//endpoints will be updated in the next load_system_store()
24-
//once core's in memory system store is updated.
25-
const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") !== -1;
26-
if (is_endpoint && req?.rpc_params?.load_from_core_step === 'core') {
27-
return;
28-
}
29-
3020
await system_store.load(
31-
req && req.rpc_params && req.rpc_params.since
21+
req && req.rpc_params && req.rpc_params.since, req?.rpc_params?.load_from_core_step.toUpperCase()
3222
);
3323
}
3424

src/server/system_services/system_store.js

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -357,9 +357,9 @@ class SystemStore extends EventEmitter {
357357
this.START_REFRESH_THRESHOLD = 10 * 60 * 1000;
358358
this.FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000;
359359
this.SYSTEM_STORE_LOAD_CONCURRENCY = config.SYSTEM_STORE_LOAD_CONCURRENCY || 5;
360-
//load from core if enabled and this is an endpoint
360+
//load from core is enabled and this is an endpoint
361361
const is_endpoint = process.env.HOSTNAME && process.env.HOSTNAME.indexOf("endpoint") !== -1;
362-
this.source = (config.SYSTEM_STORE_SOURCE.toLowerCase() === 'core' && is_endpoint) ? SOURCE.CORE : SOURCE.DB;
362+
this.source = (config.SYSTEM_STORE_SOURCE.toUpperCase() === 'CORE' && is_endpoint) ? SOURCE.CORE : SOURCE.DB;
363363
dbg.log0("system store source is", this.source);
364364
this._load_serial = new semaphore.Semaphore(1, { warning_timeout: this.START_REFRESH_THRESHOLD });
365365
for (const col of COLLECTIONS) {
@@ -416,7 +416,16 @@ class SystemStore extends EventEmitter {
416416
}
417417
}
418418

419-
async load(since) {
419+
async load(since, load_from_core_step) {
420+
//if endpoints load from core, and this load is for core
421+
//(ie, the first load_system_store() out of two with load_from_core_step === 'CORE'),
422+
//then endpoints skip it.
423+
//endpoints will be updated in the next load_system_store()
424+
//once core's in memory system store is updated.
425+
if (this.source.toUpperCase() === 'CORE' && load_from_core_step && load_from_core_step.toUpperCase() === 'CORE') {
426+
return;
427+
}
428+
420429
// serializing load requests since we have to run a fresh load after the previous one will finish
421430
// because it might not see the latest changes if we don't reload right after make_changes.
422431
return this._load_serial.surround(async () => {
@@ -674,20 +683,20 @@ class SystemStore extends EventEmitter {
674683
method_api: 'server_inter_process_api',
675684
method_name: 'load_system_store',
676685
target: '',
677-
request_params: { since: last_update, load_from_core_step: 'core' }
686+
request_params: { since: last_update, load_from_core_step: 'CORE' }
678687
});
679688

680689
//if endpoints are loading system store from core, we need to wait until
681690
//above publish_to_cluster() will update core's in-memory db.
682691
//the next publist_to_cluster() will make endpoints load the updated
683692
//system store from core
684-
if (config.SYSTEM_STORE_SOURCE.toLowerCase() === 'core') {
693+
if (config.SYSTEM_STORE_SOURCE.toUpperCase() === 'CORE') {
685694
dbg.log2("second phase publish");
686695
await server_rpc.client.redirector.publish_to_cluster({
687696
method_api: 'server_inter_process_api',
688697
method_name: 'load_system_store',
689698
target: '',
690-
request_params: { since: last_update, load_from_core_step: 'endpoint' }
699+
request_params: { since: last_update, load_from_core_step: 'ENDPOINT' }
691700
});
692701
}
693702
}

0 commit comments

Comments
 (0)