diff --git a/dependencyManagement/build.gradle.kts b/dependencyManagement/build.gradle.kts index e491e0cc3..d34d10871 100644 --- a/dependencyManagement/build.gradle.kts +++ b/dependencyManagement/build.gradle.kts @@ -16,6 +16,7 @@ dependencies { // as runtime dependencies if they are actually used as runtime dependencies) api(enforcedPlatform("io.opentelemetry.instrumentation:opentelemetry-instrumentation-bom-alpha:${otelInstrumentationVersion}")) api(enforcedPlatform("com.fasterxml.jackson:jackson-bom:2.19.1")) + api(enforcedPlatform("com.google.protobuf:protobuf-bom:4.31.1")) constraints { api("io.opentelemetry.semconv:opentelemetry-semconv:${semconvVersion}") diff --git a/opamp-client/build.gradle.kts b/opamp-client/build.gradle.kts index f3ad4cfda..993b918bd 100644 --- a/opamp-client/build.gradle.kts +++ b/opamp-client/build.gradle.kts @@ -16,6 +16,7 @@ dependencies { annotationProcessor("com.google.auto.value:auto-value") compileOnly("com.google.auto.value:auto-value-annotations") testImplementation("org.mockito:mockito-inline") + testImplementation("com.google.protobuf:protobuf-java-util") } val opampProtos = tasks.register("opampProtoDownload", download) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java new file mode 100644 index 000000000..ec453bf6f --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocket.java @@ -0,0 +1,115 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.connectivity.websocket; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import okhttp3.OkHttpClient; +import okhttp3.Response; +import okhttp3.WebSocketListener; +import okio.ByteString; + +public class OkHttpWebSocket implements WebSocket { + private final String url; + private final OkHttpClient client; + private final AtomicReference status = new AtomicReference<>(Status.NOT_RUNNING); + private final AtomicReference webSocket = new AtomicReference<>(); + + public static OkHttpWebSocket create(String url) { + return create(url, new OkHttpClient()); + } + + public static OkHttpWebSocket create(String url, OkHttpClient client) { + return new OkHttpWebSocket(url, client); + } + + private OkHttpWebSocket(String url, OkHttpClient client) { + this.url = url; + this.client = client; + } + + @Override + public void open(Listener listener) { + if (status.compareAndSet(Status.NOT_RUNNING, Status.STARTING)) { + okhttp3.Request request = new okhttp3.Request.Builder().url(url).build(); + webSocket.set(client.newWebSocket(request, new ListenerAdapter(listener))); + } + } + + @Override + public boolean send(byte[] request) { + if (status.get() != Status.RUNNING) { + return false; + } + return getWebSocket().send(ByteString.of(request)); + } + + @Override + public void close(int code, @Nullable String reason) { + if (status.compareAndSet(Status.RUNNING, Status.CLOSING)) { + try { + if (!getWebSocket().close(code, reason)) { + status.set(Status.NOT_RUNNING); + } + } catch (IllegalArgumentException e) { + status.set(Status.RUNNING); + // Re-throwing as this error happens due to a caller error. + throw e; + } + } + } + + private okhttp3.WebSocket getWebSocket() { + return Objects.requireNonNull(webSocket.get()); + } + + private class ListenerAdapter extends WebSocketListener { + private final Listener delegate; + + private ListenerAdapter(Listener delegate) { + this.delegate = delegate; + } + + @Override + public void onOpen(@Nonnull okhttp3.WebSocket webSocket, @Nonnull Response response) { + status.set(Status.RUNNING); + delegate.onOpen(); + } + + @Override + public void onClosing(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) { + status.set(Status.CLOSING); + delegate.onClosing(); + } + + @Override + public void onClosed(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) { + status.set(Status.NOT_RUNNING); + delegate.onClosed(); + } + + @Override + public void onFailure( + @Nonnull okhttp3.WebSocket webSocket, @Nonnull Throwable t, @Nullable Response response) { + status.set(Status.NOT_RUNNING); + delegate.onFailure(t); + } + + @Override + public void onMessage(@Nonnull okhttp3.WebSocket webSocket, @Nonnull ByteString bytes) { + delegate.onMessage(bytes.toByteArray()); + } + } + + enum Status { + NOT_RUNNING, + STARTING, + CLOSING, + RUNNING + } +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java new file mode 100644 index 000000000..56978bbef --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/WebSocket.java @@ -0,0 +1,72 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.connectivity.websocket; + +import javax.annotation.Nullable; + +public interface WebSocket { + /** + * Starts the websocket connection if it's not yet started or if it has been closed. + * + * @param listener Will receive events from the websocket connection. + */ + void open(Listener listener); + + /** + * Stops the websocket connection if running. Nothing will happen if it's already stopped. + * + * @param code Status code as defined by Section 7.4 of RFC 6455 + * @param reason Reason for shutting down, as explained in Section 5.5.1 of RFC + * 6455 + */ + void close(int code, @Nullable String reason); + + /** + * Sends a message via the websocket connection. + * + * @param request The message payload. + * @return {@code false} If the message can't be dispatched for any reason, whether the websocket + * isn't running, or the connection isn't established, or it's terminated. {@code true} if the + * message can get sent. Returning {@code true} doesn't guarantee that the message will arrive + * at the remote peer. + */ + boolean send(byte[] request); + + interface Listener { + + /** + * Called when the websocket connection is successfully established with the remote peer. The + * client may start sending messages after this method is called. + */ + void onOpen(); + + /** + * Called when the closing handshake has started. No further messages will be sent after this + * method call. + */ + void onClosing(); + + /** Called when the connection is terminated and no further messages can be transmitted. */ + void onClosed(); + + /** + * Called when receiving a message from the remote peer. + * + * @param data The payload sent by the remote peer. + */ + void onMessage(byte[] data); + + /** + * Called when the connection is closed or cannot be established due to an error. No messages + * can be transmitted after this method is called. + * + * @param t The error. + */ + void onFailure(Throwable t); + } +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/DaemonThreadFactory.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/DaemonThreadFactory.java new file mode 100644 index 000000000..77f84f72a --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/DaemonThreadFactory.java @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.request.service; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import javax.annotation.Nonnull; + +final class DaemonThreadFactory implements ThreadFactory { + private final ThreadFactory delegate = Executors.defaultThreadFactory(); + + @Override + public Thread newThread(@Nonnull Runnable r) { + Thread t = delegate.newThread(r); + try { + t.setDaemon(true); + } catch (SecurityException e) { + // Well, we tried. + } + return t; + } +} diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java index 32d5a60f1..1b196218b 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java @@ -22,13 +22,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -import javax.annotation.Nonnull; import javax.annotation.Nullable; import opamp.proto.AgentToServer; import opamp.proto.ServerErrorResponse; @@ -255,19 +253,4 @@ Duration getNextDelay() { return currentDelay.getNextDelay(); } } - - private static class DaemonThreadFactory implements ThreadFactory { - private final ThreadFactory delegate = Executors.defaultThreadFactory(); - - @Override - public Thread newThread(@Nonnull Runnable r) { - Thread t = delegate.newThread(r); - try { - t.setDaemon(true); - } catch (SecurityException e) { - // Well, we tried. - } - return t; - } - } } diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java new file mode 100644 index 000000000..a55a1be70 --- /dev/null +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java @@ -0,0 +1,271 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.request.service; + +import com.squareup.wire.ProtoAdapter; +import io.opentelemetry.opamp.client.internal.connectivity.websocket.WebSocket; +import io.opentelemetry.opamp.client.internal.request.Request; +import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion; +import io.opentelemetry.opamp.client.internal.request.delay.PeriodicDelay; +import io.opentelemetry.opamp.client.internal.response.OpampServerResponseError; +import io.opentelemetry.opamp.client.internal.response.Response; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import opamp.proto.ServerErrorResponse; +import opamp.proto.ServerErrorResponseType; +import opamp.proto.ServerToAgent; + +public final class WebSocketRequestService implements RequestService, WebSocket.Listener { + private static final PeriodicDelay DEFAULT_DELAY_BETWEEN_RETRIES = + PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30)); + + private final WebSocket webSocket; + private final AtomicBoolean isRunning = new AtomicBoolean(false); + private final AtomicBoolean hasStopped = new AtomicBoolean(false); + private final ConnectionStatus connectionStatus; + private final ScheduledExecutorService executorService; + + /** Defined here. */ + private static final int WEBSOCKET_NORMAL_CLOSURE_CODE = 1000; + + @GuardedBy("hasPendingRequestLock") + private boolean hasPendingRequest = false; + + private final Object hasPendingRequestLock = new Object(); + @Nullable private Callback callback; + @Nullable private Supplier requestSupplier; + + /** + * Creates an {@link WebSocketRequestService}. + * + * @param webSocket The WebSocket implementation. + */ + public static WebSocketRequestService create(WebSocket webSocket) { + return create(webSocket, DEFAULT_DELAY_BETWEEN_RETRIES); + } + + /** + * Creates an {@link WebSocketRequestService}. + * + * @param webSocket The WebSocket implementation. + * @param periodicRetryDelay The time to wait between retries. + */ + public static WebSocketRequestService create( + WebSocket webSocket, PeriodicDelay periodicRetryDelay) { + return new WebSocketRequestService( + webSocket, + periodicRetryDelay, + Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory())); + } + + WebSocketRequestService( + WebSocket webSocket, + PeriodicDelay periodicRetryDelay, + ScheduledExecutorService executorService) { + this.webSocket = webSocket; + this.executorService = executorService; + this.connectionStatus = new ConnectionStatus(periodicRetryDelay); + } + + @Override + public void start(Callback callback, Supplier requestSupplier) { + if (hasStopped.get()) { + throw new IllegalStateException("This service is already stopped"); + } + if (isRunning.compareAndSet(false, true)) { + this.callback = callback; + this.requestSupplier = requestSupplier; + startConnection(); + } else { + throw new IllegalStateException("The service has already started"); + } + } + + private void startConnection() { + webSocket.open(this); + } + + @Override + public void sendRequest() { + if (!isRunning.get()) { + throw new IllegalStateException("The service is not running"); + } + if (hasStopped.get()) { + throw new IllegalStateException("This service is already stopped"); + } + + doSendRequest(); + } + + private void doSendRequest() { + try { + synchronized (hasPendingRequestLock) { + if (!trySendRequest()) { + hasPendingRequest = true; + } + } + } catch (IOException e) { + getCallback().onRequestFailed(e); + } + } + + private boolean trySendRequest() throws IOException { + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + ProtoAdapter.UINT64.encode(outputStream, 0L); + byte[] payload = getRequest().getAgentToServer().encode(); + outputStream.write(payload); + return webSocket.send(outputStream.toByteArray()); + } + } + + @Nonnull + private Request getRequest() { + return Objects.requireNonNull(requestSupplier).get(); + } + + @Override + public void stop() { + if (hasStopped.compareAndSet(false, true)) { + /* + Sending last message as explained in the spec: + https://opentelemetry.io/docs/specs/opamp/#websocket-transport-opamp-client-initiated. + The client implementation must ensure that the "agent_disconnect" field will be provided in the + next supplied request body. + */ + doSendRequest(); + webSocket.close(WEBSOCKET_NORMAL_CLOSURE_CODE, null); + executorService.shutdown(); + } + } + + @Override + public void onOpen() { + connectionStatus.success(); + getCallback().onConnectionSuccess(); + synchronized (hasPendingRequestLock) { + if (hasPendingRequest) { + hasPendingRequest = false; + sendRequest(); + } + } + } + + @Override + public void onMessage(byte[] data) { + try { + ServerToAgent serverToAgent = readServerToAgent(data); + + if (serverToAgent.error_response != null) { + handleServerError(serverToAgent.error_response); + getCallback() + .onRequestFailed( + new OpampServerResponseError(serverToAgent.error_response.error_message)); + return; + } + + getCallback().onRequestSuccess(Response.create(serverToAgent)); + } catch (IOException e) { + getCallback().onRequestFailed(e); + } + } + + private static ServerToAgent readServerToAgent(byte[] data) throws IOException { + int headerSize = ProtoAdapter.UINT64.encodedSize(ProtoAdapter.UINT64.decode(data)); + int payloadSize = data.length - headerSize; + byte[] payload = new byte[payloadSize]; + System.arraycopy(data, headerSize, payload, 0, payloadSize); + return ServerToAgent.ADAPTER.decode(payload); + } + + private void handleServerError(ServerErrorResponse errorResponse) { + if (serverIsUnavailable(errorResponse)) { + Duration retryAfter = null; + + if (errorResponse.retry_info != null) { + retryAfter = Duration.ofNanos(errorResponse.retry_info.retry_after_nanoseconds); + } + + webSocket.close(WEBSOCKET_NORMAL_CLOSURE_CODE, null); + connectionStatus.retryAfter(retryAfter); + } + } + + private static boolean serverIsUnavailable(ServerErrorResponse errorResponse) { + return errorResponse.type.equals(ServerErrorResponseType.ServerErrorResponseType_Unavailable); + } + + @Override + public void onClosing() { + // Noop + } + + @Override + public void onClosed() { + // If this service isn't stopped, we should retry connecting. + connectionStatus.retryAfter(null); + } + + @Override + public void onFailure(Throwable t) { + getCallback().onConnectionFailed(t); + connectionStatus.retryAfter(null); + } + + @Nonnull + private Callback getCallback() { + return Objects.requireNonNull(callback); + } + + private class ConnectionStatus { + private final PeriodicDelay periodicRetryDelay; + private final AtomicBoolean retryingConnection = new AtomicBoolean(false); + private final AtomicBoolean nextRetryScheduled = new AtomicBoolean(false); + + ConnectionStatus(PeriodicDelay periodicRetryDelay) { + this.periodicRetryDelay = periodicRetryDelay; + } + + void success() { + retryingConnection.set(false); + } + + @SuppressWarnings("FutureReturnValueIgnored") + void retryAfter(@Nullable Duration retryAfter) { + if (hasStopped.get()) { + return; + } + + if (retryingConnection.compareAndSet(false, true)) { + periodicRetryDelay.reset(); + if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion) { + ((AcceptsDelaySuggestion) periodicRetryDelay).suggestDelay(retryAfter); + } + } + + if (nextRetryScheduled.compareAndSet(false, true)) { + executorService.schedule( + this::retryConnection, + periodicRetryDelay.getNextDelay().toNanos(), + TimeUnit.NANOSECONDS); + } + } + + private void retryConnection() { + nextRetryScheduled.set(false); + startConnection(); + } + } +} diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java new file mode 100644 index 000000000..ab84b3881 --- /dev/null +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/connectivity/websocket/OkHttpWebSocketTest.java @@ -0,0 +1,152 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.connectivity.websocket; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.WebSocketListener; +import okio.ByteString; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class OkHttpWebSocketTest { + @Mock private OkHttpClient client; + @Mock private okhttp3.WebSocket okHttpWebSocket; + @Mock private WebSocket.Listener listener; + @Captor private ArgumentCaptor requestCaptor; + @Captor private ArgumentCaptor listenerCaptor; + private static final String URL = "ws://some.server"; + private OkHttpWebSocket webSocket; + + @BeforeEach + void setUp() { + webSocket = OkHttpWebSocket.create(URL, client); + when(client.newWebSocket(any(), any())).thenReturn(okHttpWebSocket); + } + + @Test + void validateOpen() { + // Assert websocket created + openAndCaptureArguments(); + assertThat(requestCaptor.getValue().url().host()).isEqualTo("some.server"); + + // Assert further calls to open won't do anything + webSocket.open(listener); + verifyNoMoreInteractions(client); + + // When connectivity succeeds, open calls won't do anything. + callOnOpen(); + webSocket.open(listener); + verifyNoMoreInteractions(client); + + // When connectivity fails, allow future open calls + clearInvocations(client); + callOnFailure(); + openAndCaptureArguments(); + assertThat(requestCaptor.getValue().url().host()).isEqualTo("some.server"); + } + + @Test + void validateSend() { + byte[] payload = new byte[1]; + + // Before opening + assertThat(webSocket.send(payload)).isFalse(); + + // After opening successfully + when(okHttpWebSocket.send(any(ByteString.class))).thenReturn(true); + openAndCaptureArguments(); + callOnOpen(); + assertThat(webSocket.send(payload)).isTrue(); + verify(okHttpWebSocket).send(ByteString.of(payload)); + + // After failing + callOnFailure(); + assertThat(webSocket.send(payload)).isFalse(); + verifyNoMoreInteractions(okHttpWebSocket); + } + + @Test + void validateClose() { + openAndCaptureArguments(); + + callOnOpen(); + webSocket.close(123, "something"); + verify(okHttpWebSocket).close(123, "something"); + + // Validate calling it again + webSocket.close(1, null); + verifyNoMoreInteractions(okHttpWebSocket); + + // Once closed, it should be possible to reopen it. + clearInvocations(client); + callOnClosed(); + openAndCaptureArguments(); + assertThat(requestCaptor.getValue().url().host()).isEqualTo("some.server"); + } + + @Test + void validateOnClosing() { + openAndCaptureArguments(); + + callOnOpen(); + callOnClosing(); + + // Validate calling after onClosing + webSocket.close(1, null); + verifyNoInteractions(okHttpWebSocket); + } + + @Test + void validateOnMessage() { + byte[] payload = new byte[1]; + openAndCaptureArguments(); + + listenerCaptor.getValue().onMessage(mock(), ByteString.of(payload)); + verify(listener).onMessage(payload); + } + + private void callOnOpen() { + listenerCaptor.getValue().onOpen(mock(), mock()); + verify(listener).onOpen(); + } + + private void callOnClosed() { + listenerCaptor.getValue().onClosed(mock(), 0, ""); + verify(listener).onClosed(); + } + + private void callOnClosing() { + listenerCaptor.getValue().onClosing(mock(), 0, ""); + verify(listener).onClosing(); + } + + private void callOnFailure() { + Throwable t = mock(); + listenerCaptor.getValue().onFailure(mock(), t, mock()); + verify(listener).onFailure(t); + } + + private void openAndCaptureArguments() { + webSocket.open(listener); + verify(client).newWebSocket(requestCaptor.capture(), listenerCaptor.capture()); + } +} diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java index 63439e8c6..9996c4783 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java @@ -8,10 +8,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -21,7 +19,6 @@ import io.opentelemetry.opamp.client.internal.connectivity.http.HttpSender; import io.opentelemetry.opamp.client.internal.connectivity.http.RetryAfterParser; import io.opentelemetry.opamp.client.internal.request.Request; -import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion; import io.opentelemetry.opamp.client.internal.request.delay.PeriodicDelay; import io.opentelemetry.opamp.client.internal.response.Response; import java.io.ByteArrayInputStream; @@ -33,10 +30,7 @@ import java.util.NoSuchElementException; import java.util.Queue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import opamp.proto.AgentToServer; @@ -44,7 +38,6 @@ import opamp.proto.ServerErrorResponse; import opamp.proto.ServerErrorResponseType; import opamp.proto.ServerToAgent; -import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -61,8 +54,7 @@ class HttpRequestServiceTest { private static final Duration RETRY_DELAY = Duration.ofSeconds(5); @Mock private RequestService.Callback callback; - private final List scheduledTasks = new ArrayList<>(); - private ScheduledExecutorService executorService; + private TestScheduler scheduler; private TestHttpSender requestSender; private PeriodicDelay periodicRequestDelay; private PeriodicDelayWithSuggestion periodicRetryDelay; @@ -74,42 +66,40 @@ void setUp() { requestSender = new TestHttpSender(); periodicRequestDelay = createPeriodicDelay(REGULAR_DELAY); periodicRetryDelay = createPeriodicDelayWithSuggestionSupport(RETRY_DELAY); - executorService = createTestScheduleExecutorService(); + scheduler = new TestScheduler(); httpRequestService = new HttpRequestService( requestSender, - executorService, + scheduler.getMockService(), periodicRequestDelay, periodicRetryDelay, RetryAfterParser.getInstance()); - httpRequestService.start(callback, this::createRequestSupplier); + httpRequestService.start(callback, this::createRequest); } @AfterEach void tearDown() { httpRequestService.stop(); - scheduledTasks.clear(); - verify(executorService).shutdown(); + verify(scheduler.getMockService()).shutdown(); } @Test void verifyStart_scheduledFirstTask() { - assertThat(scheduledTasks).hasSize(1); - ScheduledTask firstTask = scheduledTasks.get(0); - assertThat(firstTask.delay).isEqualTo(REGULAR_DELAY); + TestScheduler.Task firstTask = assertAndGetSingleCurrentTask(); + assertThat(firstTask.getDelay()).isEqualTo(REGULAR_DELAY); // Verify initial task creates next one - scheduledTasks.clear(); + scheduler.clearTasks(); requestSender.enqueueResponse(createSuccessfulResponse(new ServerToAgent.Builder().build())); firstTask.run(); - assertThat(scheduledTasks).hasSize(1); + assertThat(scheduler.getScheduledTasks()).hasSize(1); // Check on-demand requests don't create subsequent tasks requestSender.enqueueResponse(createSuccessfulResponse(new ServerToAgent.Builder().build())); httpRequestService.sendRequest(); - assertThat(scheduledTasks).hasSize(1); + assertThat(scheduler.getScheduledTasks()).hasSize(1); } @Test @@ -128,14 +118,14 @@ void verifySendingRequest_happyPath() { @Test void verifyWhenSendingOnDemandRequest_andDelayChanges() { // Initial state - assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(REGULAR_DELAY); + assertThat(assertAndGetSingleCurrentTask().getDelay()).isEqualTo(REGULAR_DELAY); // Trigger delay strategy change requestSender.enqueueResponse(createFailedResponse(503)); httpRequestService.sendRequest(); // Expected state - assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(RETRY_DELAY); + assertThat(assertAndGetSingleCurrentTask().getDelay()).isEqualTo(RETRY_DELAY); } @Test @@ -252,30 +242,30 @@ void verifySendingRequest_duringRegularMode() { private void verifyRetryDelayOnError( HttpSender.Response errorResponse, Duration expectedRetryDelay) { requestSender.enqueueResponse(errorResponse); - ScheduledTask previousTask = assertAndGetSingleCurrentTask(); + TestScheduler.Task previousTask = assertAndGetSingleCurrentTask(); previousTask.run(); verifySingleRequestSent(); verify(periodicRetryDelay).reset(); verify(callback).onRequestFailed(any()); - ScheduledTask retryTask = assertAndGetSingleCurrentTask(); - assertThat(retryTask.delay).isEqualTo(expectedRetryDelay); + TestScheduler.Task retryTask = assertAndGetSingleCurrentTask(); + assertThat(retryTask.getDelay()).isEqualTo(expectedRetryDelay); // Retry with another error clearInvocations(callback); - scheduledTasks.clear(); + scheduler.clearTasks(); requestSender.enqueueResponse(createFailedResponse(500)); retryTask.run(); verifySingleRequestSent(); verify(callback).onRequestFailed(any()); - ScheduledTask retryTask2 = assertAndGetSingleCurrentTask(); - assertThat(retryTask2.delay).isEqualTo(expectedRetryDelay); + TestScheduler.Task retryTask2 = assertAndGetSingleCurrentTask(); + assertThat(retryTask2.getDelay()).isEqualTo(expectedRetryDelay); // Retry with a success clearInvocations(callback); - scheduledTasks.clear(); + scheduler.clearTasks(); ServerToAgent serverToAgent = new ServerToAgent.Builder().build(); requestSender.enqueueResponse(createSuccessfulResponse(serverToAgent)); retryTask2.run(); @@ -283,16 +273,17 @@ private void verifyRetryDelayOnError( verify(periodicRequestDelay).reset(); verifySingleRequestSent(); verifyRequestSuccessCallback(serverToAgent); - assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(REGULAR_DELAY); + assertThat(assertAndGetSingleCurrentTask().getDelay()).isEqualTo(REGULAR_DELAY); } - private Request createRequestSupplier() { + private Request createRequest() { AgentToServer agentToServer = new AgentToServer.Builder().sequence_num(10).build(); requestSize = agentToServer.encodeByteString().size(); return Request.create(agentToServer); } - private ScheduledTask assertAndGetSingleCurrentTask() { + private TestScheduler.Task assertAndGetSingleCurrentTask() { + List scheduledTasks = scheduler.getScheduledTasks(); assertThat(scheduledTasks).hasSize(1); return scheduledTasks.get(0); } @@ -313,33 +304,6 @@ private void verifyRequestFailedCallback(int errorCode) { assertThat(captor.getValue().getMessage()).isEqualTo("Error message"); } - private ScheduledExecutorService createTestScheduleExecutorService() { - ScheduledExecutorService service = mock(); - - lenient() - .doAnswer( - invocation -> { - Runnable runnable = invocation.getArgument(0); - runnable.run(); - return null; - }) - .when(service) - .execute(any()); - - when(service.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) - .thenAnswer( - invocation -> { - ScheduledTask task = - new ScheduledTask(invocation.getArgument(0), invocation.getArgument(1)); - - scheduledTasks.add(task); - - return task; - }); - - return service; - } - private static HttpSender.Response createSuccessfulResponse(ServerToAgent serverToAgent) { return createSuccessfulResponse(serverToAgent.encodeByteString().toByteArray()); } @@ -370,32 +334,6 @@ private static PeriodicDelayWithSuggestion createPeriodicDelayWithSuggestionSupp return spy(new PeriodicDelayWithSuggestion(delay)); } - private static class PeriodicDelayWithSuggestion - implements PeriodicDelay, AcceptsDelaySuggestion { - private final Duration initialDelay; - private Duration currentDelay; - - private PeriodicDelayWithSuggestion(Duration initialDelay) { - this.initialDelay = initialDelay; - currentDelay = initialDelay; - } - - @Override - public void suggestDelay(Duration delay) { - currentDelay = delay; - } - - @Override - public Duration getNextDelay() { - return currentDelay; - } - - @Override - public void reset() { - currentDelay = initialDelay; - } - } - private static class TestHttpSender implements HttpSender { private final List requests = new ArrayList<>(); @@ -438,55 +376,4 @@ private RequestParams(int contentLength) { } } } - - private class ScheduledTask implements ScheduledFuture { - private final Runnable runnable; - private final Duration delay; - - public void run() { - get(); - } - - private ScheduledTask(Runnable runnable, long timeNanos) { - this.runnable = runnable; - this.delay = Duration.ofNanos(timeNanos); - } - - @Override - public long getDelay(@NotNull TimeUnit unit) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return scheduledTasks.remove(this); - } - - @Override - public boolean isCancelled() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isDone() { - throw new UnsupportedOperationException(); - } - - @Override - public Object get() { - scheduledTasks.remove(this); - runnable.run(); - return null; - } - - @Override - public Object get(long timeout, @NotNull TimeUnit unit) { - throw new UnsupportedOperationException(); - } - - @Override - public int compareTo(@NotNull Delayed o) { - throw new UnsupportedOperationException(); - } - } } diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/PeriodicDelayWithSuggestion.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/PeriodicDelayWithSuggestion.java new file mode 100644 index 000000000..071067611 --- /dev/null +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/PeriodicDelayWithSuggestion.java @@ -0,0 +1,35 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.request.service; + +import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion; +import io.opentelemetry.opamp.client.internal.request.delay.PeriodicDelay; +import java.time.Duration; + +public class PeriodicDelayWithSuggestion implements PeriodicDelay, AcceptsDelaySuggestion { + private final Duration initialDelay; + private Duration currentDelay; + + public PeriodicDelayWithSuggestion(Duration initialDelay) { + this.initialDelay = initialDelay; + currentDelay = initialDelay; + } + + @Override + public void suggestDelay(Duration delay) { + currentDelay = delay; + } + + @Override + public Duration getNextDelay() { + return currentDelay; + } + + @Override + public void reset() { + currentDelay = initialDelay; + } +} diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/TestScheduler.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/TestScheduler.java new file mode 100644 index 000000000..18379a421 --- /dev/null +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/TestScheduler.java @@ -0,0 +1,120 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.request.service; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Delayed; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.jetbrains.annotations.NotNull; + +public final class TestScheduler { + private final List tasks = new ArrayList<>(); + private final ScheduledExecutorService service = createTestScheduleExecutorService(); + + public ScheduledExecutorService getMockService() { + return service; + } + + public List getScheduledTasks() { + return Collections.unmodifiableList(tasks); + } + + public void clearTasks() { + tasks.clear(); + } + + private ScheduledExecutorService createTestScheduleExecutorService() { + ScheduledExecutorService service = mock(); + + lenient() + .doAnswer( + invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }) + .when(service) + .execute(any()); + + lenient() + .when(service.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .thenAnswer( + invocation -> { + Task task = new Task(invocation.getArgument(0), invocation.getArgument(1)); + + tasks.add(task); + + return task; + }); + + return service; + } + + public class Task implements ScheduledFuture { + private final Runnable runnable; + private final Duration delay; + + public void run() { + get(); + } + + private Task(Runnable runnable, long timeNanos) { + this.runnable = runnable; + this.delay = Duration.ofNanos(timeNanos); + } + + public Duration getDelay() { + return delay; + } + + @Override + public long getDelay(@NotNull TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return tasks.remove(this); + } + + @Override + public boolean isCancelled() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDone() { + throw new UnsupportedOperationException(); + } + + @Override + public Object get() { + tasks.remove(this); + runnable.run(); + return null; + } + + @Override + public Object get(long timeout, @NotNull TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public int compareTo(@NotNull Delayed o) { + throw new UnsupportedOperationException(); + } + } +} diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestServiceTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestServiceTest.java new file mode 100644 index 000000000..afca33fd0 --- /dev/null +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestServiceTest.java @@ -0,0 +1,336 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.opamp.client.internal.request.service; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.protobuf.CodedOutputStream; +import io.opentelemetry.opamp.client.internal.connectivity.websocket.WebSocket; +import io.opentelemetry.opamp.client.internal.request.Request; +import io.opentelemetry.opamp.client.internal.response.OpampServerResponseError; +import io.opentelemetry.opamp.client.internal.response.Response; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.time.Duration; +import opamp.proto.AgentToServer; +import opamp.proto.RetryInfo; +import opamp.proto.ServerErrorResponse; +import opamp.proto.ServerErrorResponseType; +import opamp.proto.ServerToAgent; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class WebSocketRequestServiceTest { + @Mock private WebSocket webSocket; + @Mock private RequestService.Callback callback; + @Mock private PeriodicDelayWithSuggestion retryDelay; + private Request request; + private TestScheduler scheduler; + private WebSocketRequestService requestService; + private static final Duration INITIAL_RETRY_DELAY = Duration.ofSeconds(1); + + @BeforeEach + void setUp() { + lenient().when(retryDelay.getNextDelay()).thenReturn(INITIAL_RETRY_DELAY); + scheduler = new TestScheduler(); + requestService = new WebSocketRequestService(webSocket, retryDelay, scheduler.getMockService()); + } + + @Test + void verifySuccessfulStart() { + startService(); + verify(webSocket).open(requestService); + + // When opening successfully, notify callback + requestService.onOpen(); + verify(callback).onConnectionSuccess(); + verifyNoMoreInteractions(callback); + + // It shouldn't allow starting again + try { + startService(); + fail(); + } catch (IllegalStateException e) { + assertThat(e).hasMessage("The service has already started"); + } + } + + @Test + void verifyFailedStart() { + startService(); + verify(webSocket).open(requestService); + + // When failing while opening, notify callback + Throwable t = mock(); + requestService.onFailure(t); + verify(retryDelay).reset(); + verify(callback).onConnectionFailed(t); + verifyNoMoreInteractions(callback); + + // Check connection retry is scheduled + assertThat(scheduler.getScheduledTasks()).hasSize(1); + assertThat(scheduler.getScheduledTasks().get(0).getDelay()).isEqualTo(INITIAL_RETRY_DELAY); + + // It shouldn't allow starting again + try { + startService(); + fail(); + } catch (IllegalStateException e) { + assertThat(e).hasMessage("The service has already started"); + } + + // It shouldn't schedule more than one retry at a time + clearInvocations(retryDelay, callback); + requestService.onFailure(t); + verify(callback).onConnectionFailed(t); + verifyNoInteractions(retryDelay); + verifyNoMoreInteractions(callback); + assertThat(scheduler.getScheduledTasks()).hasSize(1); + + // Execute retry with new delay + clearInvocations(webSocket, callback); + when(retryDelay.getNextDelay()).thenReturn(Duration.ofSeconds(5)); + scheduler.getScheduledTasks().get(0).run(); + assertThat(scheduler.getScheduledTasks()).isEmpty(); + verify(webSocket).open(requestService); + + // Fail again + requestService.onFailure(t); + verify(retryDelay, never()).reset(); + verify(callback).onConnectionFailed(t); + + // A new retry has been scheduled + assertThat(scheduler.getScheduledTasks()).hasSize(1); + assertThat(scheduler.getScheduledTasks().get(0).getDelay()).isEqualTo(Duration.ofSeconds(5)); + + // Execute retry again + clearInvocations(webSocket, callback); + scheduler.getScheduledTasks().get(0).run(); + assertThat(scheduler.getScheduledTasks()).isEmpty(); + verify(webSocket).open(requestService); + + // Succeed + requestService.onOpen(); + verify(callback).onConnectionSuccess(); + verifyNoMoreInteractions(callback); + + // Fail at some point + clearInvocations(callback); + requestService.onFailure(t); + verify(callback).onConnectionFailed(t); + verifyNoMoreInteractions(callback); + verify(retryDelay).reset(); + assertThat(scheduler.getScheduledTasks()).hasSize(1); + } + + @Test + void verifySendRequest() { + // Validate when not running + try { + requestService.sendRequest(); + fail(); + } catch (IllegalStateException e) { + assertThat(e).hasMessage("The service is not running"); + } + + startService(); + + // Successful send + when(webSocket.send(any())).thenReturn(true); + requestService.sendRequest(); + verify(webSocket).send(getExpectedOutgoingBytes()); + + // Check there are no pending requests + clearInvocations(webSocket); + requestService.onOpen(); + verifyNoInteractions(webSocket); + + // Failed send + when(webSocket.send(any())).thenReturn(false); + requestService.sendRequest(); + clearInvocations(webSocket); + + // Check pending request + when(webSocket.send(any())).thenReturn(true); + requestService.onOpen(); + verify(webSocket).send(getExpectedOutgoingBytes()); + } + + @Test + void verifyOnMessage() { + startService(); + + // Successful message + ServerToAgent serverToAgent = new ServerToAgent.Builder().build(); + requestService.onMessage(createServerToAgentPayload(serverToAgent)); + verify(callback).onRequestSuccess(Response.create(serverToAgent)); + verifyNoMoreInteractions(callback); + assertThat(scheduler.getScheduledTasks()).isEmpty(); + + // Regular error message + ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class); + clearInvocations(callback); + serverToAgent = + new ServerToAgent.Builder() + .error_response(new ServerErrorResponse.Builder().error_message("A message").build()) + .build(); + requestService.onMessage(createServerToAgentPayload(serverToAgent)); + verify(callback).onRequestFailed(throwableCaptor.capture()); + verifyNoMoreInteractions(callback); + OpampServerResponseError error = (OpampServerResponseError) throwableCaptor.getValue(); + assertThat(error.getMessage()).isEqualTo("A message"); + assertThat(scheduler.getScheduledTasks()).isEmpty(); + + // Error message with unavailable status + clearInvocations(callback); + serverToAgent = + new ServerToAgent.Builder() + .error_response( + new ServerErrorResponse.Builder() + .type(ServerErrorResponseType.ServerErrorResponseType_Unavailable) + .error_message("Try later") + .build()) + .build(); + requestService.onMessage(createServerToAgentPayload(serverToAgent)); + verify(callback).onRequestFailed(throwableCaptor.capture()); + verifyNoMoreInteractions(callback); + OpampServerResponseError unavailableError = + (OpampServerResponseError) throwableCaptor.getValue(); + assertThat(unavailableError.getMessage()).isEqualTo("Try later"); + assertThat(scheduler.getScheduledTasks()).hasSize(1); + verify(retryDelay, never()).suggestDelay(any()); + + // Reset scheduled retry + scheduler.getScheduledTasks().get(0).run(); + requestService.onOpen(); + + // Error message with unavailable status and suggested delay + Duration suggestedDelay = Duration.ofSeconds(10); + clearInvocations(callback, retryDelay); + serverToAgent = + new ServerToAgent.Builder() + .error_response( + new ServerErrorResponse.Builder() + .type(ServerErrorResponseType.ServerErrorResponseType_Unavailable) + .retry_info( + new RetryInfo.Builder() + .retry_after_nanoseconds(suggestedDelay.toNanos()) + .build()) + .build()) + .build(); + requestService.onMessage(createServerToAgentPayload(serverToAgent)); + verify(callback).onRequestFailed(throwableCaptor.capture()); + verifyNoMoreInteractions(callback); + OpampServerResponseError unavailableErrorWithSuggestedDelay = + (OpampServerResponseError) throwableCaptor.getValue(); + assertThat(unavailableErrorWithSuggestedDelay.getMessage()).isEmpty(); + assertThat(scheduler.getScheduledTasks()).hasSize(1); + verify(retryDelay).suggestDelay(suggestedDelay); + } + + @Test + void verifyStop() { + startService(); + + requestService.stop(); + + InOrder inOrder = inOrder(webSocket); + inOrder.verify(webSocket).send(getExpectedOutgoingBytes()); + inOrder.verify(webSocket).close(1000, null); + verify(scheduler.getMockService()).shutdown(); + + // If something fails afterward, no retry should get scheduled. + requestService.onFailure(mock()); + verifyNoInteractions(retryDelay); + assertThat(scheduler.getScheduledTasks()).isEmpty(); + + // If onClosed is called afterward, no retry should get scheduled. + requestService.onClosed(); + verifyNoInteractions(retryDelay); + assertThat(scheduler.getScheduledTasks()).isEmpty(); + + // If a new message with a server unavailable error arrives afterward, no retry should get + // scheduled. + ServerToAgent serverToAgent = + new ServerToAgent.Builder() + .error_response( + new ServerErrorResponse.Builder() + .type(ServerErrorResponseType.ServerErrorResponseType_Unavailable) + .build()) + .build(); + requestService.onMessage(createServerToAgentPayload(serverToAgent)); + verifyNoInteractions(retryDelay); + assertThat(scheduler.getScheduledTasks()).isEmpty(); + + // Requests cannot get enqueued afterward. + try { + requestService.sendRequest(); + fail(); + } catch (IllegalStateException e) { + assertThat(e).hasMessage("This service is already stopped"); + } + + // The service cannot get restarted afterward. + try { + startService(); + fail(); + } catch (IllegalStateException e) { + assertThat(e).hasMessage("This service is already stopped"); + } + } + + private byte[] getExpectedOutgoingBytes() { + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + CodedOutputStream codedOutput = CodedOutputStream.newInstance(outputStream); + codedOutput.writeUInt64NoTag(0); + byte[] payload = request.getAgentToServer().encode(); + codedOutput.writeRawBytes(payload); + codedOutput.flush(); + return outputStream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static byte[] createServerToAgentPayload(ServerToAgent serverToAgent) { + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + CodedOutputStream codedOutput = CodedOutputStream.newInstance(outputStream); + codedOutput.writeUInt64NoTag(0); + codedOutput.writeRawBytes(serverToAgent.encode()); + codedOutput.flush(); + return outputStream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void startService() { + requestService.start(callback, this::createRequest); + } + + private Request createRequest() { + AgentToServer agentToServer = new AgentToServer.Builder().sequence_num(10).build(); + request = Request.create(agentToServer); + return request; + } +}