Skip to content

Commit 3bb8d7b

Browse files
committed
Ensuring only one task is executed at a time and it always schedules the next one
1 parent 73cb995 commit 3bb8d7b

File tree

2 files changed

+62
-33
lines changed

2 files changed

+62
-33
lines changed

opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java

Lines changed: 44 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import java.util.concurrent.TimeoutException;
2828
import java.util.concurrent.atomic.AtomicBoolean;
2929
import java.util.concurrent.atomic.AtomicReference;
30+
import java.util.concurrent.locks.Lock;
31+
import java.util.concurrent.locks.ReentrantLock;
3032
import java.util.function.Supplier;
3133
import javax.annotation.Nonnull;
3234
import javax.annotation.Nullable;
@@ -43,7 +45,8 @@ public final class HttpRequestService implements RequestService {
4345
private final AtomicBoolean isRunning = new AtomicBoolean(false);
4446
private final AtomicBoolean hasStopped = new AtomicBoolean(false);
4547
private final AtomicReference<PeriodicDelay> currentDelay;
46-
private final AtomicReference<ScheduledFuture<?>> currentTask = new AtomicReference<>();
48+
private final AtomicReference<ScheduledFuture<?>> scheduledTask = new AtomicReference<>();
49+
private final Lock sendLock = new ReentrantLock();
4750
private final RetryAfterParser retryAfterParser;
4851
@Nullable private Callback callback;
4952
@Nullable private Supplier<Request> requestSupplier;
@@ -100,26 +103,12 @@ public void start(Callback callback, Supplier<Request> requestSupplier) {
100103
if (isRunning.compareAndSet(false, true)) {
101104
this.callback = callback;
102105
this.requestSupplier = requestSupplier;
103-
currentTask.set(
104-
executorService.schedule(
105-
this::periodicSend, getNextDelay().toNanos(), TimeUnit.NANOSECONDS));
106+
scheduleNextExecution();
106107
} else {
107108
throw new IllegalStateException("HttpRequestService is already running");
108109
}
109110
}
110111

111-
private void periodicSend() {
112-
doSendRequest();
113-
// schedule the next execution
114-
currentTask.set(
115-
executorService.schedule(
116-
this::periodicSend, getNextDelay().toNanos(), TimeUnit.NANOSECONDS));
117-
}
118-
119-
private void sendOnce() {
120-
executorService.execute(this::doSendRequest);
121-
}
122-
123112
private Duration getNextDelay() {
124113
return Objects.requireNonNull(currentDelay.get()).getNextDelay();
125114
}
@@ -138,7 +127,43 @@ public void sendRequest() {
138127
throw new IllegalStateException("HttpRequestService is not running");
139128
}
140129

141-
sendOnce();
130+
executorService.execute(this::requestOnDemand);
131+
}
132+
133+
private void requestOnDemand() {
134+
if (sendLock.tryLock()) {
135+
try {
136+
ScheduledFuture<?> scheduledFuture = scheduledTask.get();
137+
if (scheduledFuture != null) {
138+
// Cancel future task
139+
scheduledFuture.cancel(false);
140+
}
141+
sendAndScheduleNext();
142+
} finally {
143+
sendLock.unlock();
144+
}
145+
}
146+
}
147+
148+
private void periodicRequest() {
149+
if (sendLock.tryLock()) {
150+
try {
151+
sendAndScheduleNext();
152+
} finally {
153+
sendLock.unlock();
154+
}
155+
}
156+
}
157+
158+
private void sendAndScheduleNext() {
159+
doSendRequest();
160+
scheduleNextExecution();
161+
}
162+
163+
private void scheduleNextExecution() {
164+
scheduledTask.set(
165+
executorService.schedule(
166+
this::periodicRequest, getNextDelay().toNanos(), TimeUnit.NANOSECONDS));
142167
}
143168

144169
private void doSendRequest() {
@@ -150,7 +175,7 @@ private void doSendRequest() {
150175
try (HttpSender.Response response = future.get(30, TimeUnit.SECONDS)) {
151176
getCallback().onConnectionSuccess();
152177
if (isSuccessful(response)) {
153-
handleSuccessResponse(
178+
handleHttpSuccess(
154179
Response.create(ServerToAgent.ADAPTER.decode(response.bodyInputStream())));
155180
} else {
156181
handleHttpError(response);
@@ -187,7 +212,7 @@ private static boolean isSuccessful(HttpSender.Response response) {
187212
return response.statusCode() >= 200 && response.statusCode() < 300;
188213
}
189214

190-
private void handleSuccessResponse(Response response) {
215+
private void handleHttpSuccess(Response response) {
191216
useRegularDelay();
192217
ServerToAgent serverToAgent = response.getServerToAgent();
193218

@@ -211,28 +236,19 @@ private void handleErrorResponse(ServerErrorResponse errorResponse) {
211236

212237
private void useRegularDelay() {
213238
if (currentDelay.compareAndSet(periodicRetryDelay, periodicRequestDelay)) {
214-
cancelCurrentTask();
215239
periodicRequestDelay.reset();
216240
}
217241
}
218242

219243
private void useRetryDelay(@Nullable Duration retryAfter) {
220244
if (currentDelay.compareAndSet(periodicRequestDelay, periodicRetryDelay)) {
221-
cancelCurrentTask();
222245
periodicRetryDelay.reset();
223246
if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion) {
224247
((AcceptsDelaySuggestion) periodicRetryDelay).suggestDelay(retryAfter);
225248
}
226249
}
227250
}
228251

229-
private void cancelCurrentTask() {
230-
ScheduledFuture<?> future = currentTask.get();
231-
if (future != null) {
232-
future.cancel(false);
233-
}
234-
}
235-
236252
private Callback getCallback() {
237253
return Objects.requireNonNull(callback);
238254
}

opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,19 @@ void verifySendingRequest_happyPath() {
126126
verify(callback).onConnectionSuccess();
127127
}
128128

129+
@Test
130+
void verifyWhenSendingOnDemandRequest_andDelayChanges() {
131+
// Initial state
132+
assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(REGULAR_DELAY);
133+
134+
// Trigger delay strategy change
135+
requestSender.enqueueResponse(createFailedResponse(503));
136+
httpRequestService.sendRequest();
137+
138+
// Expected state
139+
assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(RETRY_DELAY);
140+
}
141+
129142
@Test
130143
void verifySendingRequest_whenTheresAParsingError() {
131144
HttpSender.Response httpResponse = createSuccessfulResponse(new byte[] {1, 2, 3});
@@ -240,14 +253,14 @@ void verifySendingRequest_duringRegularMode() {
240253
private void verifyRetryDelayOnError(
241254
HttpSender.Response errorResponse, Duration expectedRetryDelay) {
242255
requestSender.enqueueResponse(errorResponse);
243-
ScheduledTask previousTask = getCurrentScheduledTask();
256+
ScheduledTask previousTask = assertAndGetSingleCurrentTask();
244257

245258
previousTask.run();
246259

247260
verifySingleRequestSent();
248261
verify(periodicRetryDelay).reset();
249262
verify(callback).onRequestFailed(any());
250-
ScheduledTask retryTask = getCurrentScheduledTask();
263+
ScheduledTask retryTask = assertAndGetSingleCurrentTask();
251264
assertThat(retryTask.delay).isEqualTo(expectedRetryDelay);
252265

253266
// Retry with another error
@@ -258,7 +271,7 @@ private void verifyRetryDelayOnError(
258271

259272
verifySingleRequestSent();
260273
verify(callback).onRequestFailed(any());
261-
ScheduledTask retryTask2 = getCurrentScheduledTask();
274+
ScheduledTask retryTask2 = assertAndGetSingleCurrentTask();
262275
assertThat(retryTask2.delay).isEqualTo(expectedRetryDelay);
263276

264277
// Retry with a success
@@ -271,7 +284,7 @@ private void verifyRetryDelayOnError(
271284
verify(periodicRequestDelay).reset();
272285
verifySingleRequestSent();
273286
verifyRequestSuccessCallback(serverToAgent);
274-
assertThat(getCurrentScheduledTask().delay).isEqualTo(REGULAR_DELAY);
287+
assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(REGULAR_DELAY);
275288
}
276289

277290
private Request createRequestSupplier() {
@@ -280,7 +293,7 @@ private Request createRequestSupplier() {
280293
return Request.create(agentToServer);
281294
}
282295

283-
private ScheduledTask getCurrentScheduledTask() {
296+
private ScheduledTask assertAndGetSingleCurrentTask() {
284297
assertThat(scheduledTasks).hasSize(1);
285298
return scheduledTasks.get(0);
286299
}

0 commit comments

Comments
 (0)