@@ -298,10 +298,6 @@ public synchronized static AccordService startup(NodeId tcmId)
298
298
299
299
AccordService as = new AccordService (AccordTopology .tcmIdToAccord (tcmId ));
300
300
as .startup ();
301
- instance = as ;
302
-
303
- AccordReplicaMetrics .touch ();
304
-
305
301
replayJournal (as );
306
302
307
303
as .finishInitialization ();
@@ -320,6 +316,8 @@ public synchronized static AccordService startup(NodeId tcmId)
320
316
as .configService .registerListener (as .node .durability ());
321
317
as .node .durability ().start ();
322
318
319
+ instance = as ;
320
+ AccordReplicaMetrics .touch ();
323
321
WatermarkCollector .fetchAndReportWatermarksAsync (as .configService );
324
322
return as ;
325
323
}
@@ -456,6 +454,27 @@ public synchronized void startup()
456
454
// Replay local epochs
457
455
for (ImmutableTopoloyImage image : images )
458
456
configService .reportTopology (image .global );
457
+
458
+ // Subscribe to TCM events
459
+ ChangeListener prevListener = MetadataChangeListener .instance .collector .getAndSet (new ChangeListener ()
460
+ {
461
+ @ Override
462
+ public void notifyPostCommit (ClusterMetadata prev , ClusterMetadata next , boolean fromSnapshot )
463
+ {
464
+ if (state != State .SHUTDOWN )
465
+ configService .maybeReportMetadata (next );
466
+ }
467
+ });
468
+
469
+ Invariants .require ((prevListener instanceof MetadataChangeListener .PreInitStateCollector ),
470
+ "Listener should have been initialized with Accord pre-init state collector, but was " + prevListener .getClass ());
471
+
472
+ MetadataChangeListener .PreInitStateCollector preinit = (MetadataChangeListener .PreInitStateCollector ) prevListener ;
473
+ for (ClusterMetadata item : preinit .getItems ())
474
+ {
475
+ if (item .epoch .getEpoch () > Epoch .FIRST .getEpoch ())
476
+ configService .maybeReportMetadata (item );
477
+ }
459
478
}
460
479
461
480
/**
@@ -476,27 +495,6 @@ public void finishInitialization()
476
495
477
496
if (remote != null )
478
497
remote .forEach (configService ::reportTopology , highestKnown + 1 , Integer .MAX_VALUE );
479
-
480
- // Subscribe to TCM events
481
- ChangeListener prevListener = MetadataChangeListener .instance .collector .getAndSet (new ChangeListener ()
482
- {
483
- @ Override
484
- public void notifyPostCommit (ClusterMetadata prev , ClusterMetadata next , boolean fromSnapshot )
485
- {
486
- if (state != State .SHUTDOWN )
487
- configService .maybeReportMetadata (next );
488
- }
489
- });
490
-
491
- Invariants .require ((prevListener instanceof MetadataChangeListener .PreInitStateCollector ),
492
- "Listener should have been initialized with Accord pre-init state collector, but was " + prevListener .getClass ());
493
-
494
- MetadataChangeListener .PreInitStateCollector preinit = (MetadataChangeListener .PreInitStateCollector ) prevListener ;
495
- for (ClusterMetadata item : preinit .getItems ())
496
- {
497
- if (item .epoch .getEpoch () > minEpoch ())
498
- configService .maybeReportMetadata (item );
499
- }
500
498
}
501
499
catch (InterruptedException e )
502
500
{
0 commit comments