1515
1616import java .util .concurrent .atomic .AtomicInteger ;
1717import java .util .concurrent .atomic .AtomicLong ;
18+ import java .util .function .Consumer ;
1819
1920public class RotatingStoreVerticle extends AbstractVerticle {
2021 private static final Logger LOGGER = LoggerFactory .getLogger (RotatingStoreVerticle .class );
@@ -31,10 +32,16 @@ public class RotatingStoreVerticle extends AbstractVerticle {
3132 private final AtomicLong latestVersion = new AtomicLong (-1L );
3233 private final AtomicLong latestEntryCount = new AtomicLong (-1L );
3334 private final AtomicInteger storeRefreshIsFailing = new AtomicInteger (0 );
35+ private final Consumer <Boolean > refreshCallback ;
3436
3537 private final long refreshIntervalMs ;
3638
3739 public RotatingStoreVerticle (String storeName , long refreshIntervalMs , IMetadataVersionedStore versionedStore ) {
40+ this (storeName , refreshIntervalMs , versionedStore , null );
41+ }
42+
43+ public RotatingStoreVerticle (String storeName , long refreshIntervalMs , IMetadataVersionedStore versionedStore ,
44+ Consumer <Boolean > refreshCallback ) {
3845 this .healthComponent = HealthManager .instance .registerComponent (storeName + "-rotator" );
3946 this .healthComponent .setHealthStatus (false , "not started" );
4047
@@ -72,6 +79,7 @@ public RotatingStoreVerticle(String storeName, long refreshIntervalMs, IMetadata
7279 this .versionedStore = versionedStore ;
7380 this .refreshIntervalMs = refreshIntervalMs ;
7481 this .storeRefreshTimer = Metrics .timer ("uid2_store_refresh_duration" , "store_name" , storeName );
82+ this .refreshCallback = refreshCallback ;
7583 }
7684
7785 @ Override
@@ -119,10 +127,16 @@ private void startBackgroundRefresh() {
119127 this .counterStoreRefreshFailures .increment ();
120128 this .storeRefreshIsFailing .set (1 );
121129 LOGGER .error ("Failed to load " + this .storeName + ", " + elapsed + " ms" , asyncResult .cause ());
130+ if (this .refreshCallback != null ) {
131+ this .refreshCallback .accept (false );
132+ }
122133 } else {
123134 this .counterStoreRefreshed .increment ();
124135 this .storeRefreshIsFailing .set (0 );
125136 LOGGER .trace ("Successfully refreshed " + this .storeName + ", " + elapsed + " ms" );
137+ if (this .refreshCallback != null ) {
138+ this .refreshCallback .accept (true );
139+ }
126140 }
127141 }
128142 );
0 commit comments