@@ -38,8 +38,9 @@ const size_utils = require('../../util/size_utils');
38
38
const os_utils = require ( '../../util/os_utils' ) ;
39
39
const config = require ( '../../../config' ) ;
40
40
const db_client = require ( '../../util/db_client' ) ;
41
+ const { decode_json } = require ( '../../util/postgres_client' ) ;
41
42
42
- const { RpcError } = require ( '../../rpc' ) ;
43
+ const { RpcError, RPC_BUFFERS } = require ( '../../rpc' ) ;
43
44
const master_key_manager = require ( './master_key_manager' ) ;
44
45
45
46
const COLLECTIONS = [ {
@@ -152,6 +153,10 @@ const COLLECTIONS_BY_NAME = _.keyBy(COLLECTIONS, 'name');
152
153
153
154
const accounts_by_email_lowercase = [ ] ;
154
155
156
+ const SOURCE = Object . freeze ( {
157
+ DB : 'DB' ,
158
+ CORE : 'CORE' ,
159
+ } ) ;
155
160
156
161
/**
157
162
*
@@ -352,6 +357,8 @@ class SystemStore extends EventEmitter {
352
357
this . START_REFRESH_THRESHOLD = 10 * 60 * 1000 ;
353
358
this . FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000 ;
354
359
this . SYSTEM_STORE_LOAD_CONCURRENCY = config . SYSTEM_STORE_LOAD_CONCURRENCY || 5 ;
360
+ this . source = ( process . env . HOSTNAME && process . env . HOSTNAME . indexOf ( "endpoint" ) === - 1 ) ? SOURCE . DB : SOURCE . CORE ;
361
+ dbg . log0 ( "system store source is" , this . source ) ;
355
362
this . _load_serial = new semaphore . Semaphore ( 1 , { warning_timeout : this . START_REFRESH_THRESHOLD } ) ;
356
363
for ( const col of COLLECTIONS ) {
357
364
db_client . instance ( ) . define_collection ( col ) ;
@@ -414,18 +421,23 @@ class SystemStore extends EventEmitter {
414
421
try {
415
422
dbg . log3 ( 'SystemStore: loading ... this.last_update_time =' , this . last_update_time , ", since =" , since ) ;
416
423
417
- const new_data = new SystemStoreData ( ) ;
418
-
419
424
// If we get a load request with an timestamp older then our last update time
420
425
// we ensure we load everyting from that timestamp by updating our last_update_time.
421
426
if ( ! _ . isUndefined ( since ) && since < this . last_update_time ) {
422
427
dbg . log0 ( 'SystemStore.load: Got load request with a timestamp' , since , 'older than my last update time' , this . last_update_time ) ;
423
428
this . last_update_time = since ;
424
429
}
425
430
this . master_key_manager . load_root_key ( ) ;
431
+ const new_data = new SystemStoreData ( ) ;
426
432
let millistamp = time_utils . millistamp ( ) ;
427
433
await this . _register_for_changes ( ) ;
428
- await this . _read_new_data_from_db ( new_data ) ;
434
+ if ( this . source === SOURCE . DB ) {
435
+ await this . _read_new_data_from_db ( new_data ) ;
436
+ } else {
437
+ this . data = new SystemStoreData ( ) ;
438
+ await this . _read_new_data_from_core ( this . data ) ;
439
+ }
440
+
429
441
const secret = await os_utils . read_server_secret ( ) ;
430
442
this . _server_secret = secret ;
431
443
if ( dbg . should_log ( 1 ) ) { //param should match below logs' level
@@ -435,8 +447,10 @@ class SystemStore extends EventEmitter {
435
447
depth : 4
436
448
} ) ) ;
437
449
}
438
- this . old_db_data = this . _update_data_from_new ( this . old_db_data || { } , new_data ) ;
439
- this . data = _ . cloneDeep ( this . old_db_data ) ;
450
+ if ( this . source === SOURCE . DB ) {
451
+ this . old_db_data = this . _update_data_from_new ( this . old_db_data || { } , new_data ) ;
452
+ this . data = _ . cloneDeep ( this . old_db_data ) ;
453
+ }
440
454
millistamp = time_utils . millistamp ( ) ;
441
455
this . data . rebuild ( ) ;
442
456
dbg . log1 ( 'SystemStore: rebuild took' , time_utils . millitook ( millistamp ) ) ;
@@ -458,6 +472,11 @@ class SystemStore extends EventEmitter {
458
472
} ) ;
459
473
}
460
474
475
+ //return the latest copy of in-memory data
476
+ async recent_db_data ( ) {
477
+ return this . _load_serial . surround ( async ( ) => this . old_db_data ) ;
478
+ }
479
+
461
480
_update_data_from_new ( data , new_data ) {
462
481
COLLECTIONS . forEach ( col => {
463
482
const old_items = data [ col . name ] ;
@@ -523,6 +542,28 @@ class SystemStore extends EventEmitter {
523
542
this . last_update_time = now ;
524
543
}
525
544
545
+ async _read_new_data_from_core ( target ) {
546
+ dbg . log3 ( "_read_new_data_from_core begins" ) ;
547
+ const res = await server_rpc . client . system . get_system_store ( ) ;
548
+ const ss = JSON . parse ( res [ RPC_BUFFERS ] . data . toString ( ) ) ;
549
+ dbg . log3 ( "_read_new_data_from_core new system store" , ss ) ;
550
+ for ( const key of Object . keys ( ss ) ) {
551
+ const collection = COLLECTIONS_BY_NAME [ key ] ;
552
+ if ( collection ) {
553
+ target [ key ] = [ ] ;
554
+ _ . each ( ss [ key ] , item => {
555
+ //these two lines will transform string values into appropriately typed objects
556
+ //(SensitiveString, ObjectId) according to schema
557
+ const after = decode_json ( collection . schema , item ) ;
558
+ db_client . instance ( ) . validate ( key , after ) ;
559
+ target [ key ] . push ( after ) ;
560
+ } ) ;
561
+ } else {
562
+ target [ key ] = ss [ key ] ;
563
+ }
564
+ }
565
+ }
566
+
526
567
_check_schema ( col , item , warn ) {
527
568
return db_client . instance ( ) . validate ( col . name , item , warn ) ;
528
569
}
@@ -851,3 +892,4 @@ SystemStore._instance = undefined;
851
892
// EXPORTS
852
893
exports . SystemStore = SystemStore ;
853
894
exports . get_instance = SystemStore . get_instance ;
895
+ exports . SOURCE = SOURCE ;
0 commit comments