Skip to content

Commit b2970ad

Browse files
committed
Add callback support to RotatingStoreVerticle for keyset key fail-fast feature
- Add Consumer<Boolean> refreshCallback field - Add constructor overload to accept callback - Call callback on refresh success/failure - Maintains backward compatibility with existing constructor
1 parent 2701812 commit b2970ad

File tree

1 file changed

+55
-40
lines changed

1 file changed

+55
-40
lines changed

src/main/java/com/uid2/shared/vertx/RotatingStoreVerticle.java

Lines changed: 55 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import java.util.concurrent.atomic.AtomicInteger;
1717
import java.util.concurrent.atomic.AtomicLong;
18+
import java.util.function.Consumer;
1819

1920
public class RotatingStoreVerticle extends AbstractVerticle {
2021
private static final Logger LOGGER = LoggerFactory.getLogger(RotatingStoreVerticle.class);
@@ -31,47 +32,55 @@ public class RotatingStoreVerticle extends AbstractVerticle {
3132
private final AtomicLong latestVersion = new AtomicLong(-1L);
3233
private final AtomicLong latestEntryCount = new AtomicLong(-1L);
3334
private final AtomicInteger storeRefreshIsFailing = new AtomicInteger(0);
35+
private final Consumer<Boolean> refreshCallback;
3436

3537
private final long refreshIntervalMs;
3638

3739
public RotatingStoreVerticle(String storeName, long refreshIntervalMs, IMetadataVersionedStore versionedStore) {
40+
this(storeName, refreshIntervalMs, versionedStore, null);
41+
}
42+
43+
public RotatingStoreVerticle(String storeName, long refreshIntervalMs, IMetadataVersionedStore versionedStore,
44+
Consumer<Boolean> refreshCallback) {
3845
this.healthComponent = HealthManager.instance.registerComponent(storeName + "-rotator");
3946
this.healthComponent.setHealthStatus(false, "not started");
4047

4148
this.storeName = storeName;
4249
this.counterStoreRefreshed = Counter
43-
.builder("uid2_config_store_refreshed_total")
44-
.tag("store", storeName)
45-
.description("counter for how many times " + storeName + " store is refreshed")
46-
.register(Metrics.globalRegistry);
50+
.builder("uid2_config_store_refreshed_total")
51+
.tag("store", storeName)
52+
.description("counter for how many times " + storeName + " store is refreshed")
53+
.register(Metrics.globalRegistry);
4754
this.counterStoreRefreshTimeMs = Counter
48-
.builder("uid2_config_store_refreshtime_ms_total")
49-
.tag("store", storeName)
50-
.description("counter for total time (ms) " + storeName + " store spend in refreshing")
51-
.register(Metrics.globalRegistry);
55+
.builder("uid2_config_store_refreshtime_ms_total")
56+
.tag("store", storeName)
57+
.description("counter for total time (ms) " + storeName + " store spend in refreshing")
58+
.register(Metrics.globalRegistry);
5259
this.counterStoreRefreshFailures = Counter
53-
.builder("uid2_config_store_refresh_failures_total")
54-
.tag("store", storeName)
55-
.description("counter for number of " + storeName + " store refresh failures")
56-
.register(Metrics.globalRegistry);
60+
.builder("uid2_config_store_refresh_failures_total")
61+
.tag("store", storeName)
62+
.description("counter for number of " + storeName + " store refresh failures")
63+
.register(Metrics.globalRegistry);
5764
this.gaugeStoreVersion = Gauge
58-
.builder("uid2_config_store_version", () -> this.latestVersion.get())
59-
.tag("store", storeName)
60-
.description("gauge for " + storeName + " store version")
61-
.register(Metrics.globalRegistry);
65+
.builder("uid2_config_store_version", () -> this.latestVersion.get())
66+
.tag("store", storeName)
67+
.description("gauge for " + storeName + " store version")
68+
.register(Metrics.globalRegistry);
6269
this.gaugeStoreEntryCount = Gauge
63-
.builder("uid2_config_store_entry_count", () -> this.latestEntryCount.get())
64-
.tag("store", storeName)
65-
.description("gauge for " + storeName + " store total entry count")
66-
.register(Metrics.globalRegistry);
70+
.builder("uid2_config_store_entry_count", () -> this.latestEntryCount.get())
71+
.tag("store", storeName)
72+
.description("gauge for " + storeName + " store total entry count")
73+
.register(Metrics.globalRegistry);
6774
this.gaugeConsecutiveRefreshFailures = Gauge
68-
.builder("uid2_config_store_consecutive_refresh_failures", () -> this.storeRefreshIsFailing.get())
69-
.tag("store", storeName)
70-
.description("gauge for number of consecutive " + storeName + " store refresh failures")
71-
.register(Metrics.globalRegistry);
75+
.builder("uid2_config_store_consecutive_refresh_failures", () -> this.storeRefreshIsFailing.get())
76+
.tag("store", storeName)
77+
.description("gauge for number of consecutive " + storeName + " store refresh failures")
78+
.register(Metrics.globalRegistry);
7279
this.versionedStore = versionedStore;
7380
this.refreshIntervalMs = refreshIntervalMs;
7481
this.storeRefreshTimer = Metrics.timer("uid2_store_refresh_duration", "store_name", storeName);
82+
this.refreshCallback = refreshCallback;
83+
this.refreshCallback = refreshCallback;
7584
}
7685

7786
@Override
@@ -98,7 +107,8 @@ private void startRefresh(Promise<Void> promise) {
98107
this.startBackgroundRefresh();
99108
} else {
100109
this.healthComponent.setHealthStatus(false, ar.cause().getMessage());
101-
LOGGER.error("Failed " + this.storeName + " loading. Trying again in " + refreshIntervalMs + "ms", ar.cause());
110+
LOGGER.error("Failed " + this.storeName + " loading. Trying again in " + refreshIntervalMs + "ms",
111+
ar.cause());
102112
vertx.setTimer(refreshIntervalMs, id -> this.startRefresh(promise));
103113
}
104114
});
@@ -109,23 +119,28 @@ private void startBackgroundRefresh() {
109119
final long start = System.nanoTime();
110120

111121
vertx.executeBlocking(() -> {
112-
this.refresh();
113-
return null;
114-
}).onComplete(asyncResult -> {
115-
final long end = System.nanoTime();
116-
final long elapsed = ((end - start) / 1000000);
117-
this.counterStoreRefreshTimeMs.increment(elapsed);
118-
if (asyncResult.failed()) {
119-
this.counterStoreRefreshFailures.increment();
120-
this.storeRefreshIsFailing.set(1);
121-
LOGGER.error("Failed to load " + this.storeName + ", " + elapsed + " ms", asyncResult.cause());
122-
} else {
123-
this.counterStoreRefreshed.increment();
124-
this.storeRefreshIsFailing.set(0);
125-
LOGGER.trace("Successfully refreshed " + this.storeName + ", " + elapsed + " ms");
122+
this.refresh();
123+
return null;
124+
}).onComplete(asyncResult -> {
125+
final long end = System.nanoTime();
126+
final long elapsed = ((end - start) / 1000000);
127+
this.counterStoreRefreshTimeMs.increment(elapsed);
128+
if (asyncResult.failed()) {
129+
this.counterStoreRefreshFailures.increment();
130+
this.storeRefreshIsFailing.set(1);
131+
LOGGER.error("Failed to load " + this.storeName + ", " + elapsed + " ms", asyncResult.cause());
132+
if (this.refreshCallback != null) {
133+
this.refreshCallback.accept(false);
134+
}
135+
} else {
136+
this.counterStoreRefreshed.increment();
137+
this.storeRefreshIsFailing.set(0);
138+
LOGGER.trace("Successfully refreshed " + this.storeName + ", " + elapsed + " ms");
139+
if (this.refreshCallback != null) {
140+
this.refreshCallback.accept(true);
126141
}
127142
}
128-
);
143+
});
129144
});
130145
}
131146

0 commit comments

Comments
 (0)