2727import java .util .concurrent .TimeoutException ;
2828import java .util .concurrent .atomic .AtomicBoolean ;
2929import java .util .concurrent .atomic .AtomicReference ;
30- import java .util .concurrent .locks .Lock ;
31- import java .util .concurrent .locks .ReentrantLock ;
3230import java .util .function .Supplier ;
3331import javax .annotation .Nonnull ;
3432import javax .annotation .Nullable ;
3937
4038public final class HttpRequestService implements RequestService {
4139 private final HttpSender requestSender ;
40+ // must be a single threaded executor, the code in this class relies on requests being processed
41+ // serially
4242 private final ScheduledExecutorService executorService ;
43- private final PeriodicDelay periodicRequestDelay ;
44- private final PeriodicDelay periodicRetryDelay ;
4543 private final AtomicBoolean isRunning = new AtomicBoolean (false );
4644 private final AtomicBoolean hasStopped = new AtomicBoolean (false );
47- private final AtomicReference < PeriodicDelay > currentDelay ;
45+ private final ConnectionStatus connectionStatus ;
4846 private final AtomicReference <ScheduledFuture <?>> scheduledTask = new AtomicReference <>();
49- private final Lock sendLock = new ReentrantLock ();
5047 private final RetryAfterParser retryAfterParser ;
5148 @ Nullable private Callback callback ;
5249 @ Nullable private Supplier <Request > requestSupplier ;
@@ -89,10 +86,8 @@ public static HttpRequestService create(
8986 RetryAfterParser retryAfterParser ) {
9087 this .requestSender = requestSender ;
9188 this .executorService = executorService ;
92- this .periodicRequestDelay = periodicRequestDelay ;
93- this .periodicRetryDelay = periodicRetryDelay ;
9489 this .retryAfterParser = retryAfterParser ;
95- currentDelay = new AtomicReference <> (periodicRequestDelay );
90+ this . connectionStatus = new ConnectionStatus (periodicRequestDelay , periodicRetryDelay );
9691 }
9792
9893 @ Override
@@ -109,10 +104,6 @@ public void start(Callback callback, Supplier<Request> requestSupplier) {
109104 }
110105 }
111106
112- private Duration getNextDelay () {
113- return Objects .requireNonNull (currentDelay .get ()).getNextDelay ();
114- }
115-
116107 @ Override
117108 public void stop () {
118109 if (isRunning .compareAndSet (true , false )) {
@@ -127,32 +118,16 @@ public void sendRequest() {
127118 throw new IllegalStateException ("HttpRequestService is not running" );
128119 }
129120
130- executorService .execute (this ::requestOnDemand );
131- }
132-
133- private void requestOnDemand () {
134- if (sendLock .tryLock ()) {
135- try {
136- ScheduledFuture <?> scheduledFuture = scheduledTask .get ();
137- if (scheduledFuture != null ) {
138- // Cancel future task
139- scheduledFuture .cancel (false );
140- }
141- sendAndScheduleNext ();
142- } finally {
143- sendLock .unlock ();
144- }
145- }
146- }
147-
148- private void periodicRequest () {
149- if (sendLock .tryLock ()) {
150- try {
151- sendAndScheduleNext ();
152- } finally {
153- sendLock .unlock ();
154- }
155- }
121+ executorService .execute (
122+ () -> {
123+ // cancel the already scheduled task, a new one is created after current request is
124+ // processed
125+ ScheduledFuture <?> scheduledFuture = scheduledTask .get ();
126+ if (scheduledFuture != null ) {
127+ scheduledFuture .cancel (false );
128+ }
129+ sendAndScheduleNext ();
130+ });
156131 }
157132
158133 private void sendAndScheduleNext () {
@@ -163,7 +138,9 @@ private void sendAndScheduleNext() {
163138 private void scheduleNextExecution () {
164139 scheduledTask .set (
165140 executorService .schedule (
166- this ::periodicRequest , getNextDelay ().toNanos (), TimeUnit .NANOSECONDS ));
141+ this ::sendAndScheduleNext ,
142+ connectionStatus .getNextDelay ().toNanos (),
143+ TimeUnit .NANOSECONDS ));
167144 }
168145
169146 private void doSendRequest () {
@@ -204,7 +181,7 @@ private void handleHttpError(HttpSender.Response response) {
204181 retryAfter = duration .get ();
205182 }
206183 }
207- useRetryDelay (retryAfter );
184+ connectionStatus . retryAfter (retryAfter );
208185 }
209186 }
210187
@@ -213,7 +190,7 @@ private static boolean isSuccessful(HttpSender.Response response) {
213190 }
214191
215192 private void handleHttpSuccess (Response response ) {
216- useRegularDelay ();
193+ connectionStatus . success ();
217194 ServerToAgent serverToAgent = response .getServerToAgent ();
218195
219196 if (serverToAgent .error_response != null ) {
@@ -229,28 +206,54 @@ private void handleErrorResponse(ServerErrorResponse errorResponse) {
229206 if (errorResponse .retry_info != null ) {
230207 retryAfter = Duration .ofNanos (errorResponse .retry_info .retry_after_nanoseconds );
231208 }
232- useRetryDelay (retryAfter );
209+ connectionStatus . retryAfter (retryAfter );
233210 }
234211 getCallback ().onRequestFailed (new OpampServerResponseError (errorResponse .error_message ));
235212 }
236213
237- private void useRegularDelay () {
238- if (currentDelay .compareAndSet (periodicRetryDelay , periodicRequestDelay )) {
239- periodicRequestDelay .reset ();
240- }
214+ private Callback getCallback () {
215+ return Objects .requireNonNull (callback );
241216 }
242217
243- private void useRetryDelay (@ Nullable Duration retryAfter ) {
244- if (currentDelay .compareAndSet (periodicRequestDelay , periodicRetryDelay )) {
245- periodicRetryDelay .reset ();
246- if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion ) {
247- ((AcceptsDelaySuggestion ) periodicRetryDelay ).suggestDelay (retryAfter );
218+ // this class is only used from a single threaded ScheduledExecutorService, hence no
219+ // synchronization is needed
220+ private static class ConnectionStatus {
221+ private final PeriodicDelay periodicRequestDelay ;
222+ private final PeriodicDelay periodicRetryDelay ;
223+
224+ private boolean retrying ;
225+ private PeriodicDelay currentDelay ;
226+
227+ ConnectionStatus (PeriodicDelay periodicRequestDelay , PeriodicDelay periodicRetryDelay ) {
228+ this .periodicRequestDelay = periodicRequestDelay ;
229+ this .periodicRetryDelay = periodicRetryDelay ;
230+ currentDelay = periodicRequestDelay ;
231+ }
232+
233+ void success () {
234+ // after successful request transition from retry to regular delay
235+ if (retrying ) {
236+ retrying = false ;
237+ periodicRequestDelay .reset ();
238+ currentDelay = periodicRequestDelay ;
248239 }
249240 }
250- }
251241
252- private Callback getCallback () {
253- return Objects .requireNonNull (callback );
242+ void retryAfter (@ Nullable Duration retryAfter ) {
243+ // after failed request transition from regular to retry delay
244+ if (!retrying ) {
245+ retrying = true ;
246+ periodicRetryDelay .reset ();
247+ currentDelay = periodicRetryDelay ;
248+ if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion ) {
249+ ((AcceptsDelaySuggestion ) periodicRetryDelay ).suggestDelay (retryAfter );
250+ }
251+ }
252+ }
253+
254+ Duration getNextDelay () {
255+ return currentDelay .getNextDelay ();
256+ }
254257 }
255258
256259 private static class DaemonThreadFactory implements ThreadFactory {
0 commit comments