diff --git a/opamp-client/README.md b/opamp-client/README.md index d8c0369a1..a5aa42fb9 100644 --- a/opamp-client/README.md +++ b/opamp-client/README.md @@ -22,16 +22,16 @@ OpampClient client = .build( new OpampClient.Callbacks() { @Override - public void onConnect() {} + public void onConnect(OpampClient client) {} @Override - public void onConnectFailed(@Nullable Throwable throwable) {} + public void onConnectFailed(OpampClient client, @Nullable Throwable throwable) {} @Override - public void onErrorResponse(ServerErrorResponse errorResponse) {} + public void onErrorResponse(OpampClient client, ServerErrorResponse errorResponse) {} @Override - public void onMessage(MessageData messageData) { + public void onMessage(OpampClient client, MessageData messageData) { AgentRemoteConfig remoteConfig = messageData.getRemoteConfig(); if (remoteConfig != null) { // A remote config was received diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/OpampClient.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/OpampClient.java index 19d63eb11..735fa3340 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/OpampClient.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/OpampClient.java @@ -22,7 +22,7 @@ static OpampClientBuilder builder() { * Sets attributes of the Agent. The attributes will be included in the next outgoing status * report. This is typically used by Agents which allow their AgentDescription to change * dynamically while the OpAMPClient is started. May be also called from {@link - * Callbacks#onMessage(MessageData)}. + * Callbacks#onMessage(OpampClient, MessageData)}. * * @param agentDescription The new agent description. */ @@ -40,16 +40,19 @@ interface Callbacks { * Called when the connection is successfully established to the Server. For WebSocket clients * this is called after the handshake is completed without any error. For HTTP clients this is * called for any request if the response status is OK. + * + * @param client The client that's connected. */ - void onConnect(); + void onConnect(OpampClient client); /** * Called when the connection to the Server cannot be established. May also be called if the * connection is lost and reconnection attempt fails. * + * @param client The client that failed to connect. * @param throwable The exception. */ - void onConnectFailed(@Nullable Throwable throwable); + void onConnectFailed(OpampClient client, @Nullable Throwable throwable); /** * Called when the Server reports an error in response to some previously sent request. Useful @@ -57,9 +60,10 @@ interface Callbacks { * retrying previous operations. The client handles the ErrorResponse_UNAVAILABLE case * internally by performing retries as necessary. * + * @param client The client that received an error response. * @param errorResponse The error returned by the Server. */ - void onErrorResponse(ServerErrorResponse errorResponse); + void onErrorResponse(OpampClient client, ServerErrorResponse errorResponse); /** * Called when the Agent receives a message that needs processing. See {@link MessageData} @@ -70,8 +74,9 @@ interface Callbacks { * onMessage returns. This is advisable if processing can take a long time. In that case * returning quickly is preferable to avoid blocking the {@link OpampClient}. * + * @param client The client that received a message. * @param messageData The server response data that needs processing. */ - void onMessage(MessageData messageData); + void onMessage(OpampClient client, MessageData messageData); } } diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/impl/OpampClientImpl.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/impl/OpampClientImpl.java index e224135ff..1441093d7 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/impl/OpampClientImpl.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/impl/OpampClientImpl.java @@ -146,12 +146,12 @@ public void setRemoteConfigStatus(@Nonnull RemoteConfigStatus remoteConfigStatus @Override public void onConnectionSuccess() { - callbacks.onConnect(); + callbacks.onConnect(this); } @Override public void onConnectionFailed(Throwable throwable) { - callbacks.onConnectFailed(throwable); + callbacks.onConnectFailed(this, throwable); } @Override @@ -168,7 +168,7 @@ public void onRequestFailed(Throwable throwable) { preserveFailedRequestRecipe(); if (throwable instanceof OpampServerResponseException) { ServerErrorResponse errorResponse = ((OpampServerResponseException) throwable).errorResponse; - callbacks.onErrorResponse(errorResponse); + callbacks.onErrorResponse(this, errorResponse); } } @@ -195,7 +195,7 @@ private void handleResponsePayload(ServerToAgent response) { } if (notifyOnMessage) { - callbacks.onMessage(messageBuilder.build()); + callbacks.onMessage(this, messageBuilder.build()); } } diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/impl/OpampClientImplTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/impl/OpampClientImplTest.java index dd5c0b956..020b1b2e2 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/impl/OpampClientImplTest.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/impl/OpampClientImplTest.java @@ -8,6 +8,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -199,7 +200,8 @@ void onSuccess_withChangesToReport_notifyCallbackOnMessage() { // Await for onMessage call await().atMost(Duration.ofSeconds(5)).until(() -> callbacks.onMessageCalls.get() == 1); - verify(callbacks).onMessage(MessageData.builder().setRemoteConfig(remoteConfig).build()); + verify(callbacks) + .onMessage(client, MessageData.builder().setRemoteConfig(remoteConfig).build()); } @Test @@ -214,7 +216,7 @@ void onSuccess_withNoChangesToReport_doNotNotifyCallbackOnMessage() { // Giving some time for the callback to get called await().during(Duration.ofSeconds(1)); - verify(callbacks, never()).onMessage(any()); + verify(callbacks, never()).onMessage(eq(client), any()); } @Test @@ -257,8 +259,8 @@ void onConnectionSuccessful_notifyCallback() { await().atMost(Duration.ofSeconds(5)).until(() -> callbacks.onConnectCalls.get() == 1); - verify(callbacks).onConnect(); - verify(callbacks, never()).onConnectFailed(any()); + verify(callbacks).onConnect(client); + verify(callbacks, never()).onConnectFailed(eq(client), any()); } @Test @@ -301,8 +303,8 @@ void onFailedResponse_withServerErrorData_notifyCallback() { await().atMost(Duration.ofSeconds(5)).until(() -> callbacks.onErrorResponseCalls.get() == 1); - verify(callbacks).onErrorResponse(errorResponse); - verify(callbacks, never()).onMessage(any()); + verify(callbacks).onErrorResponse(client, errorResponse); + verify(callbacks, never()).onMessage(eq(client), any()); } @Test @@ -312,7 +314,7 @@ void onConnectionFailed_notifyCallback() { client.onConnectionFailed(throwable); - verify(callbacks).onConnectFailed(throwable); + verify(callbacks).onConnectFailed(client, throwable); } @Test @@ -450,22 +452,22 @@ private static class TestCallbacks implements OpampClient.Callbacks { private final AtomicInteger onMessageCalls = new AtomicInteger(); @Override - public void onConnect() { + public void onConnect(OpampClient client) { onConnectCalls.incrementAndGet(); } @Override - public void onConnectFailed(@Nullable Throwable throwable) { + public void onConnectFailed(OpampClient client, @Nullable Throwable throwable) { onConnectFailedCalls.incrementAndGet(); } @Override - public void onErrorResponse(ServerErrorResponse errorResponse) { + public void onErrorResponse(OpampClient client, ServerErrorResponse errorResponse) { onErrorResponseCalls.incrementAndGet(); } @Override - public void onMessage(MessageData messageData) { + public void onMessage(OpampClient client, MessageData messageData) { onMessageCalls.incrementAndGet(); } }