3030import static java .net .http .HttpClient .Builder .NO_PROXY ;
3131
3232/**
33- * HttpConnector is responsible for managing HTTP connections and polling data from a specified URL
34- * at regular intervals. It implements the QueueSource interface to enqueue and dequeue change messages.
33+ * HttpConnector is responsible for polling data from a specified URL at regular intervals.
34+ * Notice rate limits for polling http sources like Github.
35+ * It implements the QueueSource interface to enqueue and dequeue change messages.
3536 * The class supports configurable parameters such as poll interval, request timeout, and proxy settings.
3637 * It uses a ScheduledExecutorService to schedule polling tasks and an ExecutorService for HTTP client execution.
3738 * The class also provides methods to initialize, retrieve the stream queue, and shutdown the connector gracefully.
39+ * It supports optional fail-safe initialization via cache.
40+ *
41+ * See readme - Http Connector section.
3842 */
3943@ Slf4j
4044public class HttpConnector implements QueueSource {
4145
4246 private static final int DEFAULT_POLL_INTERVAL_SECONDS = 60 ;
4347 private static final int DEFAULT_LINKED_BLOCKING_QUEUE_CAPACITY = 100 ;
44- private static final int DEFAULT_SCHEDULED_THREAD_POOL_SIZE = 1 ;
48+ private static final int DEFAULT_SCHEDULED_THREAD_POOL_SIZE = 2 ;
4549 private static final int DEFAULT_REQUEST_TIMEOUT_SECONDS = 10 ;
4650 private static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 10 ;
4751
@@ -52,19 +56,19 @@ public class HttpConnector implements QueueSource {
5256 private ExecutorService httpClientExecutor ;
5357 private ScheduledExecutorService scheduler ;
5458 private Map <String , String > headers ;
59+ private PayloadCacheWrapper payloadCacheWrapper ;
60+ private PayloadCache payloadCache ;
61+
5562 @ NonNull
5663 private String url ;
5764
58- // TODO init failure backup cache redis
59-
60- // todo update provider readme
61-
6265 @ Builder
6366 public HttpConnector (Integer pollIntervalSeconds , Integer linkedBlockingQueueCapacity ,
6467 Integer scheduledThreadPoolSize , Integer requestTimeoutSeconds , Integer connectTimeoutSeconds , String url ,
65- Map <String , String > headers , ExecutorService httpClientExecutor , String proxyHost , Integer proxyPort ) {
68+ Map <String , String > headers , ExecutorService httpClientExecutor , String proxyHost , Integer proxyPort ,
69+ PayloadCacheOptions payloadCacheOptions , PayloadCache payloadCache ) {
6670 validate (url , pollIntervalSeconds , linkedBlockingQueueCapacity , scheduledThreadPoolSize , requestTimeoutSeconds ,
67- connectTimeoutSeconds , proxyHost , proxyPort );
71+ connectTimeoutSeconds , proxyHost , proxyPort , payloadCacheOptions , payloadCache );
6872 this .pollIntervalSeconds = pollIntervalSeconds == null ? DEFAULT_POLL_INTERVAL_SECONDS : pollIntervalSeconds ;
6973 int thisLinkedBlockingQueueCapacity = linkedBlockingQueueCapacity == null ? DEFAULT_LINKED_BLOCKING_QUEUE_CAPACITY : linkedBlockingQueueCapacity ;
7074 int thisScheduledThreadPoolSize = scheduledThreadPoolSize == null ? DEFAULT_SCHEDULED_THREAD_POOL_SIZE : scheduledThreadPoolSize ;
@@ -89,12 +93,20 @@ public HttpConnector(Integer pollIntervalSeconds, Integer linkedBlockingQueueCap
8993 .executor (this .httpClientExecutor )
9094 .build ();
9195 this .queue = new LinkedBlockingQueue <>(thisLinkedBlockingQueueCapacity );
96+ this .payloadCache = payloadCache ;
97+ if (payloadCache != null ) {
98+ this .payloadCacheWrapper = PayloadCacheWrapper .builder ()
99+ .payloadCache (payloadCache )
100+ .payloadCacheOptions (payloadCacheOptions )
101+ .build ();
102+ }
92103 }
93104
94105 @ SneakyThrows
95106 private void validate (String url , Integer pollIntervalSeconds , Integer linkedBlockingQueueCapacity ,
96- Integer scheduledThreadPoolSize , Integer requestTimeoutSeconds , Integer connectTimeoutSeconds ,
97- String proxyHost , Integer proxyPort ) {
107+ Integer scheduledThreadPoolSize , Integer requestTimeoutSeconds , Integer connectTimeoutSeconds ,
108+ String proxyHost , Integer proxyPort , PayloadCacheOptions payloadCacheOptions ,
109+ PayloadCache payloadCache ) {
98110 new URL (url ).toURI ();
99111 if (pollIntervalSeconds != null && (pollIntervalSeconds < 1 || pollIntervalSeconds > 600 )) {
100112 throw new IllegalArgumentException ("pollIntervalSeconds must be between 1 and 600" );
@@ -119,6 +131,12 @@ private void validate(String url, Integer pollIntervalSeconds, Integer linkedBlo
119131 } else if (proxyHost == null && proxyPort != null ) {
120132 throw new IllegalArgumentException ("proxyHost must be set if proxyPort is set" );
121133 }
134+ if (payloadCacheOptions != null && payloadCache == null ) {
135+ throw new IllegalArgumentException ("payloadCache must be set if payloadCacheOptions is set" );
136+ }
137+ if (payloadCache != null && payloadCacheOptions == null ) {
138+ throw new IllegalArgumentException ("payloadCacheOptions must be set if payloadCache is set" );
139+ }
122140 }
123141
124142 @ Override
@@ -128,51 +146,79 @@ public void init() throws Exception {
128146
129147 @ Override
130148 public BlockingQueue <QueuePayload > getStreamQueue () {
149+ boolean success = fetchAndUpdate ();
150+ if (!success ) {
151+ log .info ("failed initial fetch" );
152+ if (payloadCache != null ) {
153+ updateFromCache ();
154+ }
155+ }
131156 Runnable pollTask = buildPollTask ();
132-
133- // run first poll immediately and wait for it to finish
134- pollTask .run ();
135-
136157 scheduler .scheduleAtFixedRate (pollTask , pollIntervalSeconds , pollIntervalSeconds , TimeUnit .SECONDS );
137158 return queue ;
138159 }
139160
161+ private void updateFromCache () {
162+ log .info ("taking initial payload from cache to avoid starting with default values" );
163+ String flagData = payloadCache .get ();
164+ if (flagData == null ) {
165+ log .debug ("got null from cache" );
166+ return ;
167+ }
168+ if (!this .queue .offer (new QueuePayload (QueuePayloadType .DATA , flagData ))) {
169+ log .warn ("init: Unable to offer file content to queue: queue is full" );
170+ }
171+ }
172+
140173 protected Runnable buildPollTask () {
141- return () -> {
142- HttpRequest .Builder requestBuilder = HttpRequest .newBuilder ()
143- .uri (URI .create (url ))
144- .timeout (Duration .ofSeconds (requestTimeoutSeconds ))
145- .GET ();
146- headers .forEach (requestBuilder ::header );
147- HttpRequest request = requestBuilder
148- .build ();
174+ return this ::fetchAndUpdate ;
175+ }
149176
150- HttpResponse <String > response ;
151- try {
152- log .debug ("fetching response" );
153- response = execute (request );
154- } catch (IOException e ) {
155- log .info ("could not fetch" , e );
156- return ;
157- } catch (Exception e ) {
158- log .debug ("exception" , e );
159- return ;
160- }
161- log .debug ("fetched response" );
162- if (response .statusCode () != 200 ) {
163- log .info ("received non-successful status code: {} {}" , response .statusCode (), response .body ());
164- return ;
165- }
166- if (!this .queue .offer (new QueuePayload (QueuePayloadType .DATA , response .body ()))) {
167- log .warn ("Unable to offer file content to queue: queue is full" );
168- }
169- };
177+ private boolean fetchAndUpdate () {
178+ HttpRequest .Builder requestBuilder = HttpRequest .newBuilder ()
179+ .uri (URI .create (url ))
180+ .timeout (Duration .ofSeconds (requestTimeoutSeconds ))
181+ .GET ();
182+ headers .forEach (requestBuilder ::header );
183+ HttpRequest request = requestBuilder
184+ .build ();
185+
186+ HttpResponse <String > response ;
187+ try {
188+ log .debug ("fetching response" );
189+ response = execute (request );
190+ } catch (IOException e ) {
191+ log .info ("could not fetch" , e );
192+ return false ;
193+ } catch (Exception e ) {
194+ log .debug ("exception" , e );
195+ return false ;
196+ }
197+ log .debug ("fetched response" );
198+ String payload = response .body ();
199+ if (response .statusCode () != 200 ) {
200+ log .info ("received non-successful status code: {} {}" , response .statusCode (), payload );
201+ return false ;
202+ }
203+ if (payload == null ) {
204+ log .debug ("payload is null" );
205+ return false ;
206+ }
207+ if (!this .queue .offer (new QueuePayload (QueuePayloadType .DATA , payload ))) {
208+ log .warn ("Unable to offer file content to queue: queue is full" );
209+ return false ;
210+ }
211+ if (payloadCacheWrapper != null ) {
212+ log .debug ("scheduling cache update if needed" );
213+ scheduler .execute (() ->
214+ payloadCacheWrapper .updatePayloadIfNeeded (payload )
215+ );
216+ }
217+ return payload != null ;
170218 }
171219
172220 protected HttpResponse <String > execute (HttpRequest request ) throws IOException , InterruptedException {
173- HttpResponse <String > response ;
174- response = client .send (request , HttpResponse .BodyHandlers .ofString ());
175- return response ;
221+ return client .send (request , HttpResponse .BodyHandlers .ofString ());
176222 }
177223
178224 @ Override
0 commit comments