@@ -38,8 +38,9 @@ const size_utils = require('../../util/size_utils');
38
38
const os_utils = require ( '../../util/os_utils' ) ;
39
39
const config = require ( '../../../config' ) ;
40
40
const db_client = require ( '../../util/db_client' ) ;
41
+ const { decode_json } = require ( '../../util/postgres_client' ) ;
41
42
42
- const { RpcError } = require ( '../../rpc' ) ;
43
+ const { RpcError, RPC_BUFFERS } = require ( '../../rpc' ) ;
43
44
const master_key_manager = require ( './master_key_manager' ) ;
44
45
45
46
const COLLECTIONS = [ {
@@ -152,6 +153,10 @@ const COLLECTIONS_BY_NAME = _.keyBy(COLLECTIONS, 'name');
152
153
153
154
const accounts_by_email_lowercase = [ ] ;
154
155
156
+ const SOURCE = Object . freeze ( {
157
+ DB : 'DB' ,
158
+ CORE : 'CORE' ,
159
+ } ) ;
155
160
156
161
/**
157
162
*
@@ -352,9 +357,14 @@ class SystemStore extends EventEmitter {
352
357
this . START_REFRESH_THRESHOLD = 10 * 60 * 1000 ;
353
358
this . FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000 ;
354
359
this . SYSTEM_STORE_LOAD_CONCURRENCY = config . SYSTEM_STORE_LOAD_CONCURRENCY || 5 ;
360
+ this . source = options . source || config . SYSTEM_STORE_SOURCE ;
361
+ this . source = this . source . toUpperCase ( ) ;
362
+ dbg . log0 ( "system store source is" , this . source ) ;
355
363
this . _load_serial = new semaphore . Semaphore ( 1 , { warning_timeout : this . START_REFRESH_THRESHOLD } ) ;
356
- for ( const col of COLLECTIONS ) {
357
- db_client . instance ( ) . define_collection ( col ) ;
364
+ if ( options . skip_define_for_tests !== true ) {
365
+ for ( const col of COLLECTIONS ) {
366
+ db_client . instance ( ) . define_collection ( col ) ;
367
+ }
358
368
}
359
369
js_utils . deep_freeze ( COLLECTIONS ) ;
360
370
js_utils . deep_freeze ( COLLECTIONS_BY_NAME ) ;
@@ -407,14 +417,21 @@ class SystemStore extends EventEmitter {
407
417
}
408
418
}
409
419
410
- async load ( since ) {
420
+ async load ( since , load_source ) {
421
+ //if endpoints load from core, and this load is for core
422
+ //(ie, the first load_system_store() out of two with load_source === 'CORE'),
423
+ //then endpoints skip it.
424
+ //endpoints will be updated in the next load_system_store()
425
+ //once core's in memory system store is updated.
426
+ if ( load_source && ( this . source !== load_source ) ) {
427
+ return ;
428
+ }
429
+
411
430
// serializing load requests since we have to run a fresh load after the previous one will finish
412
431
// because it might not see the latest changes if we don't reload right after make_changes.
413
432
return this . _load_serial . surround ( async ( ) => {
414
433
try {
415
- dbg . log3 ( 'SystemStore: loading ... this.last_update_time =' , this . last_update_time , ", since =" , since ) ;
416
-
417
- const new_data = new SystemStoreData ( ) ;
434
+ dbg . log3 ( 'SystemStore: loading ... this.last_update_time =' , this . last_update_time , ", since =" , since , "load_source =" , load_source ) ;
418
435
419
436
// If we get a load request with an timestamp older then our last update time
420
437
// we ensure we load everyting from that timestamp by updating our last_update_time.
@@ -423,9 +440,25 @@ class SystemStore extends EventEmitter {
423
440
this . last_update_time = since ;
424
441
}
425
442
this . master_key_manager . load_root_key ( ) ;
443
+ const new_data = new SystemStoreData ( ) ;
426
444
let millistamp = time_utils . millistamp ( ) ;
427
445
await this . _register_for_changes ( ) ;
428
- await this . _read_new_data_from_db ( new_data ) ;
446
+ let from_core_failure = false ;
447
+
448
+ if ( this . source === SOURCE . CORE ) {
449
+ try {
450
+ this . data = new SystemStoreData ( ) ;
451
+ await this . _read_new_data_from_core ( this . data ) ;
452
+ } catch ( e ) {
453
+ dbg . error ( "Failed to load system store from core. Will load from db." , e ) ;
454
+ from_core_failure = true ;
455
+ }
456
+ }
457
+
458
+ if ( this . source === SOURCE . DB || from_core_failure ) {
459
+ await this . _read_new_data_from_db ( new_data ) ;
460
+ }
461
+
429
462
const secret = await os_utils . read_server_secret ( ) ;
430
463
this . _server_secret = secret ;
431
464
if ( dbg . should_log ( 1 ) ) { //param should match below logs' level
@@ -435,8 +468,10 @@ class SystemStore extends EventEmitter {
435
468
depth : 4
436
469
} ) ) ;
437
470
}
438
- this . old_db_data = this . _update_data_from_new ( this . old_db_data || { } , new_data ) ;
439
- this . data = _ . cloneDeep ( this . old_db_data ) ;
471
+ if ( this . source === SOURCE . DB || from_core_failure ) {
472
+ this . old_db_data = this . _update_data_from_new ( this . old_db_data || { } , new_data ) ;
473
+ this . data = _ . cloneDeep ( this . old_db_data ) ;
474
+ }
440
475
millistamp = time_utils . millistamp ( ) ;
441
476
this . data . rebuild ( ) ;
442
477
dbg . log1 ( 'SystemStore: rebuild took' , time_utils . millitook ( millistamp ) ) ;
@@ -458,6 +493,14 @@ class SystemStore extends EventEmitter {
458
493
} ) ;
459
494
}
460
495
496
+ //return the latest copy of in-memory data
497
+ async recent_db_data ( ) {
498
+ if ( this . source === SOURCE . CORE ) {
499
+ throw new RpcError ( 'BAD_REQUEST' , 'recent_db_data is not available for CORE source' ) ;
500
+ }
501
+ return this . _load_serial . surround ( async ( ) => this . old_db_data ) ;
502
+ }
503
+
461
504
_update_data_from_new ( data , new_data ) {
462
505
COLLECTIONS . forEach ( col => {
463
506
const old_items = data [ col . name ] ;
@@ -523,6 +566,28 @@ class SystemStore extends EventEmitter {
523
566
this . last_update_time = now ;
524
567
}
525
568
569
+ async _read_new_data_from_core ( target ) {
570
+ dbg . log3 ( "_read_new_data_from_core begins" ) ;
571
+ const res = await server_rpc . client . system . get_system_store ( ) ;
572
+ const ss = JSON . parse ( res [ RPC_BUFFERS ] . data . toString ( ) ) ;
573
+ dbg . log3 ( "_read_new_data_from_core new system store" , ss ) ;
574
+ for ( const key of Object . keys ( ss ) ) {
575
+ const collection = COLLECTIONS_BY_NAME [ key ] ;
576
+ if ( collection ) {
577
+ target [ key ] = [ ] ;
578
+ _ . each ( ss [ key ] , item => {
579
+ //these two lines will transform string values into appropriately typed objects
580
+ //(SensitiveString, ObjectId) according to schema
581
+ const after = decode_json ( collection . schema , item ) ;
582
+ db_client . instance ( ) . validate ( key , after ) ;
583
+ target [ key ] . push ( after ) ;
584
+ } ) ;
585
+ } else {
586
+ target [ key ] = ss [ key ] ;
587
+ }
588
+ }
589
+ }
590
+
526
591
_check_schema ( col , item , warn ) {
527
592
return db_client . instance ( ) . validate ( col . name , item , warn ) ;
528
593
}
@@ -616,12 +681,25 @@ class SystemStore extends EventEmitter {
616
681
if ( this . is_standalone ) {
617
682
await this . load ( last_update ) ;
618
683
} else if ( publish ) {
684
+ dbg . log2 ( "first phase publish" ) ;
619
685
// notify all the cluster (including myself) to reload
620
686
await server_rpc . client . redirector . publish_to_cluster ( {
621
687
method_api : 'server_inter_process_api' ,
622
688
method_name : 'load_system_store' ,
623
689
target : '' ,
624
- request_params : { since : last_update }
690
+ request_params : { since : last_update , load_source : SOURCE . DB }
691
+ } ) ;
692
+
693
+ //if endpoints are loading system store from core, we need to wait until
694
+ //above publish_to_cluster() will update core's in-memory db.
695
+ //the next publist_to_cluster() will make endpoints load the updated
696
+ //system store from core
697
+ dbg . log2 ( "second phase publish" ) ;
698
+ await server_rpc . client . redirector . publish_to_cluster ( {
699
+ method_api : 'server_inter_process_api' ,
700
+ method_name : 'load_system_store' ,
701
+ target : '' ,
702
+ request_params : { since : last_update , load_source : SOURCE . CORE }
625
703
} ) ;
626
704
}
627
705
}
@@ -851,3 +929,4 @@ SystemStore._instance = undefined;
851
929
// EXPORTS
852
930
exports . SystemStore = SystemStore ;
853
931
exports . get_instance = SystemStore . get_instance ;
932
+ exports . SOURCE = SOURCE ;
0 commit comments