1515
1616import java .util .concurrent .atomic .AtomicInteger ;
1717import java .util .concurrent .atomic .AtomicLong ;
18- import java .util .function .Consumer ;
1918
2019public class RotatingStoreVerticle extends AbstractVerticle {
2120 private static final Logger LOGGER = LoggerFactory .getLogger (RotatingStoreVerticle .class );
@@ -32,7 +31,7 @@ public class RotatingStoreVerticle extends AbstractVerticle {
3231 private final AtomicLong latestVersion = new AtomicLong (-1L );
3332 private final AtomicLong latestEntryCount = new AtomicLong (-1L );
3433 private final AtomicInteger storeRefreshIsFailing = new AtomicInteger (0 );
35- private final Consumer < Boolean > refreshCallback ;
34+ private final Runnable refreshCallback ;
3635
3736 private final long refreshIntervalMs ;
3837
@@ -41,7 +40,7 @@ public RotatingStoreVerticle(String storeName, long refreshIntervalMs, IMetadata
4140 }
4241
4342 public RotatingStoreVerticle (String storeName , long refreshIntervalMs , IMetadataVersionedStore versionedStore ,
44- Consumer < Boolean > refreshCallback ) {
43+ Runnable refreshCallback ) {
4544 this .healthComponent = HealthManager .instance .registerComponent (storeName + "-rotator" );
4645 this .healthComponent .setHealthStatus (false , "not started" );
4746
@@ -103,6 +102,9 @@ private void startRefresh(Promise<Void> promise) {
103102 promise .complete ();
104103 storeRefreshTimer .record (java .time .Duration .ofMillis (startupRefreshTimeMs ));
105104 LOGGER .info ("Successful " + this .storeName + " loading. Starting Background Refresh" );
105+ if (this .refreshCallback != null ) {
106+ this .refreshCallback .run ();
107+ }
106108 this .startBackgroundRefresh ();
107109 } else {
108110 this .healthComponent .setHealthStatus (false , ar .cause ().getMessage ());
@@ -127,15 +129,12 @@ private void startBackgroundRefresh() {
127129 this .counterStoreRefreshFailures .increment ();
128130 this .storeRefreshIsFailing .set (1 );
129131 LOGGER .error ("Failed to load " + this .storeName + ", " + elapsed + " ms" , asyncResult .cause ());
130- if (this .refreshCallback != null ) {
131- this .refreshCallback .accept (false );
132- }
133132 } else {
134133 this .counterStoreRefreshed .increment ();
135134 this .storeRefreshIsFailing .set (0 );
136135 LOGGER .trace ("Successfully refreshed " + this .storeName + ", " + elapsed + " ms" );
137136 if (this .refreshCallback != null ) {
138- this .refreshCallback .accept ( true );
137+ this .refreshCallback .run ( );
139138 }
140139 }
141140 }
0 commit comments