@@ -341,10 +341,6 @@ class SystemStore extends EventEmitter {
341
341
*/
342
342
static get_instance ( options = { } ) {
343
343
const { standalone } = options ;
344
- //load from core if enabled and this is an endpoint
345
- const is_endpoint = process . env . HOSTNAME && process . env . HOSTNAME . indexOf ( "endpoint" ) !== - 1 ;
346
- this . source = options . source ||
347
- ( ( config . SYSTEM_STORE_SOURCE . toUpperCase ( ) === 'CORE' && is_endpoint ) ? SOURCE . CORE : SOURCE . DB ) ;
348
344
SystemStore . _instance = SystemStore . _instance || new SystemStore ( { standalone } ) ;
349
345
return SystemStore . _instance ;
350
346
}
@@ -361,18 +357,13 @@ class SystemStore extends EventEmitter {
361
357
this . START_REFRESH_THRESHOLD = 10 * 60 * 1000 ;
362
358
this . FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000 ;
363
359
this . SYSTEM_STORE_LOAD_CONCURRENCY = config . SYSTEM_STORE_LOAD_CONCURRENCY || 5 ;
364
- this . source = options . source || SOURCE . DB ;
360
+ this . source = options . source || config . SYSTEM_STORE_SOURCE ;
361
+ this . source = this . source . toUpperCase ( ) ;
365
362
dbg . log0 ( "system store source is" , this . source ) ;
366
363
this . _load_serial = new semaphore . Semaphore ( 1 , { warning_timeout : this . START_REFRESH_THRESHOLD } ) ;
367
- for ( const col of COLLECTIONS ) {
368
- try {
364
+ if ( options . skip_define_for_tests !== true ) {
365
+ for ( const col of COLLECTIONS ) {
369
366
db_client . instance ( ) . define_collection ( col ) ;
370
- } catch ( e ) {
371
- if ( e . message ?. indexOf ( "already defined" ) > - 1 ) {
372
- dbg . warn ( "Ignoring already defined error" ) ;
373
- } else {
374
- throw e ;
375
- }
376
367
}
377
368
}
378
369
js_utils . deep_freeze ( COLLECTIONS ) ;
@@ -432,7 +423,7 @@ class SystemStore extends EventEmitter {
432
423
//then endpoints skip it.
433
424
//endpoints will be updated in the next load_system_store()
434
425
//once core's in memory system store is updated.
435
- if ( this . source . toUpperCase ( ) === 'CORE' && load_from_core_step && load_from_core_step . toUpperCase ( ) === 'CORE' ) {
426
+ if ( load_from_core_step && ( this . source !== load_from_core_step ) ) {
436
427
return ;
437
428
}
438
429
@@ -504,6 +495,9 @@ class SystemStore extends EventEmitter {
504
495
505
496
//return the latest copy of in-memory data
506
497
async recent_db_data ( ) {
498
+ if ( this . source === SOURCE . CORE ) {
499
+ throw new RpcError ( 'BAD_REQUEST' , 'recent_db_data is not available for CORE source' ) ;
500
+ }
507
501
return this . _load_serial . surround ( async ( ) => this . old_db_data ) ;
508
502
}
509
503
@@ -693,22 +687,20 @@ class SystemStore extends EventEmitter {
693
687
method_api : 'server_inter_process_api' ,
694
688
method_name : 'load_system_store' ,
695
689
target : '' ,
696
- request_params : { since : last_update , load_from_core_step : 'CORE' }
690
+ request_params : { since : last_update , load_from_core_step : SOURCE . DB }
697
691
} ) ;
698
692
699
693
//if endpoints are loading system store from core, we need to wait until
700
694
//above publish_to_cluster() will update core's in-memory db.
701
695
//the next publist_to_cluster() will make endpoints load the updated
702
696
//system store from core
703
- if ( config . SYSTEM_STORE_SOURCE . toUpperCase ( ) === 'CORE' ) {
704
- dbg . log2 ( "second phase publish" ) ;
705
- await server_rpc . client . redirector . publish_to_cluster ( {
706
- method_api : 'server_inter_process_api' ,
707
- method_name : 'load_system_store' ,
708
- target : '' ,
709
- request_params : { since : last_update , load_from_core_step : 'ENDPOINT' }
710
- } ) ;
711
- }
697
+ dbg . log2 ( "second phase publish" ) ;
698
+ await server_rpc . client . redirector . publish_to_cluster ( {
699
+ method_api : 'server_inter_process_api' ,
700
+ method_name : 'load_system_store' ,
701
+ target : '' ,
702
+ request_params : { since : last_update , load_from_core_step : SOURCE . CORE }
703
+ } ) ;
712
704
}
713
705
}
714
706
}
0 commit comments