Skip to content

Commit c5d93b5

Browse files
committed
refactor for using options
Signed-off-by: liran2000 <[email protected]>
1 parent d8f6943 commit c5d93b5

File tree

12 files changed

+1223
-364
lines changed

12 files changed

+1223
-364
lines changed

providers/flagd/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ If the `in-process` mode is not used, and before the provider is ready, the `get
5656

5757
#### Http Connector
5858
HttpConnector is responsible for polling data from a specified URL at regular intervals.
59-
It is implementing Http cache mechanism with 'ETag' header, then when receiving 304 Not Modified response, reducing traffic and
59+
It is leveraging Http cache mechanism with 'ETag' header, then when receiving 304 Not Modified response, reducing traffic and
6060
changes updates. Can be enabled via useHttpCache option.
6161
One of its benefits is to reduce infrastructure/devops work, without additional containers needed.
6262
The implementation is using Java HttpClient.
Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync;
1+
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync.http;
22

33
import java.net.http.HttpClient;
44
import java.net.http.HttpRequest;
@@ -12,15 +12,16 @@
1212
* to potentially receive a 304 Not Modified response, reducing data transfer.
1313
* Updates the cached ETag and Last-Modified values upon receiving a 200 OK response.
1414
* It does not store the cached response, assuming not needed after first successful fetching.
15+
* Non thread-safe.
1516
*
1617
* @param httpClient the HTTP client used to send the request
1718
* @param httpRequestBuilder the builder for constructing the HTTP request
1819
* @return the HTTP response received from the server
1920
*/
2021
@Slf4j
2122
public class HttpCacheFetcher {
22-
private static String cachedETag = null;
23-
private static String cachedLastModified = null;
23+
private String cachedETag = null;
24+
private String cachedLastModified = null;
2425

2526
@SneakyThrows
2627
public HttpResponse<String> fetchContent(HttpClient httpClient, HttpRequest.Builder httpRequestBuilder) {
@@ -35,8 +36,10 @@ public HttpResponse<String> fetchContent(HttpClient httpClient, HttpRequest.Buil
3536
HttpResponse<String> httpResponse = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
3637

3738
if (httpResponse.statusCode() == 200) {
38-
cachedETag = httpResponse.headers().firstValue("ETag").orElse(null);
39-
cachedLastModified = httpResponse.headers().firstValue("Last-Modified").orElse(null);
39+
if (httpResponse.headers() != null) {
40+
cachedETag = httpResponse.headers().firstValue("ETag").orElse(null);
41+
cachedLastModified = httpResponse.headers().firstValue("Last-Modified").orElse(null);
42+
}
4043
log.debug("fetched new content");
4144
} else if (httpResponse.statusCode() == 304) {
4245
log.debug("got 304 Not Modified");
Lines changed: 18 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,21 @@
1-
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync;
1+
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync.http;
22

33
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
44
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
55
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
66
import dev.openfeature.contrib.providers.flagd.util.ConcurrentUtils;
77
import lombok.Builder;
88
import lombok.NonNull;
9-
import lombok.SneakyThrows;
109
import lombok.extern.slf4j.Slf4j;
1110

1211
import java.io.IOException;
1312
import java.net.InetSocketAddress;
1413
import java.net.ProxySelector;
1514
import java.net.URI;
16-
import java.net.URL;
1715
import java.net.http.HttpClient;
1816
import java.net.http.HttpRequest;
1917
import java.net.http.HttpResponse;
2018
import java.time.Duration;
21-
import java.util.HashMap;
2219
import java.util.Map;
2320
import java.util.concurrent.BlockingQueue;
2421
import java.util.concurrent.ExecutorService;
@@ -43,12 +40,6 @@
4340
@Slf4j
4441
public class HttpConnector implements QueueSource {
4542

46-
private static final int DEFAULT_POLL_INTERVAL_SECONDS = 60;
47-
private static final int DEFAULT_LINKED_BLOCKING_QUEUE_CAPACITY = 100;
48-
private static final int DEFAULT_SCHEDULED_THREAD_POOL_SIZE = 2;
49-
private static final int DEFAULT_REQUEST_TIMEOUT_SECONDS = 10;
50-
private static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 10;
51-
5243
private Integer pollIntervalSeconds;
5344
private Integer requestTimeoutSeconds;
5445
private BlockingQueue<QueuePayload> queue;
@@ -64,85 +55,36 @@ public class HttpConnector implements QueueSource {
6455
private String url;
6556

6657
@Builder
67-
public HttpConnector(Integer pollIntervalSeconds, Integer linkedBlockingQueueCapacity,
68-
Integer scheduledThreadPoolSize, Integer requestTimeoutSeconds, Integer connectTimeoutSeconds, String url,
69-
Map<String, String> headers, ExecutorService httpClientExecutor, String proxyHost, Integer proxyPort,
70-
PayloadCacheOptions payloadCacheOptions, PayloadCache payloadCache, Boolean useHttpCache) {
71-
validate(url, pollIntervalSeconds, linkedBlockingQueueCapacity, scheduledThreadPoolSize, requestTimeoutSeconds,
72-
connectTimeoutSeconds, proxyHost, proxyPort, payloadCacheOptions, payloadCache);
73-
this.pollIntervalSeconds = pollIntervalSeconds == null ? DEFAULT_POLL_INTERVAL_SECONDS : pollIntervalSeconds;
74-
int thisLinkedBlockingQueueCapacity = linkedBlockingQueueCapacity == null ? DEFAULT_LINKED_BLOCKING_QUEUE_CAPACITY : linkedBlockingQueueCapacity;
75-
int thisScheduledThreadPoolSize = scheduledThreadPoolSize == null ? DEFAULT_SCHEDULED_THREAD_POOL_SIZE : scheduledThreadPoolSize;
76-
this.requestTimeoutSeconds = requestTimeoutSeconds == null ? DEFAULT_REQUEST_TIMEOUT_SECONDS : requestTimeoutSeconds;
77-
int thisConnectTimeoutSeconds = connectTimeoutSeconds == null ? DEFAULT_CONNECT_TIMEOUT_SECONDS : connectTimeoutSeconds;
58+
public HttpConnector(HttpConnectorOptions httpConnectorOptions) {
59+
this.pollIntervalSeconds = httpConnectorOptions.getPollIntervalSeconds();
60+
this.requestTimeoutSeconds = httpConnectorOptions.getRequestTimeoutSeconds();
7861
ProxySelector proxySelector = NO_PROXY;
79-
if (proxyHost != null && proxyPort != null) {
80-
proxySelector = ProxySelector.of(new InetSocketAddress(proxyHost, proxyPort));
81-
}
82-
83-
this.url = url;
84-
this.headers = headers;
85-
this.httpClientExecutor = httpClientExecutor == null ? Executors.newFixedThreadPool(1) :
86-
httpClientExecutor;
87-
scheduler = Executors.newScheduledThreadPool(thisScheduledThreadPoolSize);
88-
if (headers == null) {
89-
this.headers = new HashMap<>();
90-
}
62+
if (httpConnectorOptions.getProxyHost() != null && httpConnectorOptions.getProxyPort() != null) {
63+
proxySelector = ProxySelector.of(new InetSocketAddress(httpConnectorOptions.getProxyHost(),
64+
httpConnectorOptions.getProxyPort()));
65+
}
66+
this.url = httpConnectorOptions.getUrl();
67+
this.headers = httpConnectorOptions.getHeaders();
68+
this.httpClientExecutor = httpConnectorOptions.getHttpClientExecutor();
69+
scheduler = Executors.newScheduledThreadPool(httpConnectorOptions.getScheduledThreadPoolSize());
9170
this.client = HttpClient.newBuilder()
92-
.connectTimeout(Duration.ofSeconds(thisConnectTimeoutSeconds))
71+
.connectTimeout(Duration.ofSeconds(httpConnectorOptions.getConnectTimeoutSeconds()))
9372
.proxy(proxySelector)
9473
.executor(this.httpClientExecutor)
9574
.build();
96-
this.queue = new LinkedBlockingQueue<>(thisLinkedBlockingQueueCapacity);
97-
this.payloadCache = payloadCache;
75+
this.queue = new LinkedBlockingQueue<>(httpConnectorOptions.getLinkedBlockingQueueCapacity());
76+
this.payloadCache = httpConnectorOptions.getPayloadCache();
9877
if (payloadCache != null) {
9978
this.payloadCacheWrapper = PayloadCacheWrapper.builder()
10079
.payloadCache(payloadCache)
101-
.payloadCacheOptions(payloadCacheOptions)
80+
.payloadCacheOptions(httpConnectorOptions.getPayloadCacheOptions())
10281
.build();
10382
}
104-
if (Boolean.TRUE.equals(useHttpCache)) {
83+
if (Boolean.TRUE.equals(httpConnectorOptions.getUseHttpCache())) {
10584
httpCacheFetcher = new HttpCacheFetcher();
10685
}
10786
}
10887

109-
@SneakyThrows
110-
private void validate(String url, Integer pollIntervalSeconds, Integer linkedBlockingQueueCapacity,
111-
Integer scheduledThreadPoolSize, Integer requestTimeoutSeconds, Integer connectTimeoutSeconds,
112-
String proxyHost, Integer proxyPort, PayloadCacheOptions payloadCacheOptions,
113-
PayloadCache payloadCache) {
114-
new URL(url).toURI();
115-
if (pollIntervalSeconds != null && (pollIntervalSeconds < 1 || pollIntervalSeconds > 600)) {
116-
throw new IllegalArgumentException("pollIntervalSeconds must be between 1 and 600");
117-
}
118-
if (linkedBlockingQueueCapacity != null && (linkedBlockingQueueCapacity < 1 || linkedBlockingQueueCapacity > 1000)) {
119-
throw new IllegalArgumentException("linkedBlockingQueueCapacity must be between 1 and 1000");
120-
}
121-
if (scheduledThreadPoolSize != null && (scheduledThreadPoolSize < 1 || scheduledThreadPoolSize > 10)) {
122-
throw new IllegalArgumentException("scheduledThreadPoolSize must be between 1 and 10");
123-
}
124-
if (requestTimeoutSeconds != null && (requestTimeoutSeconds < 1 || requestTimeoutSeconds > 60)) {
125-
throw new IllegalArgumentException("requestTimeoutSeconds must be between 1 and 60");
126-
}
127-
if (connectTimeoutSeconds != null && (connectTimeoutSeconds < 1 || connectTimeoutSeconds > 60)) {
128-
throw new IllegalArgumentException("connectTimeoutSeconds must be between 1 and 60");
129-
}
130-
if (proxyPort != null && (proxyPort < 1 || proxyPort > 65535)) {
131-
throw new IllegalArgumentException("proxyPort must be between 1 and 65535");
132-
}
133-
if (proxyHost != null && proxyPort == null ) {
134-
throw new IllegalArgumentException("proxyPort must be set if proxyHost is set");
135-
} else if (proxyHost == null && proxyPort != null) {
136-
throw new IllegalArgumentException("proxyHost must be set if proxyPort is set");
137-
}
138-
if (payloadCacheOptions != null && payloadCache == null) {
139-
throw new IllegalArgumentException("payloadCache must be set if payloadCacheOptions is set");
140-
}
141-
if (payloadCache != null && payloadCacheOptions == null) {
142-
throw new IllegalArgumentException("payloadCacheOptions must be set if payloadCache is set");
143-
}
144-
}
145-
14688
@Override
14789
public void init() throws Exception {
14890
log.info("init Http Connector");
@@ -158,7 +100,7 @@ public BlockingQueue<QueuePayload> getStreamQueue() {
158100
}
159101
}
160102
Runnable pollTask = buildPollTask();
161-
scheduler.scheduleAtFixedRate(pollTask, pollIntervalSeconds, pollIntervalSeconds, TimeUnit.SECONDS);
103+
scheduler.scheduleWithFixedDelay(pollTask, pollIntervalSeconds, pollIntervalSeconds, TimeUnit.SECONDS);
162104
return queue;
163105
}
164106

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync.http;
2+
3+
import java.net.URL;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.Executors;
8+
import lombok.Builder;
9+
import lombok.Getter;
10+
import lombok.NonNull;
11+
import lombok.SneakyThrows;
12+
13+
@Getter
14+
public class HttpConnectorOptions {
15+
16+
@Builder.Default
17+
private Integer pollIntervalSeconds = 60;
18+
@Builder.Default
19+
private Integer connectTimeoutSeconds = 10;
20+
@Builder.Default
21+
private Integer requestTimeoutSeconds = 10;
22+
@Builder.Default
23+
private Integer linkedBlockingQueueCapacity = 100;
24+
@Builder.Default
25+
private Integer scheduledThreadPoolSize = 2;
26+
@Builder.Default
27+
private Map<String, String> headers = new HashMap<>();
28+
@Builder.Default
29+
private ExecutorService httpClientExecutor = Executors.newFixedThreadPool(1);
30+
@Builder.Default
31+
private String proxyHost;
32+
@Builder.Default
33+
private Integer proxyPort;
34+
@Builder.Default
35+
private PayloadCacheOptions payloadCacheOptions;
36+
@Builder.Default
37+
private PayloadCache payloadCache;
38+
@Builder.Default
39+
private Boolean useHttpCache;
40+
@NonNull
41+
private String url;
42+
43+
@Builder
44+
public HttpConnectorOptions(Integer pollIntervalSeconds, Integer linkedBlockingQueueCapacity,
45+
Integer scheduledThreadPoolSize, Integer requestTimeoutSeconds, Integer connectTimeoutSeconds, String url,
46+
Map<String, String> headers, ExecutorService httpClientExecutor, String proxyHost, Integer proxyPort,
47+
PayloadCacheOptions payloadCacheOptions, PayloadCache payloadCache, Boolean useHttpCache) {
48+
validate(url, pollIntervalSeconds, linkedBlockingQueueCapacity, scheduledThreadPoolSize, requestTimeoutSeconds,
49+
connectTimeoutSeconds, proxyHost, proxyPort, payloadCacheOptions, payloadCache);
50+
if (pollIntervalSeconds != null) {
51+
this.pollIntervalSeconds = pollIntervalSeconds;
52+
}
53+
if (linkedBlockingQueueCapacity != null) {
54+
this.linkedBlockingQueueCapacity = linkedBlockingQueueCapacity;
55+
}
56+
if (scheduledThreadPoolSize != null) {
57+
this.scheduledThreadPoolSize = scheduledThreadPoolSize;
58+
}
59+
if (requestTimeoutSeconds != null) {
60+
this.requestTimeoutSeconds = requestTimeoutSeconds;
61+
}
62+
if (connectTimeoutSeconds != null) {
63+
this.connectTimeoutSeconds = connectTimeoutSeconds;
64+
}
65+
this.url = url;
66+
if (headers != null) {
67+
this.headers = headers;
68+
}
69+
if (httpClientExecutor != null) {
70+
this.httpClientExecutor = httpClientExecutor;
71+
}
72+
if (proxyHost != null) {
73+
this.proxyHost = proxyHost;
74+
}
75+
if (proxyPort != null) {
76+
this.proxyPort = proxyPort;
77+
}
78+
if (payloadCache != null) {
79+
this.payloadCache = payloadCache;
80+
}
81+
if (payloadCacheOptions != null) {
82+
this.payloadCacheOptions = payloadCacheOptions;
83+
}
84+
if (useHttpCache != null) {
85+
this.useHttpCache = useHttpCache;
86+
}
87+
}
88+
89+
@SneakyThrows
90+
private void validate(String url, Integer pollIntervalSeconds, Integer linkedBlockingQueueCapacity,
91+
Integer scheduledThreadPoolSize, Integer requestTimeoutSeconds, Integer connectTimeoutSeconds,
92+
String proxyHost, Integer proxyPort, PayloadCacheOptions payloadCacheOptions,
93+
PayloadCache payloadCache) {
94+
new URL(url).toURI();
95+
if (linkedBlockingQueueCapacity != null && (linkedBlockingQueueCapacity < 1 || linkedBlockingQueueCapacity > 1000)) {
96+
throw new IllegalArgumentException("linkedBlockingQueueCapacity must be between 1 and 1000");
97+
}
98+
if (scheduledThreadPoolSize != null && (scheduledThreadPoolSize < 1 || scheduledThreadPoolSize > 10)) {
99+
throw new IllegalArgumentException("scheduledThreadPoolSize must be between 1 and 10");
100+
}
101+
if (requestTimeoutSeconds != null && (requestTimeoutSeconds < 1 || requestTimeoutSeconds > 60)) {
102+
throw new IllegalArgumentException("requestTimeoutSeconds must be between 1 and 60");
103+
}
104+
if (connectTimeoutSeconds != null && (connectTimeoutSeconds < 1 || connectTimeoutSeconds > 60)) {
105+
throw new IllegalArgumentException("connectTimeoutSeconds must be between 1 and 60");
106+
}
107+
if (pollIntervalSeconds != null && (pollIntervalSeconds < 1 || pollIntervalSeconds > 600)) {
108+
throw new IllegalArgumentException("pollIntervalSeconds must be between 1 and 600");
109+
}
110+
if (proxyPort != null && (proxyPort < 1 || proxyPort > 65535)) {
111+
throw new IllegalArgumentException("proxyPort must be between 1 and 65535");
112+
}
113+
if (proxyHost != null && proxyPort == null ) {
114+
throw new IllegalArgumentException("proxyPort must be set if proxyHost is set");
115+
} else if (proxyHost == null && proxyPort != null) {
116+
throw new IllegalArgumentException("proxyHost must be set if proxyPort is set");
117+
}
118+
if (payloadCacheOptions != null && payloadCache == null) {
119+
throw new IllegalArgumentException("payloadCache must be set if payloadCacheOptions is set");
120+
}
121+
if (payloadCache != null && payloadCacheOptions == null) {
122+
throw new IllegalArgumentException("payloadCacheOptions must be set if payloadCache is set");
123+
}
124+
}
125+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync;
1+
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync.http;
22

33
public interface PayloadCache {
44
public void put(String payload);
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync;
1+
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync.http;
22

33
import lombok.Builder;
44
import lombok.Getter;
Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync;
1+
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync.http;
22

33
import lombok.Builder;
4-
import lombok.Getter;
54
import lombok.extern.slf4j.Slf4j;
65

76
/**
@@ -23,28 +22,32 @@ public class PayloadCacheWrapper {
2322

2423
@Builder
2524
public PayloadCacheWrapper(PayloadCache payloadCache, PayloadCacheOptions payloadCacheOptions) {
26-
if (payloadCacheOptions.getUpdateIntervalSeconds() < 500) {
27-
throw new IllegalArgumentException("pollIntervalSeconds must be larger than 500");
25+
if (payloadCacheOptions.getUpdateIntervalSeconds() < 1) {
26+
throw new IllegalArgumentException("pollIntervalSeconds must be larger than 0");
2827
}
29-
this.updateIntervalMs = payloadCacheOptions.getUpdateIntervalSeconds() * 1000;
28+
this.updateIntervalMs = payloadCacheOptions.getUpdateIntervalSeconds() * 1000L;
3029
this.payloadCache = payloadCache;
3130
}
3231

3332
public void updatePayloadIfNeeded(String payload) {
34-
if ((System.currentTimeMillis() - lastUpdateTimeMs) < updateIntervalMs) {
33+
if ((getCurrentTimeMillis() - lastUpdateTimeMs) < updateIntervalMs) {
3534
log.debug("not updating payload, updateIntervalMs not reached");
3635
return;
3736
}
3837

3938
try {
4039
log.debug("updating payload");
4140
payloadCache.put(payload);
42-
lastUpdateTimeMs = System.currentTimeMillis();
41+
lastUpdateTimeMs = getCurrentTimeMillis();
4342
} catch (Exception e) {
4443
log.error("failed updating cache", e);
4544
}
4645
}
4746

47+
protected long getCurrentTimeMillis() {
48+
return System.currentTimeMillis();
49+
}
50+
4851
public String get() {
4952
try {
5053
return payloadCache.get();

0 commit comments

Comments
 (0)