1515
1616import java .util .concurrent .atomic .AtomicInteger ;
1717import java .util .concurrent .atomic .AtomicLong ;
18+ import java .util .function .Consumer ;
1819
1920public class RotatingStoreVerticle extends AbstractVerticle {
2021 private static final Logger LOGGER = LoggerFactory .getLogger (RotatingStoreVerticle .class );
@@ -31,47 +32,54 @@ public class RotatingStoreVerticle extends AbstractVerticle {
3132 private final AtomicLong latestVersion = new AtomicLong (-1L );
3233 private final AtomicLong latestEntryCount = new AtomicLong (-1L );
3334 private final AtomicInteger storeRefreshIsFailing = new AtomicInteger (0 );
35+ private final Consumer <Boolean > refreshCallback ;
3436
3537 private final long refreshIntervalMs ;
3638
3739 public RotatingStoreVerticle (String storeName , long refreshIntervalMs , IMetadataVersionedStore versionedStore ) {
40+ this (storeName , refreshIntervalMs , versionedStore , null );
41+ }
42+
43+ public RotatingStoreVerticle (String storeName , long refreshIntervalMs , IMetadataVersionedStore versionedStore ,
44+ Consumer <Boolean > refreshCallback ) {
3845 this .healthComponent = HealthManager .instance .registerComponent (storeName + "-rotator" );
3946 this .healthComponent .setHealthStatus (false , "not started" );
4047
4148 this .storeName = storeName ;
4249 this .counterStoreRefreshed = Counter
43- .builder ("uid2_config_store_refreshed_total" )
44- .tag ("store" , storeName )
45- .description ("counter for how many times " + storeName + " store is refreshed" )
46- .register (Metrics .globalRegistry );
50+ .builder ("uid2_config_store_refreshed_total" )
51+ .tag ("store" , storeName )
52+ .description ("counter for how many times " + storeName + " store is refreshed" )
53+ .register (Metrics .globalRegistry );
4754 this .counterStoreRefreshTimeMs = Counter
48- .builder ("uid2_config_store_refreshtime_ms_total" )
49- .tag ("store" , storeName )
50- .description ("counter for total time (ms) " + storeName + " store spend in refreshing" )
51- .register (Metrics .globalRegistry );
55+ .builder ("uid2_config_store_refreshtime_ms_total" )
56+ .tag ("store" , storeName )
57+ .description ("counter for total time (ms) " + storeName + " store spend in refreshing" )
58+ .register (Metrics .globalRegistry );
5259 this .counterStoreRefreshFailures = Counter
53- .builder ("uid2_config_store_refresh_failures_total" )
54- .tag ("store" , storeName )
55- .description ("counter for number of " + storeName + " store refresh failures" )
56- .register (Metrics .globalRegistry );
60+ .builder ("uid2_config_store_refresh_failures_total" )
61+ .tag ("store" , storeName )
62+ .description ("counter for number of " + storeName + " store refresh failures" )
63+ .register (Metrics .globalRegistry );
5764 this .gaugeStoreVersion = Gauge
58- .builder ("uid2_config_store_version" , () -> this .latestVersion .get ())
59- .tag ("store" , storeName )
60- .description ("gauge for " + storeName + " store version" )
61- .register (Metrics .globalRegistry );
65+ .builder ("uid2_config_store_version" , () -> this .latestVersion .get ())
66+ .tag ("store" , storeName )
67+ .description ("gauge for " + storeName + " store version" )
68+ .register (Metrics .globalRegistry );
6269 this .gaugeStoreEntryCount = Gauge
63- .builder ("uid2_config_store_entry_count" , () -> this .latestEntryCount .get ())
64- .tag ("store" , storeName )
65- .description ("gauge for " + storeName + " store total entry count" )
66- .register (Metrics .globalRegistry );
70+ .builder ("uid2_config_store_entry_count" , () -> this .latestEntryCount .get ())
71+ .tag ("store" , storeName )
72+ .description ("gauge for " + storeName + " store total entry count" )
73+ .register (Metrics .globalRegistry );
6774 this .gaugeConsecutiveRefreshFailures = Gauge
68- .builder ("uid2_config_store_consecutive_refresh_failures" , () -> this .storeRefreshIsFailing .get ())
69- .tag ("store" , storeName )
70- .description ("gauge for number of consecutive " + storeName + " store refresh failures" )
71- .register (Metrics .globalRegistry );
75+ .builder ("uid2_config_store_consecutive_refresh_failures" , () -> this .storeRefreshIsFailing .get ())
76+ .tag ("store" , storeName )
77+ .description ("gauge for number of consecutive " + storeName + " store refresh failures" )
78+ .register (Metrics .globalRegistry );
7279 this .versionedStore = versionedStore ;
7380 this .refreshIntervalMs = refreshIntervalMs ;
7481 this .storeRefreshTimer = Metrics .timer ("uid2_store_refresh_duration" , "store_name" , storeName );
82+ this .refreshCallback = refreshCallback ;
7583 }
7684
7785 @ Override
@@ -98,7 +106,8 @@ private void startRefresh(Promise<Void> promise) {
98106 this .startBackgroundRefresh ();
99107 } else {
100108 this .healthComponent .setHealthStatus (false , ar .cause ().getMessage ());
101- LOGGER .error ("Failed " + this .storeName + " loading. Trying again in " + refreshIntervalMs + "ms" , ar .cause ());
109+ LOGGER .error ("Failed " + this .storeName + " loading. Trying again in " + refreshIntervalMs + "ms" ,
110+ ar .cause ());
102111 vertx .setTimer (refreshIntervalMs , id -> this .startRefresh (promise ));
103112 }
104113 });
@@ -109,23 +118,28 @@ private void startBackgroundRefresh() {
109118 final long start = System .nanoTime ();
110119
111120 vertx .executeBlocking (() -> {
112- this .refresh ();
113- return null ;
114- }).onComplete (asyncResult -> {
115- final long end = System .nanoTime ();
116- final long elapsed = ((end - start ) / 1000000 );
117- this .counterStoreRefreshTimeMs .increment (elapsed );
118- if (asyncResult .failed ()) {
119- this .counterStoreRefreshFailures .increment ();
120- this .storeRefreshIsFailing .set (1 );
121- LOGGER .error ("Failed to load " + this .storeName + ", " + elapsed + " ms" , asyncResult .cause ());
122- } else {
123- this .counterStoreRefreshed .increment ();
124- this .storeRefreshIsFailing .set (0 );
125- LOGGER .trace ("Successfully refreshed " + this .storeName + ", " + elapsed + " ms" );
121+ this .refresh ();
122+ return null ;
123+ }).onComplete (asyncResult -> {
124+ final long end = System .nanoTime ();
125+ final long elapsed = ((end - start ) / 1000000 );
126+ this .counterStoreRefreshTimeMs .increment (elapsed );
127+ if (asyncResult .failed ()) {
128+ this .counterStoreRefreshFailures .increment ();
129+ this .storeRefreshIsFailing .set (1 );
130+ LOGGER .error ("Failed to load " + this .storeName + ", " + elapsed + " ms" , asyncResult .cause ());
131+ if (this .refreshCallback != null ) {
132+ this .refreshCallback .accept (false );
133+ }
134+ } else {
135+ this .counterStoreRefreshed .increment ();
136+ this .storeRefreshIsFailing .set (0 );
137+ LOGGER .trace ("Successfully refreshed " + this .storeName + ", " + elapsed + " ms" );
138+ if (this .refreshCallback != null ) {
139+ this .refreshCallback .accept (true );
126140 }
127141 }
128- );
142+ } );
129143 });
130144 }
131145
0 commit comments