Skip to content

Commit f2b70ce

Browse files
committed
add polling cache support
Signed-off-by: liran2000 <[email protected]>
1 parent 8fac595 commit f2b70ce

File tree

8 files changed

+224
-75
lines changed

8 files changed

+224
-75
lines changed

tools/flagd-http-connector/README.md

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ The implementation is using Java HttpClient.
1515

1616
## Use cases and benefits
1717
* Reduce infrastructure/devops work, without additional containers needed.
18+
* Reduce latency, since the data is fetched in-process.
19+
* Reduce external network traffic from the Http source even without a flagd separate container / proxy when
20+
polling cache is used.
1821
* Use as an additional provider for fallback / internal backup service via multi-provider.
1922

2023
### What happens if the Http source is down when application is starting ?
@@ -24,25 +27,43 @@ source downtime window, initial payload is taken from cache to avoid starting w
2427
the source is back up. Therefore, the cache ttl expected to be higher than the expected source
2528
down-time to recover from during initialization.
2629

30+
### Polling cache
31+
The polling cache is used to store the payload fetched from the URL.
32+
Used when usePollingCache is configured as true.
33+
A key advantage of this cache is that it enables a single microservice within a cluster to handle the polling of a
34+
URL, effectively acting as a flagd/proxy while all other services leverage the shared cache.
35+
This approach optimizes resource usage by preventing redundant polling across services.
36+
2737
### Sample flow
2838
Sample flow can use:
2939
- Github as the flags payload source.
30-
- Redis cache as a fail-safe initialization cache.
40+
- Redis cache as a fail-safe initialization cache and as a polling cache.
3141

3242
Sample flow of initialization during Github down-time window, showing that application can still use flags
3343
values as fetched from cache.
3444
```mermaid
3545
sequenceDiagram
36-
participant Provider
37-
participant Github
46+
box Cluster
47+
participant micro-service-1
48+
participant micro-service-2
49+
participant micro-service-3
3850
participant Redis
51+
end
52+
participant Github
3953
4054
break source downtime
41-
Provider->>Github: initialize
42-
Github->>Provider: failure
55+
micro-service-1->>Github: initialize
56+
Github->>micro-service-1: failure
4357
end
44-
Provider->>Redis: fetch
45-
Redis->>Provider: last payload
58+
micro-service-1->>Redis: fetch
59+
Redis->>micro-service-1: failsafe payload
60+
Note right of micro-service-1: polling interval passed
61+
micro-service-1->>Github: fetch
62+
Github->>micro-service-1: payload
63+
micro-service-2->>Redis: fetch
64+
Redis->>micro-service-2: payload
65+
micro-service-3->>Redis: fetch
66+
Redis->>micro-service-3: payload
4667
4768
```
4869

@@ -97,7 +118,6 @@ The Http Connector can be configured using the following properties in the `Http
97118
| payloadCacheOptions | PayloadCacheOptions | Options for configuring the payload cache. Default is null. |
98119
| payloadCache | PayloadCache | The payload cache to use for caching responses. Default is null. |
99120
| useHttpCache | Boolean | Whether to use HTTP caching for the requests. Default is false. |
121+
| useFailsafeCache | Boolean | Whether to use a failsafe cache for initialization. Default is false. |
122+
| usePollingCache | Boolean | Whether to use a polling cache for initialization. Default is false. |
100123
| PayloadCacheOptions.updateIntervalSeconds | Integer | The interval, in seconds, at which the cache is updated. By default, this is set to 30 minutes. The goal is to avoid overloading fallback cache writes, since the cache serves only as a fallback mechanism. Typically, this value can be tuned to be shorter than the cache's TTL, balancing the need to minimize unnecessary updates while still handling edge cases effectively. |
101-
102-
103-
Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,21 @@
1515
* conditionally update the cache and {@link #get()} to retrieve the cached payload.</p>
1616
*/
1717
@Slf4j
18-
public class PayloadCacheWrapper {
18+
public class FailSafeCache {
19+
public static final String FAILSAFE_PAYLOAD_CACHE_KEY = FailSafeCache.class.getSimpleName()
20+
+ ".failsafe-payload";
1921
private long lastUpdateTimeMs;
2022
private long updateIntervalMs;
2123
private PayloadCache payloadCache;
2224

2325
/**
24-
* Constructor for PayloadCacheWrapper.
26+
* Constructor for FailSafeCache.
2527
*
2628
* @param payloadCache the payload cache to be used
2729
* @param payloadCacheOptions the options for configuring the cache
2830
*/
2931
@Builder
30-
public PayloadCacheWrapper(PayloadCache payloadCache, PayloadCacheOptions payloadCacheOptions) {
32+
public FailSafeCache(PayloadCache payloadCache, PayloadCacheOptions payloadCacheOptions) {
3133
if (payloadCacheOptions.getUpdateIntervalSeconds() < 1) {
3234
throw new IllegalArgumentException("pollIntervalSeconds must be larger than 0");
3335
}
@@ -48,7 +50,7 @@ public void updatePayloadIfNeeded(String payload) {
4850

4951
try {
5052
log.debug("updating payload");
51-
payloadCache.put(payload);
53+
payloadCache.put(FAILSAFE_PAYLOAD_CACHE_KEY, payload);
5254
lastUpdateTimeMs = getCurrentTimeMillis();
5355
} catch (Exception e) {
5456
log.error("failed updating cache", e);
@@ -66,7 +68,7 @@ protected long getCurrentTimeMillis() {
6668
*/
6769
public String get() {
6870
try {
69-
return payloadCache.get();
71+
return payloadCache.get(FAILSAFE_PAYLOAD_CACHE_KEY);
7072
} catch (Exception e) {
7173
log.error("failed getting from cache", e);
7274
return null;

tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnector.java

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,19 @@
3838
@Slf4j
3939
public class HttpConnector implements QueueSource {
4040

41+
public static final String POLLING_PAYLOAD_CACHE_KEY = HttpConnector.class.getSimpleName() + ".polling-payload";
4142
private Integer pollIntervalSeconds;
4243
private Integer requestTimeoutSeconds;
4344
private BlockingQueue<QueuePayload> queue;
4445
private HttpClient client;
4546
private ExecutorService httpClientExecutor;
4647
private ScheduledExecutorService scheduler;
4748
private Map<String, String> headers;
48-
private PayloadCacheWrapper payloadCacheWrapper;
49+
private FailSafeCache failSafeCache;
4950
private PayloadCache payloadCache;
5051
private HttpCacheFetcher httpCacheFetcher;
52+
private int payloadCachePollTtlSeconds;
53+
private boolean usePollingCache;
5154

5255
@NonNull
5356
private String url;
@@ -77,15 +80,17 @@ public HttpConnector(HttpConnectorOptions httpConnectorOptions) {
7780
.build();
7881
this.queue = new LinkedBlockingQueue<>(httpConnectorOptions.getLinkedBlockingQueueCapacity());
7982
this.payloadCache = httpConnectorOptions.getPayloadCache();
80-
if (payloadCache != null) {
81-
this.payloadCacheWrapper = PayloadCacheWrapper.builder()
83+
if (payloadCache != null && Boolean.TRUE.equals(httpConnectorOptions.getUseFailsafeCache())) {
84+
this.failSafeCache = FailSafeCache.builder()
8285
.payloadCache(payloadCache)
8386
.payloadCacheOptions(httpConnectorOptions.getPayloadCacheOptions())
8487
.build();
8588
}
8689
if (Boolean.TRUE.equals(httpConnectorOptions.getUseHttpCache())) {
8790
httpCacheFetcher = new HttpCacheFetcher();
8891
}
92+
payloadCachePollTtlSeconds = pollIntervalSeconds; // safety margin
93+
this.usePollingCache = Boolean.TRUE.equals(httpConnectorOptions.getUsePollingCache());
8994
}
9095

9196
@Override
@@ -98,18 +103,18 @@ public BlockingQueue<QueuePayload> getStreamQueue() {
98103
boolean success = fetchAndUpdate();
99104
if (!success) {
100105
log.info("failed initial fetch");
101-
if (payloadCache != null) {
102-
updateFromCache();
106+
if (failSafeCache != null) {
107+
updateFromFailsafeCache();
103108
}
104109
}
105110
Runnable pollTask = buildPollTask();
106111
scheduler.scheduleWithFixedDelay(pollTask, pollIntervalSeconds, pollIntervalSeconds, TimeUnit.SECONDS);
107112
return queue;
108113
}
109114

110-
private void updateFromCache() {
115+
private void updateFromFailsafeCache() {
111116
log.info("taking initial payload from cache to avoid starting with default values");
112-
String flagData = payloadCache.get();
117+
String flagData = failSafeCache.get();
113118
if (flagData == null) {
114119
log.debug("got null from cache");
115120
return;
@@ -124,6 +129,14 @@ protected Runnable buildPollTask() {
124129
}
125130

126131
private boolean fetchAndUpdate() {
132+
if (payloadCache != null && usePollingCache) {
133+
log.debug("checking cache for polling payload");
134+
String payload = payloadCache.get(POLLING_PAYLOAD_CACHE_KEY);
135+
if (payload != null) {
136+
log.debug("got payload from polling cache key, skipping update");
137+
return false;
138+
}
139+
}
127140
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
128141
.uri(URI.create(url))
129142
.timeout(Duration.ofSeconds(requestTimeoutSeconds))
@@ -159,10 +172,18 @@ private boolean fetchAndUpdate() {
159172
log.warn("Unable to offer file content to queue: queue is full");
160173
return false;
161174
}
162-
if (payloadCacheWrapper != null) {
175+
if (payloadCache != null) {
163176
log.debug("scheduling cache update if needed");
164-
scheduler.execute(() ->
165-
payloadCacheWrapper.updatePayloadIfNeeded(payload)
177+
scheduler.execute(() -> {
178+
if (failSafeCache != null) {
179+
log.debug("updating payload in failsafe cache if needed");
180+
failSafeCache.updatePayloadIfNeeded(payload);
181+
}
182+
if (payloadCache != null) {
183+
log.debug("updating polling payload in cache");
184+
payloadCache.put(POLLING_PAYLOAD_CACHE_KEY, payload, payloadCachePollTtlSeconds);
185+
}
186+
}
166187
);
167188
}
168189
return true;

tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnectorOptions.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ public class HttpConnectorOptions {
4040
private PayloadCache payloadCache;
4141
@Builder.Default
4242
private Boolean useHttpCache;
43+
@Builder.Default
44+
private Boolean useFailsafeCache;
45+
@Builder.Default
46+
private Boolean usePollingCache;
4347
@NonNull
4448
private String url;
4549

@@ -50,9 +54,11 @@ public class HttpConnectorOptions {
5054
public HttpConnectorOptions(Integer pollIntervalSeconds, Integer linkedBlockingQueueCapacity,
5155
Integer scheduledThreadPoolSize, Integer requestTimeoutSeconds, Integer connectTimeoutSeconds, String url,
5256
Map<String, String> headers, ExecutorService httpClientExecutor, String proxyHost, Integer proxyPort,
53-
PayloadCacheOptions payloadCacheOptions, PayloadCache payloadCache, Boolean useHttpCache) {
57+
PayloadCacheOptions payloadCacheOptions, PayloadCache payloadCache, Boolean useHttpCache,
58+
Boolean useFailsafeCache, Boolean usePollingCache) {
5459
validate(url, pollIntervalSeconds, linkedBlockingQueueCapacity, scheduledThreadPoolSize, requestTimeoutSeconds,
55-
connectTimeoutSeconds, proxyHost, proxyPort, payloadCacheOptions, payloadCache);
60+
connectTimeoutSeconds, proxyHost, proxyPort, payloadCacheOptions, payloadCache, useFailsafeCache,
61+
usePollingCache);
5662
if (pollIntervalSeconds != null) {
5763
this.pollIntervalSeconds = pollIntervalSeconds;
5864
}
@@ -90,13 +96,19 @@ public HttpConnectorOptions(Integer pollIntervalSeconds, Integer linkedBlockingQ
9096
if (useHttpCache != null) {
9197
this.useHttpCache = useHttpCache;
9298
}
99+
if (useFailsafeCache != null) {
100+
this.useFailsafeCache = useFailsafeCache;
101+
}
102+
if (usePollingCache != null) {
103+
this.usePollingCache = usePollingCache;
104+
}
93105
}
94106

95107
@SneakyThrows
96108
private void validate(String url, Integer pollIntervalSeconds, Integer linkedBlockingQueueCapacity,
97109
Integer scheduledThreadPoolSize, Integer requestTimeoutSeconds, Integer connectTimeoutSeconds,
98110
String proxyHost, Integer proxyPort, PayloadCacheOptions payloadCacheOptions,
99-
PayloadCache payloadCache) {
111+
PayloadCache payloadCache, Boolean useFailsafeCache, Boolean usePollingCache) {
100112
new URL(url).toURI();
101113
if (linkedBlockingQueueCapacity != null
102114
&& (linkedBlockingQueueCapacity < 1 || linkedBlockingQueueCapacity > 1000)) {
@@ -128,5 +140,26 @@ private void validate(String url, Integer pollIntervalSeconds, Integer linkedBlo
128140
if (payloadCache != null && payloadCacheOptions == null) {
129141
throw new IllegalArgumentException("payloadCacheOptions must be set if payloadCache is set");
130142
}
143+
if ((Boolean.TRUE.equals(useFailsafeCache) || Boolean.TRUE.equals(usePollingCache)) && payloadCache == null) {
144+
throw new IllegalArgumentException(
145+
"payloadCache must be set if useFailsafeCache or usePollingCache is set");
146+
}
147+
148+
if (payloadCache != null && Boolean.TRUE.equals(usePollingCache)) {
149+
150+
// verify payloadCache overrides put(String key, String payload, int ttlSeconds)
151+
boolean overridesTtlPutMethod = false;
152+
try {
153+
var method = payloadCache.getClass().getMethod("put", String.class, String.class, int.class);
154+
// Check if the method is declared in the class and not inherited
155+
overridesTtlPutMethod = method.getDeclaringClass() != PayloadCache.class;
156+
} catch (NoSuchMethodException e) {
157+
// Method does not exist
158+
}
159+
if (!overridesTtlPutMethod) {
160+
throw new IllegalArgumentException("when usePollingCache is used, payloadCache must override "
161+
+ "put(String key, String payload, int ttlSeconds)");
162+
}
163+
}
131164
}
132165
}

tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/PayloadCache.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,18 @@
55
*/
66
public interface PayloadCache {
77

8-
void put(String payload);
8+
void put(String key, String payload);
99

10-
String get();
10+
String get(String key);
11+
12+
/**
13+
* Put a payload into the cache with a time-to-live (TTL) value.
14+
* Must implement if HttpConnectorOptions.usePollingCache is true.
15+
* @param key
16+
* @param payload
17+
* @param ttlSeconds
18+
*/
19+
default void put(String key, String payload, int ttlSeconds) {
20+
throw new UnsupportedOperationException("put with ttl not supported");
21+
}
1122
}

0 commit comments

Comments
 (0)