From 1fb5d4f96e884c650fa7a0a0b1fe565dd1b6811a Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Wed, 18 Jun 2025 09:34:26 +0200 Subject: [PATCH 01/24] Creating WebSocket contract --- .../websocket/OkHttpWebSocket.java | 67 +++++++++++++++++++ .../connectivity/websocket/WebSocket.java | 14 ++++ .../websocket/WebSocketListener.java | 16 +++++ 3 files changed, 97 insertions(+) create mode 100644 opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java create mode 100644 opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java create mode 100644 opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocketListener.java diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java new file mode 100644 index 000000000..e156389ab --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java @@ -0,0 +1,67 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.connectivity.websocket; + +import okhttp3.OkHttpClient; +import okhttp3.Response; +import okio.ByteString; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public class OkHttpWebSocket extends okhttp3.WebSocketListener implements WebSocket { + private final OkHttpClient client; + private final String url; + private WebSocketListener listener; + private okhttp3.WebSocket webSocket; + + public static OkHttpWebSocket create(String url) { + OkHttpClient client = new OkHttpClient(); + return new OkHttpWebSocket(client, url); + } + + public OkHttpWebSocket(OkHttpClient client, String url) { + this.client = client; + this.url = url; + } + + @Override + public void start(WebSocketListener listener) { + this.listener = listener; + okhttp3.Request request = new okhttp3.Request.Builder().url(url).build(); + webSocket = client.newWebSocket(request, this); + } + + @Override + public void send(byte[] request) { + webSocket.send(ByteString.of(request)); + } + + @Override + public void stop() { + webSocket.cancel(); + } + + @Override + public void onOpen(@NotNull okhttp3.WebSocket webSocket, @NotNull Response response) { + listener.onOpened(this); + } + + @Override + public void onClosed(@NotNull okhttp3.WebSocket webSocket, int code, @NotNull String reason) { + listener.onClosed(this); + } + + @Override + public void onFailure( + @NotNull okhttp3.WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) { + listener.onFailure(this, t); + } + + @Override + public void onMessage(@NotNull okhttp3.WebSocket webSocket, @NotNull ByteString bytes) { + listener.onMessage(this, bytes.toByteArray()); + } +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java new file mode 100644 index 000000000..00bc3878e --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java @@ -0,0 +1,14 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.connectivity.websocket; + +public interface WebSocket { + void start(WebSocketListener listener); + + void send(byte[] request); + + void stop(); +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocketListener.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocketListener.java new file mode 100644 index 000000000..ce81d3dd4 --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocketListener.java @@ -0,0 +1,16 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.connectivity.websocket; + +public interface WebSocketListener { + void onOpened(WebSocket webSocket); + + void onClosed(WebSocket webSocket); + + void onMessage(WebSocket webSocket, byte[] data); + + void onFailure(WebSocket webSocket, Throwable t); +} From 52476f75d0690bbaf4e742189225a80c73bb3802 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Thu, 19 Jun 2025 17:03:38 +0200 Subject: [PATCH 02/24] Updating tests to reuse tools and adding javadocs --- .../connectivity/websocket/WebSocket.java | 58 ++++- .../websocket/WebSocketListener.java | 16 -- .../service/WebSocketRequestService.java | 210 ++++++++++++++++++ .../service/HttpRequestServiceTest.java | 128 ++--------- .../request/service/TestScheduler.java | 115 ++++++++++ 5 files changed, 401 insertions(+), 126 deletions(-) delete mode 100644 opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocketListener.java create mode 100644 opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java create mode 100644 opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/TestScheduler.java diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java index 00bc3878e..b40409f98 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java @@ -5,10 +5,62 @@ package io.opentelemetry.opamp.client.internal.connectivity.websocket; +import javax.annotation.Nullable; + public interface WebSocket { - void start(WebSocketListener listener); + /** + * Starts the websocket connection if it's not yet started or if it has been closed. + * + * @param listener Will receive events from the websocket connection. + */ + void open(Listener listener); + + /** + * Stops the websocket connection if running. Nothing will happen if it's already stopped. + * + * @param code Status code as defined by Section 7.4 of RFC 6455 + * @param reason Reason for shutting down, as explained in Section 5.5.1 of RFC + * 6455 + */ + void close(int code, @Nullable String reason); + + /** + * Sends a message via the websocket connection. + * + * @param request The message payload. + * @return {@link Boolean#FALSE} If the message can't get dispatched for any reason, whether the + * websocket isn't running, or the connection isn't established, or it's terminated. {@link + * Boolean#TRUE} if the message can get sent. Returning {@link Boolean#TRUE} doesn't guarantee + * that the message will arrive at the remote peer. + */ + boolean send(byte[] request); + + interface Listener { + + /** + * Called when the websocket connection is successfully established with the remote peer and may + * start sending messages. + */ + void onOpen(); + + /** Called when the connection is terminated and no further messages can get transmitted. */ + void onClosed(); - void send(byte[] request); + /** + * Called when receiving a message from the remote peer. + * + * @param data The payload sent by the remote peer. + */ + void onMessage(byte[] data); - void stop(); + /** + * Called when the connection is closed or cannot be established due to an error. No messages + * can get transmitted after this method is called. + * + * @param t The error. + */ + void onFailure(Throwable t); + } } diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocketListener.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocketListener.java deleted file mode 100644 index ce81d3dd4..000000000 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocketListener.java +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.opamp.client.internal.connectivity.websocket; - -public interface WebSocketListener { - void onOpened(WebSocket webSocket); - - void onClosed(WebSocket webSocket); - - void onMessage(WebSocket webSocket, byte[] data); - - void onFailure(WebSocket webSocket, Throwable t); -} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java new file mode 100644 index 000000000..18fc9ba13 --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java @@ -0,0 +1,210 @@ +package io.opentelemetry.opamp.client.internal.request.service; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; +import io.opentelemetry.opamp.client.internal.connectivity.websocket.WebSocket; +import io.opentelemetry.opamp.client.internal.request.Request; +import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion; +import io.opentelemetry.opamp.client.internal.request.delay.PeriodicDelay; +import io.opentelemetry.opamp.client.internal.response.OpampServerResponseError; +import io.opentelemetry.opamp.client.internal.response.Response; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import opamp.proto.ServerErrorResponse; +import opamp.proto.ServerErrorResponseType; +import opamp.proto.ServerToAgent; + +public final class WebSocketRequestService implements RequestService, WebSocket.Listener { + private final WebSocket webSocket; + private final PeriodicDelay periodicRetryDelay; + private final AtomicBoolean retryingConnection = new AtomicBoolean(false); + private final AtomicBoolean nextRetryScheduled = new AtomicBoolean(false); + private final AtomicBoolean hasPendingRequest = new AtomicBoolean(false); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final ScheduledExecutorService executorService; + public static final PeriodicDelay DEFAULT_DELAY_BETWEEN_RETRIES = + PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30)); + @Nullable private Callback callback; + @Nullable private Supplier requestSupplier; + + /** + * Creates an {@link WebSocketRequestService}. + * + * @param webSocket The WebSocket implementation. + */ + public static WebSocketRequestService create(WebSocket webSocket) { + return create(webSocket, DEFAULT_DELAY_BETWEEN_RETRIES); + } + + /** + * Creates an {@link WebSocketRequestService}. + * + * @param webSocket The WebSocket implementation. + * @param periodicRetryDelay The time to wait between retries. + */ + public static WebSocketRequestService create( + WebSocket webSocket, PeriodicDelay periodicRetryDelay) { + return new WebSocketRequestService( + webSocket, periodicRetryDelay, Executors.newSingleThreadScheduledExecutor()); + } + + WebSocketRequestService( + WebSocket webSocket, + PeriodicDelay periodicRetryDelay, + ScheduledExecutorService executorService) { + this.webSocket = webSocket; + this.periodicRetryDelay = periodicRetryDelay; + this.executorService = executorService; + } + + @Override + public void start(Callback callback, Supplier requestSupplier) { + if (closed.get()) { + throw new IllegalStateException("This service is already closed"); + } + this.callback = callback; + this.requestSupplier = requestSupplier; + startConnection(); + } + + private void startConnection() { + webSocket.open(this); + } + + @Override + public void sendRequest() { + try { + if (!trySendRequest()) { + hasPendingRequest.set(true); + } + } catch (IOException e) { + getCallback().onRequestFailed(e); + } + } + + private boolean trySendRequest() throws IOException { + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + CodedOutputStream codedOutput = CodedOutputStream.newInstance(outputStream); + codedOutput.writeUInt64NoTag(0); + byte[] payload = getRequest().getAgentToServer().encode(); + codedOutput.write(payload, 0, payload.length); + codedOutput.flush(); + return webSocket.send(outputStream.toByteArray()); + } + } + + @Nonnull + private Request getRequest() { + return Objects.requireNonNull(requestSupplier).get(); + } + + @Override + public void stop() { + if (closed.compareAndSet(false, true)) { + sendRequest(); + webSocket.close(1000, null); + } + } + + @Override + public void onOpen() { + retryingConnection.set(false); + getCallback().onConnectionSuccess(); + if (hasPendingRequest.compareAndSet(true, false)) { + sendRequest(); + } + } + + @Override + public void onMessage(byte[] data) { + try { + ServerToAgent serverToAgent = readServerToAgent(data); + + if (serverToAgent.error_response != null) { + handleServerError(serverToAgent.error_response); + getCallback() + .onRequestFailed( + new OpampServerResponseError(serverToAgent.error_response.error_message)); + return; + } + + getCallback().onRequestSuccess(Response.create(serverToAgent)); + } catch (IOException e) { + getCallback().onRequestFailed(e); + } + } + + private static ServerToAgent readServerToAgent(byte[] data) throws IOException { + CodedInputStream codedInputStream = CodedInputStream.newInstance(data); + codedInputStream.readRawVarint64(); // It moves the read position to the end of the header. + int totalBytesRead = codedInputStream.getTotalBytesRead(); + int payloadSize = data.length - totalBytesRead; + byte[] payload = new byte[payloadSize]; + System.arraycopy(data, totalBytesRead, payload, 0, payloadSize); + return ServerToAgent.ADAPTER.decode(payload); + } + + private void handleServerError(ServerErrorResponse errorResponse) { + if (serverIsUnavailable(errorResponse)) { + Duration retryAfter = null; + + if (errorResponse.retry_info != null) { + retryAfter = Duration.ofNanos(errorResponse.retry_info.retry_after_nanoseconds); + } + + webSocket.close(1000, null); + scheduleConnectionRetry(retryAfter); + } + } + + private static boolean serverIsUnavailable(ServerErrorResponse errorResponse) { + return errorResponse.type.equals(ServerErrorResponseType.ServerErrorResponseType_Unavailable); + } + + @SuppressWarnings("FutureReturnValueIgnored") + private void scheduleConnectionRetry(@Nullable Duration retryAfter) { + if (retryingConnection.compareAndSet(false, true)) { + periodicRetryDelay.reset(); + if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion) { + ((AcceptsDelaySuggestion) periodicRetryDelay).suggestDelay(retryAfter); + } + } + if (nextRetryScheduled.compareAndSet(false, true)) { + executorService.schedule( + this::retryConnection, periodicRetryDelay.getNextDelay().toNanos(), TimeUnit.NANOSECONDS); + } + } + + private void retryConnection() { + nextRetryScheduled.set(false); + startConnection(); + } + + @Override + public void onClosed() { + if (!closed.get()) { + // The service isn't closed so we should retry connecting. + scheduleConnectionRetry(null); + } + } + + @Override + public void onFailure(Throwable t) { + getCallback().onConnectionFailed(t); + scheduleConnectionRetry(null); + } + + @Nonnull + private Callback getCallback() { + return Objects.requireNonNull(callback); + } +} diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java index 63439e8c6..9e0125943 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java @@ -8,10 +8,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -33,10 +31,7 @@ import java.util.NoSuchElementException; import java.util.Queue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import opamp.proto.AgentToServer; @@ -44,7 +39,6 @@ import opamp.proto.ServerErrorResponse; import opamp.proto.ServerErrorResponseType; import opamp.proto.ServerToAgent; -import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -61,8 +55,7 @@ class HttpRequestServiceTest { private static final Duration RETRY_DELAY = Duration.ofSeconds(5); @Mock private RequestService.Callback callback; - private final List scheduledTasks = new ArrayList<>(); - private ScheduledExecutorService executorService; + private TestScheduler scheduler; private TestHttpSender requestSender; private PeriodicDelay periodicRequestDelay; private PeriodicDelayWithSuggestion periodicRetryDelay; @@ -74,11 +67,11 @@ void setUp() { requestSender = new TestHttpSender(); periodicRequestDelay = createPeriodicDelay(REGULAR_DELAY); periodicRetryDelay = createPeriodicDelayWithSuggestionSupport(RETRY_DELAY); - executorService = createTestScheduleExecutorService(); + scheduler = new TestScheduler(); httpRequestService = new HttpRequestService( requestSender, - executorService, + scheduler.getMockService(), periodicRequestDelay, periodicRetryDelay, RetryAfterParser.getInstance()); @@ -88,28 +81,26 @@ void setUp() { @AfterEach void tearDown() { httpRequestService.stop(); - scheduledTasks.clear(); - verify(executorService).shutdown(); + verify(scheduler.getMockService()).shutdown(); } @Test void verifyStart_scheduledFirstTask() { - assertThat(scheduledTasks).hasSize(1); - ScheduledTask firstTask = scheduledTasks.get(0); - assertThat(firstTask.delay).isEqualTo(REGULAR_DELAY); + TestScheduler.Task firstTask = assertAndGetSingleCurrentTask(); + assertThat(firstTask.getDelay()).isEqualTo(REGULAR_DELAY); // Verify initial task creates next one - scheduledTasks.clear(); + scheduler.clearTasks(); requestSender.enqueueResponse(createSuccessfulResponse(new ServerToAgent.Builder().build())); firstTask.run(); - assertThat(scheduledTasks).hasSize(1); + assertThat(scheduler.getScheduledTasks()).hasSize(1); // Check on-demand requests don't create subsequent tasks requestSender.enqueueResponse(createSuccessfulResponse(new ServerToAgent.Builder().build())); httpRequestService.sendRequest(); - assertThat(scheduledTasks).hasSize(1); + assertThat(scheduler.getScheduledTasks()).hasSize(1); } @Test @@ -128,14 +119,14 @@ void verifySendingRequest_happyPath() { @Test void verifyWhenSendingOnDemandRequest_andDelayChanges() { // Initial state - assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(REGULAR_DELAY); + assertThat(assertAndGetSingleCurrentTask().getDelay()).isEqualTo(REGULAR_DELAY); // Trigger delay strategy change requestSender.enqueueResponse(createFailedResponse(503)); httpRequestService.sendRequest(); // Expected state - assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(RETRY_DELAY); + assertThat(assertAndGetSingleCurrentTask().getDelay()).isEqualTo(RETRY_DELAY); } @Test @@ -252,30 +243,30 @@ void verifySendingRequest_duringRegularMode() { private void verifyRetryDelayOnError( HttpSender.Response errorResponse, Duration expectedRetryDelay) { requestSender.enqueueResponse(errorResponse); - ScheduledTask previousTask = assertAndGetSingleCurrentTask(); + TestScheduler.Task previousTask = assertAndGetSingleCurrentTask(); previousTask.run(); verifySingleRequestSent(); verify(periodicRetryDelay).reset(); verify(callback).onRequestFailed(any()); - ScheduledTask retryTask = assertAndGetSingleCurrentTask(); - assertThat(retryTask.delay).isEqualTo(expectedRetryDelay); + TestScheduler.Task retryTask = assertAndGetSingleCurrentTask(); + assertThat(retryTask.getDelay()).isEqualTo(expectedRetryDelay); // Retry with another error clearInvocations(callback); - scheduledTasks.clear(); + scheduler.clearTasks(); requestSender.enqueueResponse(createFailedResponse(500)); retryTask.run(); verifySingleRequestSent(); verify(callback).onRequestFailed(any()); - ScheduledTask retryTask2 = assertAndGetSingleCurrentTask(); - assertThat(retryTask2.delay).isEqualTo(expectedRetryDelay); + TestScheduler.Task retryTask2 = assertAndGetSingleCurrentTask(); + assertThat(retryTask2.getDelay()).isEqualTo(expectedRetryDelay); // Retry with a success clearInvocations(callback); - scheduledTasks.clear(); + scheduler.clearTasks(); ServerToAgent serverToAgent = new ServerToAgent.Builder().build(); requestSender.enqueueResponse(createSuccessfulResponse(serverToAgent)); retryTask2.run(); @@ -283,7 +274,7 @@ private void verifyRetryDelayOnError( verify(periodicRequestDelay).reset(); verifySingleRequestSent(); verifyRequestSuccessCallback(serverToAgent); - assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(REGULAR_DELAY); + assertThat(assertAndGetSingleCurrentTask().getDelay()).isEqualTo(REGULAR_DELAY); } private Request createRequestSupplier() { @@ -292,7 +283,8 @@ private Request createRequestSupplier() { return Request.create(agentToServer); } - private ScheduledTask assertAndGetSingleCurrentTask() { + private TestScheduler.Task assertAndGetSingleCurrentTask() { + List scheduledTasks = scheduler.getScheduledTasks(); assertThat(scheduledTasks).hasSize(1); return scheduledTasks.get(0); } @@ -313,33 +305,6 @@ private void verifyRequestFailedCallback(int errorCode) { assertThat(captor.getValue().getMessage()).isEqualTo("Error message"); } - private ScheduledExecutorService createTestScheduleExecutorService() { - ScheduledExecutorService service = mock(); - - lenient() - .doAnswer( - invocation -> { - Runnable runnable = invocation.getArgument(0); - runnable.run(); - return null; - }) - .when(service) - .execute(any()); - - when(service.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) - .thenAnswer( - invocation -> { - ScheduledTask task = - new ScheduledTask(invocation.getArgument(0), invocation.getArgument(1)); - - scheduledTasks.add(task); - - return task; - }); - - return service; - } - private static HttpSender.Response createSuccessfulResponse(ServerToAgent serverToAgent) { return createSuccessfulResponse(serverToAgent.encodeByteString().toByteArray()); } @@ -438,55 +403,4 @@ private RequestParams(int contentLength) { } } } - - private class ScheduledTask implements ScheduledFuture { - private final Runnable runnable; - private final Duration delay; - - public void run() { - get(); - } - - private ScheduledTask(Runnable runnable, long timeNanos) { - this.runnable = runnable; - this.delay = Duration.ofNanos(timeNanos); - } - - @Override - public long getDelay(@NotNull TimeUnit unit) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return scheduledTasks.remove(this); - } - - @Override - public boolean isCancelled() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isDone() { - throw new UnsupportedOperationException(); - } - - @Override - public Object get() { - scheduledTasks.remove(this); - runnable.run(); - return null; - } - - @Override - public Object get(long timeout, @NotNull TimeUnit unit) { - throw new UnsupportedOperationException(); - } - - @Override - public int compareTo(@NotNull Delayed o) { - throw new UnsupportedOperationException(); - } - } } diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/TestScheduler.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/TestScheduler.java new file mode 100644 index 000000000..d3ef55b3e --- /dev/null +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/TestScheduler.java @@ -0,0 +1,115 @@ +package io.opentelemetry.opamp.client.internal.request.service; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Delayed; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.jetbrains.annotations.NotNull; + +public final class TestScheduler { + private final List tasks = new ArrayList<>(); + private final ScheduledExecutorService service = createTestScheduleExecutorService(); + + public ScheduledExecutorService getMockService() { + return service; + } + + public List getScheduledTasks() { + return Collections.unmodifiableList(tasks); + } + + public void clearTasks() { + tasks.clear(); + } + + private ScheduledExecutorService createTestScheduleExecutorService() { + ScheduledExecutorService service = mock(); + + lenient() + .doAnswer( + invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }) + .when(service) + .execute(any()); + + when(service.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .thenAnswer( + invocation -> { + Task task = new Task(invocation.getArgument(0), invocation.getArgument(1)); + + tasks.add(task); + + return task; + }); + + return service; + } + + public class Task implements ScheduledFuture { + private final Runnable runnable; + private final Duration delay; + + public void run() { + get(); + } + + private Task(Runnable runnable, long timeNanos) { + this.runnable = runnable; + this.delay = Duration.ofNanos(timeNanos); + } + + public Duration getDelay() { + return delay; + } + + @Override + public long getDelay(@NotNull TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return tasks.remove(this); + } + + @Override + public boolean isCancelled() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDone() { + throw new UnsupportedOperationException(); + } + + @Override + public Object get() { + tasks.remove(this); + runnable.run(); + return null; + } + + @Override + public Object get(long timeout, @NotNull TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public int compareTo(@NotNull Delayed o) { + throw new UnsupportedOperationException(); + } + } +} From 2a0064a092d20e34d2638b40dcfb4e607d36bd15 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Thu, 19 Jun 2025 19:56:30 +0200 Subject: [PATCH 03/24] Adding tests for OkHttpWebSocket --- .../websocket/OkHttpWebSocket.java | 115 ++++++++++---- .../websocket/OkHttpWebSocketTest.java | 145 ++++++++++++++++++ 2 files changed, 227 insertions(+), 33 deletions(-) create mode 100644 opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java index e156389ab..48f0e1a78 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java @@ -5,63 +5,112 @@ package io.opentelemetry.opamp.client.internal.connectivity.websocket; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import okhttp3.OkHttpClient; import okhttp3.Response; +import okhttp3.WebSocketListener; import okio.ByteString; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -public class OkHttpWebSocket extends okhttp3.WebSocketListener implements WebSocket { - private final OkHttpClient client; +public class OkHttpWebSocket implements WebSocket { private final String url; - private WebSocketListener listener; - private okhttp3.WebSocket webSocket; + private final OkHttpClient client; + private final AtomicBoolean starting = new AtomicBoolean(false); + private final AtomicBoolean closing = new AtomicBoolean(false); + private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicReference webSocket = new AtomicReference<>(); public static OkHttpWebSocket create(String url) { - OkHttpClient client = new OkHttpClient(); - return new OkHttpWebSocket(client, url); + return create(url, new OkHttpClient()); } - public OkHttpWebSocket(OkHttpClient client, String url) { - this.client = client; - this.url = url; + public static OkHttpWebSocket create(String url, OkHttpClient client) { + return new OkHttpWebSocket(url, client); } - @Override - public void start(WebSocketListener listener) { - this.listener = listener; - okhttp3.Request request = new okhttp3.Request.Builder().url(url).build(); - webSocket = client.newWebSocket(request, this); + private OkHttpWebSocket(String url, OkHttpClient client) { + this.url = url; + this.client = client; } @Override - public void send(byte[] request) { - webSocket.send(ByteString.of(request)); + public void open(Listener listener) { + if (running.get()) { + return; + } + if (starting.compareAndSet(false, true)) { + okhttp3.Request request = new okhttp3.Request.Builder().url(url).build(); + webSocket.set(client.newWebSocket(request, new ListenerAdapter(listener))); + } } @Override - public void stop() { - webSocket.cancel(); + public boolean send(byte[] request) { + if (!running.get()) { + return false; + } + return getWebSocket().send(ByteString.of(request)); } @Override - public void onOpen(@NotNull okhttp3.WebSocket webSocket, @NotNull Response response) { - listener.onOpened(this); + public void close(int code, @Nullable String reason) { + if (!running.get()) { + return; + } + if (closing.compareAndSet(false, true)) { + if (!getWebSocket().close(code, reason)) { + closing.set(false); + running.set(false); + } + } } - @Override - public void onClosed(@NotNull okhttp3.WebSocket webSocket, int code, @NotNull String reason) { - listener.onClosed(this); + private okhttp3.WebSocket getWebSocket() { + return Objects.requireNonNull(webSocket.get()); } - @Override - public void onFailure( - @NotNull okhttp3.WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) { - listener.onFailure(this, t); - } + private class ListenerAdapter extends WebSocketListener { + private final Listener delegate; - @Override - public void onMessage(@NotNull okhttp3.WebSocket webSocket, @NotNull ByteString bytes) { - listener.onMessage(this, bytes.toByteArray()); + private ListenerAdapter(Listener delegate) { + this.delegate = delegate; + } + + @Override + public void onOpen(@Nonnull okhttp3.WebSocket webSocket, @Nonnull Response response) { + running.set(true); + starting.set(false); + delegate.onOpen(); + } + + @Override + public void onClosing(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) { + running.set(false); + closing.set(true); + } + + @Override + public void onClosed(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) { + running.set(false); + closing.set(false); + delegate.onClosed(); + } + + @Override + public void onFailure( + @Nonnull okhttp3.WebSocket webSocket, @Nonnull Throwable t, @Nullable Response response) { + running.set(false); + starting.set(false); + closing.set(false); + delegate.onFailure(t); + } + + @Override + public void onMessage(@Nonnull okhttp3.WebSocket webSocket, @Nonnull ByteString bytes) { + delegate.onMessage(bytes.toByteArray()); + } } } diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java new file mode 100644 index 000000000..aaa0257db --- /dev/null +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java @@ -0,0 +1,145 @@ +package io.opentelemetry.opamp.client.internal.connectivity.websocket; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.WebSocketListener; +import okio.ByteString; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class OkHttpWebSocketTest { + @Mock private OkHttpClient client; + @Mock private okhttp3.WebSocket okHttpWebSocket; + @Mock private WebSocket.Listener listener; + @Captor private ArgumentCaptor requestCaptor; + @Captor private ArgumentCaptor listenerCaptor; + private static final String URL = "ws://some.server"; + private OkHttpWebSocket webSocket; + + @BeforeEach + void setUp() { + webSocket = OkHttpWebSocket.create(URL, client); + when(client.newWebSocket(any(), any())).thenReturn(okHttpWebSocket); + } + + @Test + void validateOpen() { + // Assert websocket created + openAndCaptureArguments(); + assertThat(requestCaptor.getValue().url().host()).isEqualTo("some.server"); + + // Assert further calls to open won't do anything + webSocket.open(listener); + verifyNoMoreInteractions(client); + + // When connectivity succeeds, open calls won't do anything. + callOnOpen(); + webSocket.open(listener); + verifyNoMoreInteractions(client); + + // When connectivity fails, allow future open calls + clearInvocations(client); + callOnFailure(); + openAndCaptureArguments(); + assertThat(requestCaptor.getValue().url().host()).isEqualTo("some.server"); + } + + @Test + void validateSend() { + byte[] payload = new byte[1]; + + // Before opening + assertThat(webSocket.send(payload)).isFalse(); + + // After opening successfully + when(okHttpWebSocket.send(any(ByteString.class))).thenReturn(true); + openAndCaptureArguments(); + callOnOpen(); + assertThat(webSocket.send(payload)).isTrue(); + verify(okHttpWebSocket).send(ByteString.of(payload)); + + // After failing + callOnFailure(); + assertThat(webSocket.send(payload)).isFalse(); + verifyNoMoreInteractions(okHttpWebSocket); + } + + @Test + void validateClose() { + openAndCaptureArguments(); + + callOnOpen(); + webSocket.close(123, "something"); + verify(okHttpWebSocket).close(123, "something"); + + // Validate calling it again + webSocket.close(1, null); + verifyNoMoreInteractions(okHttpWebSocket); + + // Once closed, it should be possible to reopen it. + clearInvocations(client); + callOnClosed(); + openAndCaptureArguments(); + } + + @Test + void validateOnClosing() { + openAndCaptureArguments(); + + callOnOpen(); + callOnClosing(); + + // Validate calling after onClosing + webSocket.close(1, null); + verifyNoInteractions(okHttpWebSocket); + } + + @Test + void validateOnMessage() { + byte[] payload = new byte[1]; + openAndCaptureArguments(); + + listenerCaptor.getValue().onMessage(mock(), ByteString.of(payload)); + verify(listener).onMessage(payload); + } + + private void callOnOpen() { + listenerCaptor.getValue().onOpen(mock(), mock()); + verify(listener).onOpen(); + } + + private void callOnClosed() { + listenerCaptor.getValue().onClosed(mock(), 0, ""); + verify(listener).onClosed(); + } + + private void callOnClosing() { + listenerCaptor.getValue().onClosing(mock(), 0, ""); + } + + private void callOnFailure() { + Throwable t = mock(); + listenerCaptor.getValue().onFailure(mock(), t, mock()); + verify(listener).onFailure(t); + } + + private void openAndCaptureArguments() { + webSocket.open(listener); + verify(client).newWebSocket(requestCaptor.capture(), listenerCaptor.capture()); + } +} From a8a0cfd50a1517837ecce447b3eb6ebc5892585f Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Thu, 19 Jun 2025 19:57:46 +0200 Subject: [PATCH 04/24] Updating tests --- .../internal/connectivity/websocket/OkHttpWebSocketTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java index aaa0257db..4579ba573 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java @@ -95,6 +95,7 @@ void validateClose() { clearInvocations(client); callOnClosed(); openAndCaptureArguments(); + assertThat(requestCaptor.getValue().url().host()).isEqualTo("some.server"); } @Test From 4a9332ac5beae150df539fb2485b273bda3dd169 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Fri, 20 Jun 2025 15:53:40 +0200 Subject: [PATCH 05/24] Adding tests to WebSocketRequestService --- .../service/WebSocketRequestService.java | 60 ++-- .../service/HttpRequestServiceTest.java | 31 +- .../service/PeriodicDelayWithSuggestion.java | 30 ++ .../request/service/TestScheduler.java | 3 +- .../service/WebSocketRequestServiceTest.java | 331 ++++++++++++++++++ 5 files changed, 401 insertions(+), 54 deletions(-) create mode 100644 opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/PeriodicDelayWithSuggestion.java create mode 100644 opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestServiceTest.java diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java index 18fc9ba13..5ac5eeb27 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java @@ -1,7 +1,6 @@ package io.opentelemetry.opamp.client.internal.request.service; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; +import com.squareup.wire.ProtoAdapter; import io.opentelemetry.opamp.client.internal.connectivity.websocket.WebSocket; import io.opentelemetry.opamp.client.internal.request.Request; import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion; @@ -29,7 +28,8 @@ public final class WebSocketRequestService implements RequestService, WebSocket. private final AtomicBoolean retryingConnection = new AtomicBoolean(false); private final AtomicBoolean nextRetryScheduled = new AtomicBoolean(false); private final AtomicBoolean hasPendingRequest = new AtomicBoolean(false); - private final AtomicBoolean closed = new AtomicBoolean(false); + private final AtomicBoolean isRunning = new AtomicBoolean(false); + private final AtomicBoolean hasStopped = new AtomicBoolean(false); private final ScheduledExecutorService executorService; public static final PeriodicDelay DEFAULT_DELAY_BETWEEN_RETRIES = PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30)); @@ -68,12 +68,16 @@ public static WebSocketRequestService create( @Override public void start(Callback callback, Supplier requestSupplier) { - if (closed.get()) { - throw new IllegalStateException("This service is already closed"); + if (hasStopped.get()) { + throw new IllegalStateException("This service is already stopped"); + } + if (isRunning.compareAndSet(false, true)) { + this.callback = callback; + this.requestSupplier = requestSupplier; + startConnection(); + } else { + throw new IllegalStateException("The service has already started"); } - this.callback = callback; - this.requestSupplier = requestSupplier; - startConnection(); } private void startConnection() { @@ -82,6 +86,17 @@ private void startConnection() { @Override public void sendRequest() { + if (!isRunning.get()) { + throw new IllegalStateException("The service is not running"); + } + if (hasStopped.get()) { + throw new IllegalStateException("This service is already stopped"); + } + + doSendRequest(); + } + + private void doSendRequest() { try { if (!trySendRequest()) { hasPendingRequest.set(true); @@ -93,11 +108,9 @@ public void sendRequest() { private boolean trySendRequest() throws IOException { try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { - CodedOutputStream codedOutput = CodedOutputStream.newInstance(outputStream); - codedOutput.writeUInt64NoTag(0); + ProtoAdapter.UINT64.encode(outputStream, 0L); byte[] payload = getRequest().getAgentToServer().encode(); - codedOutput.write(payload, 0, payload.length); - codedOutput.flush(); + outputStream.write(payload); return webSocket.send(outputStream.toByteArray()); } } @@ -109,9 +122,10 @@ private Request getRequest() { @Override public void stop() { - if (closed.compareAndSet(false, true)) { - sendRequest(); + if (hasStopped.compareAndSet(false, true)) { + doSendRequest(); webSocket.close(1000, null); + executorService.shutdown(); } } @@ -144,12 +158,11 @@ public void onMessage(byte[] data) { } private static ServerToAgent readServerToAgent(byte[] data) throws IOException { - CodedInputStream codedInputStream = CodedInputStream.newInstance(data); - codedInputStream.readRawVarint64(); // It moves the read position to the end of the header. - int totalBytesRead = codedInputStream.getTotalBytesRead(); - int payloadSize = data.length - totalBytesRead; + Long header = ProtoAdapter.UINT64.decode(data); + int headerSize = ProtoAdapter.UINT64.encodedSize(header); + int payloadSize = data.length - headerSize; byte[] payload = new byte[payloadSize]; - System.arraycopy(data, totalBytesRead, payload, 0, payloadSize); + System.arraycopy(data, headerSize, payload, 0, payloadSize); return ServerToAgent.ADAPTER.decode(payload); } @@ -172,6 +185,9 @@ private static boolean serverIsUnavailable(ServerErrorResponse errorResponse) { @SuppressWarnings("FutureReturnValueIgnored") private void scheduleConnectionRetry(@Nullable Duration retryAfter) { + if (hasStopped.get()) { + return; + } if (retryingConnection.compareAndSet(false, true)) { periodicRetryDelay.reset(); if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion) { @@ -191,10 +207,8 @@ private void retryConnection() { @Override public void onClosed() { - if (!closed.get()) { - // The service isn't closed so we should retry connecting. - scheduleConnectionRetry(null); - } + // If this service isn't stopped, we should retry connecting. + scheduleConnectionRetry(null); } @Override diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java index 9e0125943..9996c4783 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java @@ -19,7 +19,6 @@ import io.opentelemetry.opamp.client.internal.connectivity.http.HttpSender; import io.opentelemetry.opamp.client.internal.connectivity.http.RetryAfterParser; import io.opentelemetry.opamp.client.internal.request.Request; -import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion; import io.opentelemetry.opamp.client.internal.request.delay.PeriodicDelay; import io.opentelemetry.opamp.client.internal.response.Response; import java.io.ByteArrayInputStream; @@ -75,7 +74,7 @@ void setUp() { periodicRequestDelay, periodicRetryDelay, RetryAfterParser.getInstance()); - httpRequestService.start(callback, this::createRequestSupplier); + httpRequestService.start(callback, this::createRequest); } @AfterEach @@ -277,7 +276,7 @@ private void verifyRetryDelayOnError( assertThat(assertAndGetSingleCurrentTask().getDelay()).isEqualTo(REGULAR_DELAY); } - private Request createRequestSupplier() { + private Request createRequest() { AgentToServer agentToServer = new AgentToServer.Builder().sequence_num(10).build(); requestSize = agentToServer.encodeByteString().size(); return Request.create(agentToServer); @@ -335,32 +334,6 @@ private static PeriodicDelayWithSuggestion createPeriodicDelayWithSuggestionSupp return spy(new PeriodicDelayWithSuggestion(delay)); } - private static class PeriodicDelayWithSuggestion - implements PeriodicDelay, AcceptsDelaySuggestion { - private final Duration initialDelay; - private Duration currentDelay; - - private PeriodicDelayWithSuggestion(Duration initialDelay) { - this.initialDelay = initialDelay; - currentDelay = initialDelay; - } - - @Override - public void suggestDelay(Duration delay) { - currentDelay = delay; - } - - @Override - public Duration getNextDelay() { - return currentDelay; - } - - @Override - public void reset() { - currentDelay = initialDelay; - } - } - private static class TestHttpSender implements HttpSender { private final List requests = new ArrayList<>(); diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/PeriodicDelayWithSuggestion.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/PeriodicDelayWithSuggestion.java new file mode 100644 index 000000000..08434a0d3 --- /dev/null +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/PeriodicDelayWithSuggestion.java @@ -0,0 +1,30 @@ +package io.opentelemetry.opamp.client.internal.request.service; + +import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion; +import io.opentelemetry.opamp.client.internal.request.delay.PeriodicDelay; +import java.time.Duration; + +public class PeriodicDelayWithSuggestion implements PeriodicDelay, AcceptsDelaySuggestion { + private final Duration initialDelay; + private Duration currentDelay; + + public PeriodicDelayWithSuggestion(Duration initialDelay) { + this.initialDelay = initialDelay; + currentDelay = initialDelay; + } + + @Override + public void suggestDelay(Duration delay) { + currentDelay = delay; + } + + @Override + public Duration getNextDelay() { + return currentDelay; + } + + @Override + public void reset() { + currentDelay = initialDelay; + } +} diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/TestScheduler.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/TestScheduler.java index d3ef55b3e..e79590bcb 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/TestScheduler.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/TestScheduler.java @@ -4,7 +4,6 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import java.time.Duration; import java.util.ArrayList; @@ -45,7 +44,7 @@ private ScheduledExecutorService createTestScheduleExecutorService() { .when(service) .execute(any()); - when(service.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + lenient().when(service.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) .thenAnswer( invocation -> { Task task = new Task(invocation.getArgument(0), invocation.getArgument(1)); diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestServiceTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestServiceTest.java new file mode 100644 index 000000000..2f63d794b --- /dev/null +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestServiceTest.java @@ -0,0 +1,331 @@ +package io.opentelemetry.opamp.client.internal.request.service; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.protobuf.CodedOutputStream; +import io.opentelemetry.opamp.client.internal.connectivity.websocket.WebSocket; +import io.opentelemetry.opamp.client.internal.request.Request; +import io.opentelemetry.opamp.client.internal.response.OpampServerResponseError; +import io.opentelemetry.opamp.client.internal.response.Response; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.time.Duration; +import opamp.proto.AgentToServer; +import opamp.proto.RetryInfo; +import opamp.proto.ServerErrorResponse; +import opamp.proto.ServerErrorResponseType; +import opamp.proto.ServerToAgent; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class WebSocketRequestServiceTest { + @Mock private WebSocket webSocket; + @Mock private RequestService.Callback callback; + @Mock private PeriodicDelayWithSuggestion retryDelay; + private Request request; + private TestScheduler scheduler; + private WebSocketRequestService requestService; + private static final Duration INITIAL_RETRY_DELAY = Duration.ofSeconds(1); + + @BeforeEach + void setUp() { + lenient().when(retryDelay.getNextDelay()).thenReturn(INITIAL_RETRY_DELAY); + scheduler = new TestScheduler(); + requestService = new WebSocketRequestService(webSocket, retryDelay, scheduler.getMockService()); + } + + @Test + void verifySuccessfulStart() { + startService(); + verify(webSocket).open(requestService); + + // When opening successfully, notify callback + requestService.onOpen(); + verify(callback).onConnectionSuccess(); + verifyNoMoreInteractions(callback); + + // It shouldn't allow starting again + try { + startService(); + fail(); + } catch (IllegalStateException e) { + assertThat(e).hasMessage("The service has already started"); + } + } + + @Test + void verifyFailedStart() { + startService(); + verify(webSocket).open(requestService); + + // When failing while opening, notify callback + Throwable t = mock(); + requestService.onFailure(t); + verify(retryDelay).reset(); + verify(callback).onConnectionFailed(t); + verifyNoMoreInteractions(callback); + + // Check connection retry is scheduled + assertThat(scheduler.getScheduledTasks()).hasSize(1); + assertThat(scheduler.getScheduledTasks().get(0).getDelay()).isEqualTo(INITIAL_RETRY_DELAY); + + // It shouldn't allow starting again + try { + startService(); + fail(); + } catch (IllegalStateException e) { + assertThat(e).hasMessage("The service has already started"); + } + + // It shouldn't schedule more than one retry at a time + clearInvocations(retryDelay, callback); + requestService.onFailure(t); + verify(callback).onConnectionFailed(t); + verifyNoInteractions(retryDelay); + verifyNoMoreInteractions(callback); + assertThat(scheduler.getScheduledTasks()).hasSize(1); + + // Execute retry with new delay + clearInvocations(webSocket, callback); + when(retryDelay.getNextDelay()).thenReturn(Duration.ofSeconds(5)); + scheduler.getScheduledTasks().get(0).run(); + assertThat(scheduler.getScheduledTasks()).isEmpty(); + verify(webSocket).open(requestService); + + // Fail again + requestService.onFailure(t); + verify(retryDelay, never()).reset(); + verify(callback).onConnectionFailed(t); + + // A new retry has been scheduled + assertThat(scheduler.getScheduledTasks()).hasSize(1); + assertThat(scheduler.getScheduledTasks().get(0).getDelay()).isEqualTo(Duration.ofSeconds(5)); + + // Execute retry again + clearInvocations(webSocket, callback); + scheduler.getScheduledTasks().get(0).run(); + assertThat(scheduler.getScheduledTasks()).isEmpty(); + verify(webSocket).open(requestService); + + // Succeed + requestService.onOpen(); + verify(callback).onConnectionSuccess(); + verifyNoMoreInteractions(callback); + + // Fail at some point + clearInvocations(callback); + requestService.onFailure(t); + verify(callback).onConnectionFailed(t); + verifyNoMoreInteractions(callback); + verify(retryDelay).reset(); + assertThat(scheduler.getScheduledTasks()).hasSize(1); + } + + @Test + void verifySendRequest() { + // Validate when not running + try { + requestService.sendRequest(); + fail(); + } catch (IllegalStateException e) { + assertThat(e).hasMessage("The service is not running"); + } + + startService(); + + // Successful send + when(webSocket.send(any())).thenReturn(true); + requestService.sendRequest(); + verify(webSocket).send(getExpectedOutgoingBytes()); + + // Check there are no pending requests + clearInvocations(webSocket); + requestService.onOpen(); + verifyNoInteractions(webSocket); + + // Failed send + when(webSocket.send(any())).thenReturn(false); + requestService.sendRequest(); + clearInvocations(webSocket); + + // Check pending request + when(webSocket.send(any())).thenReturn(true); + requestService.onOpen(); + verify(webSocket).send(getExpectedOutgoingBytes()); + } + + @Test + void verifyOnMessage() { + startService(); + + // Successful message + ServerToAgent serverToAgent = new ServerToAgent.Builder().build(); + requestService.onMessage(createServerToAgentPayload(serverToAgent)); + verify(callback).onRequestSuccess(Response.create(serverToAgent)); + verifyNoMoreInteractions(callback); + assertThat(scheduler.getScheduledTasks()).isEmpty(); + + // Regular error message + ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class); + clearInvocations(callback); + serverToAgent = + new ServerToAgent.Builder() + .error_response(new ServerErrorResponse.Builder().error_message("A message").build()) + .build(); + requestService.onMessage(createServerToAgentPayload(serverToAgent)); + verify(callback).onRequestFailed(throwableCaptor.capture()); + verifyNoMoreInteractions(callback); + OpampServerResponseError error = (OpampServerResponseError) throwableCaptor.getValue(); + assertThat(error.getMessage()).isEqualTo("A message"); + assertThat(scheduler.getScheduledTasks()).isEmpty(); + + // Error message with unavailable status + clearInvocations(callback); + serverToAgent = + new ServerToAgent.Builder() + .error_response( + new ServerErrorResponse.Builder() + .type(ServerErrorResponseType.ServerErrorResponseType_Unavailable) + .error_message("Try later") + .build()) + .build(); + requestService.onMessage(createServerToAgentPayload(serverToAgent)); + verify(callback).onRequestFailed(throwableCaptor.capture()); + verifyNoMoreInteractions(callback); + OpampServerResponseError unavailableError = + (OpampServerResponseError) throwableCaptor.getValue(); + assertThat(unavailableError.getMessage()).isEqualTo("Try later"); + assertThat(scheduler.getScheduledTasks()).hasSize(1); + verify(retryDelay, never()).suggestDelay(any()); + + // Reset scheduled retry + scheduler.getScheduledTasks().get(0).run(); + requestService.onOpen(); + + // Error message with unavailable status and suggested delay + Duration suggestedDelay = Duration.ofSeconds(10); + clearInvocations(callback, retryDelay); + serverToAgent = + new ServerToAgent.Builder() + .error_response( + new ServerErrorResponse.Builder() + .type(ServerErrorResponseType.ServerErrorResponseType_Unavailable) + .retry_info( + new RetryInfo.Builder() + .retry_after_nanoseconds(suggestedDelay.toNanos()) + .build()) + .build()) + .build(); + requestService.onMessage(createServerToAgentPayload(serverToAgent)); + verify(callback).onRequestFailed(throwableCaptor.capture()); + verifyNoMoreInteractions(callback); + OpampServerResponseError unavailableErrorWithSuggestedDelay = + (OpampServerResponseError) throwableCaptor.getValue(); + assertThat(unavailableErrorWithSuggestedDelay.getMessage()).isEmpty(); + assertThat(scheduler.getScheduledTasks()).hasSize(1); + verify(retryDelay).suggestDelay(suggestedDelay); + } + + @Test + void verifyStop() { + startService(); + + requestService.stop(); + + InOrder inOrder = inOrder(webSocket); + inOrder.verify(webSocket).send(getExpectedOutgoingBytes()); + inOrder.verify(webSocket).close(1000, null); + verify(scheduler.getMockService()).shutdown(); + + // If something fails afterward, no retry should get scheduled. + requestService.onFailure(mock()); + verifyNoInteractions(retryDelay); + assertThat(scheduler.getScheduledTasks()).isEmpty(); + + // If onClosed is called afterward, no retry should get scheduled. + requestService.onClosed(); + verifyNoInteractions(retryDelay); + assertThat(scheduler.getScheduledTasks()).isEmpty(); + + // If a new message with a server unavailable error arrives afterward, no retry should get + // scheduled. + ServerToAgent serverToAgent = + new ServerToAgent.Builder() + .error_response( + new ServerErrorResponse.Builder() + .type(ServerErrorResponseType.ServerErrorResponseType_Unavailable) + .build()) + .build(); + requestService.onMessage(createServerToAgentPayload(serverToAgent)); + verifyNoInteractions(retryDelay); + assertThat(scheduler.getScheduledTasks()).isEmpty(); + + // Requests cannot get enqueued afterward. + try { + requestService.sendRequest(); + fail(); + } catch (IllegalStateException e) { + assertThat(e).hasMessage("This service is already stopped"); + } + + // The service cannot get restarted afterward. + try { + startService(); + fail(); + } catch (IllegalStateException e) { + assertThat(e).hasMessage("This service is already stopped"); + } + } + + private byte[] getExpectedOutgoingBytes() { + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + CodedOutputStream codedOutput = CodedOutputStream.newInstance(outputStream); + codedOutput.writeUInt64NoTag(0); + byte[] payload = request.getAgentToServer().encode(); + codedOutput.writeRawBytes(payload); + codedOutput.flush(); + return outputStream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static byte[] createServerToAgentPayload(ServerToAgent serverToAgent) { + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + CodedOutputStream codedOutput = CodedOutputStream.newInstance(outputStream); + codedOutput.writeUInt64NoTag(0); + codedOutput.writeRawBytes(serverToAgent.encode()); + codedOutput.flush(); + return outputStream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void startService() { + requestService.start(callback, this::createRequest); + } + + private Request createRequest() { + AgentToServer agentToServer = new AgentToServer.Builder().sequence_num(10).build(); + request = Request.create(agentToServer); + return request; + } +} From 0e0b9824a4ea7c9924b1daff78adf76b20ea63a1 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Fri, 20 Jun 2025 15:55:13 +0200 Subject: [PATCH 06/24] Adding test dependency --- dependencyManagement/build.gradle.kts | 1 + opamp-client/build.gradle.kts | 1 + 2 files changed, 2 insertions(+) diff --git a/dependencyManagement/build.gradle.kts b/dependencyManagement/build.gradle.kts index 6a496980c..a35a71f98 100644 --- a/dependencyManagement/build.gradle.kts +++ b/dependencyManagement/build.gradle.kts @@ -57,5 +57,6 @@ dependencies { api("tools.profiler:async-profiler:4.0") api("com.blogspot.mydailyjava:weak-lock-free:0.18") api("org.agrona:agrona:1.22.0") + api("com.google.protobuf:protobuf-java-util:4.29.1") } } diff --git a/opamp-client/build.gradle.kts b/opamp-client/build.gradle.kts index 0e33a87a6..7ba1c5701 100644 --- a/opamp-client/build.gradle.kts +++ b/opamp-client/build.gradle.kts @@ -16,6 +16,7 @@ dependencies { annotationProcessor("com.google.auto.value:auto-value") compileOnly("com.google.auto.value:auto-value-annotations") testImplementation("org.mockito:mockito-inline") + testImplementation("com.google.protobuf:protobuf-java-util") } val opampReleaseInfo = tasks.register("opampLastReleaseInfo") { From 65156f87d9e279b73fb9b5b2054bf14b642ff4fc Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Fri, 20 Jun 2025 15:59:14 +0200 Subject: [PATCH 07/24] Clean up --- .../internal/request/service/WebSocketRequestService.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java index 5ac5eeb27..0cc1f766f 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java @@ -158,8 +158,7 @@ public void onMessage(byte[] data) { } private static ServerToAgent readServerToAgent(byte[] data) throws IOException { - Long header = ProtoAdapter.UINT64.decode(data); - int headerSize = ProtoAdapter.UINT64.encodedSize(header); + int headerSize = ProtoAdapter.UINT64.encodedSize(ProtoAdapter.UINT64.decode(data)); int payloadSize = data.length - headerSize; byte[] payload = new byte[payloadSize]; System.arraycopy(data, headerSize, payload, 0, payloadSize); From a6bf6e04a0906da5710a80d851901824c4bbceaa Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Fri, 20 Jun 2025 16:07:43 +0200 Subject: [PATCH 08/24] Spotless --- .../internal/request/service/WebSocketRequestService.java | 5 +++++ .../connectivity/websocket/OkHttpWebSocketTest.java | 5 +++++ .../request/service/PeriodicDelayWithSuggestion.java | 5 +++++ .../client/internal/request/service/TestScheduler.java | 8 +++++++- .../request/service/WebSocketRequestServiceTest.java | 5 +++++ 5 files changed, 27 insertions(+), 1 deletion(-) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java index 0cc1f766f..cb56b7c01 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java @@ -1,3 +1,8 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + package io.opentelemetry.opamp.client.internal.request.service; import com.squareup.wire.ProtoAdapter; diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java index 4579ba573..553ab916d 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java @@ -1,3 +1,8 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + package io.opentelemetry.opamp.client.internal.connectivity.websocket; import static org.assertj.core.api.Assertions.assertThat; diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/PeriodicDelayWithSuggestion.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/PeriodicDelayWithSuggestion.java index 08434a0d3..071067611 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/PeriodicDelayWithSuggestion.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/PeriodicDelayWithSuggestion.java @@ -1,3 +1,8 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + package io.opentelemetry.opamp.client.internal.request.service; import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion; diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/TestScheduler.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/TestScheduler.java index e79590bcb..18379a421 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/TestScheduler.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/TestScheduler.java @@ -1,3 +1,8 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + package io.opentelemetry.opamp.client.internal.request.service; import static org.mockito.ArgumentMatchers.any; @@ -44,7 +49,8 @@ private ScheduledExecutorService createTestScheduleExecutorService() { .when(service) .execute(any()); - lenient().when(service.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + lenient() + .when(service.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) .thenAnswer( invocation -> { Task task = new Task(invocation.getArgument(0), invocation.getArgument(1)); diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestServiceTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestServiceTest.java index 2f63d794b..afca33fd0 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestServiceTest.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestServiceTest.java @@ -1,3 +1,8 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + package io.opentelemetry.opamp.client.internal.request.service; import static org.assertj.core.api.Assertions.assertThat; From 228a010c67499e494ab598b21ba0b61839c31e1e Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Thu, 26 Jun 2025 17:17:39 +0200 Subject: [PATCH 09/24] Using enum for OkHttpWebSocket statuses --- .../websocket/OkHttpWebSocket.java | 40 ++++++++----------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java index 48f0e1a78..136e95001 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java @@ -6,7 +6,6 @@ package io.opentelemetry.opamp.client.internal.connectivity.websocket; import java.util.Objects; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -18,9 +17,7 @@ public class OkHttpWebSocket implements WebSocket { private final String url; private final OkHttpClient client; - private final AtomicBoolean starting = new AtomicBoolean(false); - private final AtomicBoolean closing = new AtomicBoolean(false); - private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicReference status = new AtomicReference<>(Status.NOT_RUNNING); private final AtomicReference webSocket = new AtomicReference<>(); public static OkHttpWebSocket create(String url) { @@ -38,10 +35,7 @@ private OkHttpWebSocket(String url, OkHttpClient client) { @Override public void open(Listener listener) { - if (running.get()) { - return; - } - if (starting.compareAndSet(false, true)) { + if (status.compareAndSet(Status.NOT_RUNNING, Status.STARTING)) { okhttp3.Request request = new okhttp3.Request.Builder().url(url).build(); webSocket.set(client.newWebSocket(request, new ListenerAdapter(listener))); } @@ -49,7 +43,7 @@ public void open(Listener listener) { @Override public boolean send(byte[] request) { - if (!running.get()) { + if (status.get() != Status.RUNNING) { return false; } return getWebSocket().send(ByteString.of(request)); @@ -57,13 +51,9 @@ public boolean send(byte[] request) { @Override public void close(int code, @Nullable String reason) { - if (!running.get()) { - return; - } - if (closing.compareAndSet(false, true)) { + if (status.compareAndSet(Status.RUNNING, Status.CLOSING)) { if (!getWebSocket().close(code, reason)) { - closing.set(false); - running.set(false); + status.set(Status.NOT_RUNNING); } } } @@ -81,30 +71,25 @@ private ListenerAdapter(Listener delegate) { @Override public void onOpen(@Nonnull okhttp3.WebSocket webSocket, @Nonnull Response response) { - running.set(true); - starting.set(false); + status.set(Status.RUNNING); delegate.onOpen(); } @Override public void onClosing(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) { - running.set(false); - closing.set(true); + status.set(Status.CLOSING); } @Override public void onClosed(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) { - running.set(false); - closing.set(false); + status.set(Status.NOT_RUNNING); delegate.onClosed(); } @Override public void onFailure( @Nonnull okhttp3.WebSocket webSocket, @Nonnull Throwable t, @Nullable Response response) { - running.set(false); - starting.set(false); - closing.set(false); + status.set(Status.NOT_RUNNING); delegate.onFailure(t); } @@ -113,4 +98,11 @@ public void onMessage(@Nonnull okhttp3.WebSocket webSocket, @Nonnull ByteString delegate.onMessage(bytes.toByteArray()); } } + + enum Status { + NOT_RUNNING, + STARTING, + CLOSING, + RUNNING + } } From c7c74ca65e55969c30c52a36b511f88d1c8e85a1 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Thu, 26 Jun 2025 17:25:50 +0200 Subject: [PATCH 10/24] Using protobuf bom --- dependencyManagement/build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dependencyManagement/build.gradle.kts b/dependencyManagement/build.gradle.kts index a35a71f98..bb0adaf1f 100644 --- a/dependencyManagement/build.gradle.kts +++ b/dependencyManagement/build.gradle.kts @@ -16,6 +16,7 @@ dependencies { // as runtime dependencies if they are actually used as runtime dependencies) api(enforcedPlatform("io.opentelemetry.instrumentation:opentelemetry-instrumentation-bom-alpha:${otelInstrumentationVersion}")) api(enforcedPlatform("com.fasterxml.jackson:jackson-bom:2.19.1")) + api(enforcedPlatform("com.google.protobuf:protobuf-bom:4.31.1")) constraints { api("io.opentelemetry.semconv:opentelemetry-semconv:${semconvVersion}") @@ -57,6 +58,5 @@ dependencies { api("tools.profiler:async-profiler:4.0") api("com.blogspot.mydailyjava:weak-lock-free:0.18") api("org.agrona:agrona:1.22.0") - api("com.google.protobuf:protobuf-java-util:4.29.1") } } From cd80847fd425817a9d04d2d6b4bd8b6cfc3f5f7f Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Thu, 26 Jun 2025 17:39:57 +0200 Subject: [PATCH 11/24] Validating illegal argument exception when closing --- .../internal/connectivity/websocket/OkHttpWebSocket.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java index 136e95001..da1f58d48 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java @@ -52,8 +52,13 @@ public boolean send(byte[] request) { @Override public void close(int code, @Nullable String reason) { if (status.compareAndSet(Status.RUNNING, Status.CLOSING)) { - if (!getWebSocket().close(code, reason)) { - status.set(Status.NOT_RUNNING); + try { + if (!getWebSocket().close(code, reason)) { + status.set(Status.NOT_RUNNING); + } + } catch (IllegalArgumentException e) { + status.set(Status.RUNNING); + throw e; } } } From ac84012173af97362a228336c54e5abe4cfae8ae Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Thu, 26 Jun 2025 17:49:03 +0200 Subject: [PATCH 12/24] Updating javadoc wording --- .../client/internal/connectivity/websocket/WebSocket.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java index b40409f98..67ec4d4d4 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java @@ -40,8 +40,8 @@ public interface WebSocket { interface Listener { /** - * Called when the websocket connection is successfully established with the remote peer and may - * start sending messages. + * Called when the websocket connection is successfully established with the remote peer. The + * client may start sending messages after this method is called. */ void onOpen(); From ba37adc9094da275b8c0e8949f634f5a14be3f29 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Thu, 26 Jun 2025 18:35:48 +0200 Subject: [PATCH 13/24] Synchronizing access to hasPendingRequest --- .../service/WebSocketRequestService.java | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java index cb56b7c01..451160017 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java @@ -23,6 +23,7 @@ import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import opamp.proto.ServerErrorResponse; import opamp.proto.ServerErrorResponseType; import opamp.proto.ServerToAgent; @@ -32,12 +33,16 @@ public final class WebSocketRequestService implements RequestService, WebSocket. private final PeriodicDelay periodicRetryDelay; private final AtomicBoolean retryingConnection = new AtomicBoolean(false); private final AtomicBoolean nextRetryScheduled = new AtomicBoolean(false); - private final AtomicBoolean hasPendingRequest = new AtomicBoolean(false); private final AtomicBoolean isRunning = new AtomicBoolean(false); private final AtomicBoolean hasStopped = new AtomicBoolean(false); private final ScheduledExecutorService executorService; public static final PeriodicDelay DEFAULT_DELAY_BETWEEN_RETRIES = PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30)); + + @GuardedBy("hasPendingRequestLock") + private boolean hasPendingRequest = false; + + private final Object hasPendingRequestLock = new Object(); @Nullable private Callback callback; @Nullable private Supplier requestSupplier; @@ -103,8 +108,10 @@ public void sendRequest() { private void doSendRequest() { try { - if (!trySendRequest()) { - hasPendingRequest.set(true); + synchronized (hasPendingRequestLock) { + if (!trySendRequest()) { + hasPendingRequest = true; + } } } catch (IOException e) { getCallback().onRequestFailed(e); @@ -138,8 +145,11 @@ public void stop() { public void onOpen() { retryingConnection.set(false); getCallback().onConnectionSuccess(); - if (hasPendingRequest.compareAndSet(true, false)) { - sendRequest(); + synchronized (hasPendingRequestLock) { + if (hasPendingRequest) { + hasPendingRequest = false; + sendRequest(); + } } } From b3929232c3a4bcfbe2981e992f2c3bb92ce56d32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar?= <56847527+LikeTheSalad@users.noreply.github.com> Date: Fri, 27 Jun 2025 06:36:58 +0200 Subject: [PATCH 14/24] Update opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java Co-authored-by: Lauri Tulmin --- .../opamp/client/internal/connectivity/websocket/WebSocket.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java index b40409f98..53cb528f9 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java @@ -30,7 +30,7 @@ public interface WebSocket { * Sends a message via the websocket connection. * * @param request The message payload. - * @return {@link Boolean#FALSE} If the message can't get dispatched for any reason, whether the + * @return {@link Boolean#FALSE} If the message can't be dispatched for any reason, whether the * websocket isn't running, or the connection isn't established, or it's terminated. {@link * Boolean#TRUE} if the message can get sent. Returning {@link Boolean#TRUE} doesn't guarantee * that the message will arrive at the remote peer. From c9fba4259689a9f17e49867380f1b6f3aac5805c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar?= <56847527+LikeTheSalad@users.noreply.github.com> Date: Fri, 27 Jun 2025 06:37:08 +0200 Subject: [PATCH 15/24] Update opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java Co-authored-by: Lauri Tulmin --- .../opamp/client/internal/connectivity/websocket/WebSocket.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java index 53cb528f9..ebf63bc34 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java @@ -57,7 +57,7 @@ interface Listener { /** * Called when the connection is closed or cannot be established due to an error. No messages - * can get transmitted after this method is called. + * can be transmitted after this method is called. * * @param t The error. */ From 522587d6cce1992e6adc736ffec9111a3af63712 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar?= <56847527+LikeTheSalad@users.noreply.github.com> Date: Fri, 27 Jun 2025 06:37:18 +0200 Subject: [PATCH 16/24] Update opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java Co-authored-by: Lauri Tulmin --- .../opamp/client/internal/connectivity/websocket/WebSocket.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java index ebf63bc34..533690573 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java @@ -45,7 +45,7 @@ interface Listener { */ void onOpen(); - /** Called when the connection is terminated and no further messages can get transmitted. */ + /** Called when the connection is terminated and no further messages can be transmitted. */ void onClosed(); /** From b3716295a3e6bcca74213c73986f064b972c6a92 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Fri, 27 Jun 2025 06:44:10 +0200 Subject: [PATCH 17/24] Adding onClosing method --- .../internal/connectivity/websocket/WebSocket.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java index 3098480c0..56978bbef 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java @@ -30,10 +30,10 @@ public interface WebSocket { * Sends a message via the websocket connection. * * @param request The message payload. - * @return {@link Boolean#FALSE} If the message can't be dispatched for any reason, whether the - * websocket isn't running, or the connection isn't established, or it's terminated. {@link - * Boolean#TRUE} if the message can get sent. Returning {@link Boolean#TRUE} doesn't guarantee - * that the message will arrive at the remote peer. + * @return {@code false} If the message can't be dispatched for any reason, whether the websocket + * isn't running, or the connection isn't established, or it's terminated. {@code true} if the + * message can get sent. Returning {@code true} doesn't guarantee that the message will arrive + * at the remote peer. */ boolean send(byte[] request); @@ -45,6 +45,12 @@ interface Listener { */ void onOpen(); + /** + * Called when the closing handshake has started. No further messages will be sent after this + * method call. + */ + void onClosing(); + /** Called when the connection is terminated and no further messages can be transmitted. */ void onClosed(); From dc7f2a996fc81dcf5108e5eee9027f810968badc Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Fri, 27 Jun 2025 06:46:10 +0200 Subject: [PATCH 18/24] Using onClosing --- .../internal/connectivity/websocket/OkHttpWebSocket.java | 1 + .../internal/request/service/WebSocketRequestService.java | 5 +++++ .../internal/connectivity/websocket/OkHttpWebSocketTest.java | 1 + 3 files changed, 7 insertions(+) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java index da1f58d48..01c1c5f3a 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java @@ -83,6 +83,7 @@ public void onOpen(@Nonnull okhttp3.WebSocket webSocket, @Nonnull Response respo @Override public void onClosing(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) { status.set(Status.CLOSING); + delegate.onClosing(); } @Override diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java index 451160017..b8e67c58b 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java @@ -219,6 +219,11 @@ private void retryConnection() { startConnection(); } + @Override + public void onClosing() { + // Noop + } + @Override public void onClosed() { // If this service isn't stopped, we should retry connecting. diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java index 553ab916d..ab84b3881 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java @@ -136,6 +136,7 @@ private void callOnClosed() { private void callOnClosing() { listenerCaptor.getValue().onClosing(mock(), 0, ""); + verify(listener).onClosing(); } private void callOnFailure() { From 061e743f595141cf542972796bdb68ae568b4e70 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Tue, 1 Jul 2025 09:49:52 +0200 Subject: [PATCH 19/24] Adding comment explaining error rethrowing --- .../client/internal/connectivity/websocket/OkHttpWebSocket.java | 1 + 1 file changed, 1 insertion(+) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java index 01c1c5f3a..ec453bf6f 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java @@ -58,6 +58,7 @@ public void close(int code, @Nullable String reason) { } } catch (IllegalArgumentException e) { status.set(Status.RUNNING); + // Re-throwing as this error happens due to a caller error. throw e; } } From 560c92ad8cf0783dfdd8b1c713b40781abc50b03 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Tue, 1 Jul 2025 09:56:50 +0200 Subject: [PATCH 20/24] Using daemon thread factory --- .../request/service/DaemonThreadFactory.java | 20 +++++++++++++++++++ .../request/service/HttpRequestService.java | 17 ---------------- .../service/WebSocketRequestService.java | 4 +++- 3 files changed, 23 insertions(+), 18 deletions(-) create mode 100644 opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/DaemonThreadFactory.java diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/DaemonThreadFactory.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/DaemonThreadFactory.java new file mode 100644 index 000000000..d310e211e --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/DaemonThreadFactory.java @@ -0,0 +1,20 @@ +package io.opentelemetry.opamp.client.internal.request.service; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import javax.annotation.Nonnull; + +final class DaemonThreadFactory implements ThreadFactory { + private final ThreadFactory delegate = Executors.defaultThreadFactory(); + + @Override + public Thread newThread(@Nonnull Runnable r) { + Thread t = delegate.newThread(r); + try { + t.setDaemon(true); + } catch (SecurityException e) { + // Well, we tried. + } + return t; + } +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java index 32d5a60f1..1b196218b 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java @@ -22,13 +22,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -import javax.annotation.Nonnull; import javax.annotation.Nullable; import opamp.proto.AgentToServer; import opamp.proto.ServerErrorResponse; @@ -255,19 +253,4 @@ Duration getNextDelay() { return currentDelay.getNextDelay(); } } - - private static class DaemonThreadFactory implements ThreadFactory { - private final ThreadFactory delegate = Executors.defaultThreadFactory(); - - @Override - public Thread newThread(@Nonnull Runnable r) { - Thread t = delegate.newThread(r); - try { - t.setDaemon(true); - } catch (SecurityException e) { - // Well, we tried. - } - return t; - } - } } diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java index b8e67c58b..3450efa4a 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java @@ -64,7 +64,9 @@ public static WebSocketRequestService create(WebSocket webSocket) { public static WebSocketRequestService create( WebSocket webSocket, PeriodicDelay periodicRetryDelay) { return new WebSocketRequestService( - webSocket, periodicRetryDelay, Executors.newSingleThreadScheduledExecutor()); + webSocket, + periodicRetryDelay, + Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory())); } WebSocketRequestService( From 950d7c3d5a3ba1708bfafb43d3d88cb9ccbc20ac Mon Sep 17 00:00:00 2001 From: otelbot <197425009+otelbot@users.noreply.github.com> Date: Tue, 1 Jul 2025 07:58:03 +0000 Subject: [PATCH 21/24] ./gradlew spotlessApply --- .../client/internal/request/service/DaemonThreadFactory.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/DaemonThreadFactory.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/DaemonThreadFactory.java index d310e211e..77f84f72a 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/DaemonThreadFactory.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/DaemonThreadFactory.java @@ -1,3 +1,8 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + package io.opentelemetry.opamp.client.internal.request.service; import java.util.concurrent.Executors; From 83b4c54fec229b60ce5994ef280a3aaf7786f0ce Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Tue, 8 Jul 2025 15:07:49 +0200 Subject: [PATCH 22/24] Describing websocket close code --- .../internal/request/service/WebSocketRequestService.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java index 3450efa4a..85f0a8b31 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java @@ -39,6 +39,9 @@ public final class WebSocketRequestService implements RequestService, WebSocket. public static final PeriodicDelay DEFAULT_DELAY_BETWEEN_RETRIES = PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30)); + /** Defined here. */ + private static final int WEBSOCKET_NORMAL_CLOSURE_CODE = 1000; + @GuardedBy("hasPendingRequestLock") private boolean hasPendingRequest = false; @@ -138,7 +141,7 @@ private Request getRequest() { public void stop() { if (hasStopped.compareAndSet(false, true)) { doSendRequest(); - webSocket.close(1000, null); + webSocket.close(WEBSOCKET_NORMAL_CLOSURE_CODE, null); executorService.shutdown(); } } @@ -190,7 +193,7 @@ private void handleServerError(ServerErrorResponse errorResponse) { retryAfter = Duration.ofNanos(errorResponse.retry_info.retry_after_nanoseconds); } - webSocket.close(1000, null); + webSocket.close(WEBSOCKET_NORMAL_CLOSURE_CODE, null); scheduleConnectionRetry(retryAfter); } } From 3f7f483ae24c25d6f3eaa21fe79e1712226827b7 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Tue, 8 Jul 2025 15:16:57 +0200 Subject: [PATCH 23/24] Adding comment to explain last message when closing the websocket service --- .../internal/request/service/WebSocketRequestService.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java index 85f0a8b31..e143e4580 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java @@ -140,6 +140,12 @@ private Request getRequest() { @Override public void stop() { if (hasStopped.compareAndSet(false, true)) { + /* + Sending last message as explained in the spec: + https://opentelemetry.io/docs/specs/opamp/#websocket-transport-opamp-client-initiated. + The client implementation must ensure that the "agent_disconnect" field will be provided in the + next supplied request body. + */ doSendRequest(); webSocket.close(WEBSOCKET_NORMAL_CLOSURE_CODE, null); executorService.shutdown(); From 3728857905f29b5af125121e9cf012128c6ed4d2 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Tue, 8 Jul 2025 16:37:21 +0200 Subject: [PATCH 24/24] Applying changes suggested from reviews --- .../service/WebSocketRequestService.java | 81 +++++++++++-------- 1 file changed, 49 insertions(+), 32 deletions(-) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java index e143e4580..a55a1be70 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java @@ -29,15 +29,14 @@ import opamp.proto.ServerToAgent; public final class WebSocketRequestService implements RequestService, WebSocket.Listener { + private static final PeriodicDelay DEFAULT_DELAY_BETWEEN_RETRIES = + PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30)); + private final WebSocket webSocket; - private final PeriodicDelay periodicRetryDelay; - private final AtomicBoolean retryingConnection = new AtomicBoolean(false); - private final AtomicBoolean nextRetryScheduled = new AtomicBoolean(false); private final AtomicBoolean isRunning = new AtomicBoolean(false); private final AtomicBoolean hasStopped = new AtomicBoolean(false); + private final ConnectionStatus connectionStatus; private final ScheduledExecutorService executorService; - public static final PeriodicDelay DEFAULT_DELAY_BETWEEN_RETRIES = - PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30)); /** Defined here. */ private static final int WEBSOCKET_NORMAL_CLOSURE_CODE = 1000; @@ -77,8 +76,8 @@ public static WebSocketRequestService create( PeriodicDelay periodicRetryDelay, ScheduledExecutorService executorService) { this.webSocket = webSocket; - this.periodicRetryDelay = periodicRetryDelay; this.executorService = executorService; + this.connectionStatus = new ConnectionStatus(periodicRetryDelay); } @Override @@ -154,7 +153,7 @@ public void stop() { @Override public void onOpen() { - retryingConnection.set(false); + connectionStatus.success(); getCallback().onConnectionSuccess(); synchronized (hasPendingRequestLock) { if (hasPendingRequest) { @@ -200,7 +199,7 @@ private void handleServerError(ServerErrorResponse errorResponse) { } webSocket.close(WEBSOCKET_NORMAL_CLOSURE_CODE, null); - scheduleConnectionRetry(retryAfter); + connectionStatus.retryAfter(retryAfter); } } @@ -208,28 +207,6 @@ private static boolean serverIsUnavailable(ServerErrorResponse errorResponse) { return errorResponse.type.equals(ServerErrorResponseType.ServerErrorResponseType_Unavailable); } - @SuppressWarnings("FutureReturnValueIgnored") - private void scheduleConnectionRetry(@Nullable Duration retryAfter) { - if (hasStopped.get()) { - return; - } - if (retryingConnection.compareAndSet(false, true)) { - periodicRetryDelay.reset(); - if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion) { - ((AcceptsDelaySuggestion) periodicRetryDelay).suggestDelay(retryAfter); - } - } - if (nextRetryScheduled.compareAndSet(false, true)) { - executorService.schedule( - this::retryConnection, periodicRetryDelay.getNextDelay().toNanos(), TimeUnit.NANOSECONDS); - } - } - - private void retryConnection() { - nextRetryScheduled.set(false); - startConnection(); - } - @Override public void onClosing() { // Noop @@ -238,17 +215,57 @@ public void onClosing() { @Override public void onClosed() { // If this service isn't stopped, we should retry connecting. - scheduleConnectionRetry(null); + connectionStatus.retryAfter(null); } @Override public void onFailure(Throwable t) { getCallback().onConnectionFailed(t); - scheduleConnectionRetry(null); + connectionStatus.retryAfter(null); } @Nonnull private Callback getCallback() { return Objects.requireNonNull(callback); } + + private class ConnectionStatus { + private final PeriodicDelay periodicRetryDelay; + private final AtomicBoolean retryingConnection = new AtomicBoolean(false); + private final AtomicBoolean nextRetryScheduled = new AtomicBoolean(false); + + ConnectionStatus(PeriodicDelay periodicRetryDelay) { + this.periodicRetryDelay = periodicRetryDelay; + } + + void success() { + retryingConnection.set(false); + } + + @SuppressWarnings("FutureReturnValueIgnored") + void retryAfter(@Nullable Duration retryAfter) { + if (hasStopped.get()) { + return; + } + + if (retryingConnection.compareAndSet(false, true)) { + periodicRetryDelay.reset(); + if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion) { + ((AcceptsDelaySuggestion) periodicRetryDelay).suggestDelay(retryAfter); + } + } + + if (nextRetryScheduled.compareAndSet(false, true)) { + executorService.schedule( + this::retryConnection, + periodicRetryDelay.getNextDelay().toNanos(), + TimeUnit.NANOSECONDS); + } + } + + private void retryConnection() { + nextRetryScheduled.set(false); + startConnection(); + } + } }