Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions opamp-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ OpampClient client =
.build(
new OpampClient.Callbacks() {
@Override
public void onConnect() {}
public void onConnect(OpampClient client) {}

@Override
public void onConnectFailed(@Nullable Throwable throwable) {}
public void onConnectFailed(OpampClient client, @Nullable Throwable throwable) {}

@Override
public void onErrorResponse(ServerErrorResponse errorResponse) {}
public void onErrorResponse(OpampClient client, ServerErrorResponse errorResponse) {}

@Override
public void onMessage(MessageData messageData) {
public void onMessage(OpampClient client, MessageData messageData) {
AgentRemoteConfig remoteConfig = messageData.getRemoteConfig();
if (remoteConfig != null) {
// A remote config was received
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ static OpampClientBuilder builder() {
* Sets attributes of the Agent. The attributes will be included in the next outgoing status
* report. This is typically used by Agents which allow their AgentDescription to change
* dynamically while the OpAMPClient is started. May be also called from {@link
* Callbacks#onMessage(MessageData)}.
* Callbacks#onMessage(OpampClient, MessageData)}.
*
* @param agentDescription The new agent description.
*/
Expand All @@ -40,26 +40,30 @@ interface Callbacks {
* Called when the connection is successfully established to the Server. For WebSocket clients
* this is called after the handshake is completed without any error. For HTTP clients this is
* called for any request if the response status is OK.
*
* @param client The client that's connected.
*/
void onConnect();
void onConnect(OpampClient client);

/**
* Called when the connection to the Server cannot be established. May also be called if the
* connection is lost and reconnection attempt fails.
*
* @param client The client that failed to connect.
* @param throwable The exception.
*/
void onConnectFailed(@Nullable Throwable throwable);
void onConnectFailed(OpampClient client, @Nullable Throwable throwable);

/**
* Called when the Server reports an error in response to some previously sent request. Useful
* for logging purposes. The Agent should not attempt to process the error by reconnecting or
* retrying previous operations. The client handles the ErrorResponse_UNAVAILABLE case
* internally by performing retries as necessary.
*
* @param client The client that received an error response.
* @param errorResponse The error returned by the Server.
*/
void onErrorResponse(ServerErrorResponse errorResponse);
void onErrorResponse(OpampClient client, ServerErrorResponse errorResponse);

/**
* Called when the Agent receives a message that needs processing. See {@link MessageData}
Expand All @@ -70,8 +74,9 @@ interface Callbacks {
* onMessage returns. This is advisable if processing can take a long time. In that case
* returning quickly is preferable to avoid blocking the {@link OpampClient}.
*
* @param client The client that received a message.
* @param messageData The server response data that needs processing.
*/
void onMessage(MessageData messageData);
void onMessage(OpampClient client, MessageData messageData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,12 @@ public void setRemoteConfigStatus(@Nonnull RemoteConfigStatus remoteConfigStatus

@Override
public void onConnectionSuccess() {
callbacks.onConnect();
callbacks.onConnect(this);
}

@Override
public void onConnectionFailed(Throwable throwable) {
callbacks.onConnectFailed(throwable);
callbacks.onConnectFailed(this, throwable);
}

@Override
Expand All @@ -168,7 +168,7 @@ public void onRequestFailed(Throwable throwable) {
preserveFailedRequestRecipe();
if (throwable instanceof OpampServerResponseException) {
ServerErrorResponse errorResponse = ((OpampServerResponseException) throwable).errorResponse;
callbacks.onErrorResponse(errorResponse);
callbacks.onErrorResponse(this, errorResponse);
}
}

Expand All @@ -195,7 +195,7 @@ private void handleResponsePayload(ServerToAgent response) {
}

if (notifyOnMessage) {
callbacks.onMessage(messageBuilder.build());
callbacks.onMessage(this, messageBuilder.build());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -199,7 +200,8 @@ void onSuccess_withChangesToReport_notifyCallbackOnMessage() {
// Await for onMessage call
await().atMost(Duration.ofSeconds(5)).until(() -> callbacks.onMessageCalls.get() == 1);

verify(callbacks).onMessage(MessageData.builder().setRemoteConfig(remoteConfig).build());
verify(callbacks)
.onMessage(client, MessageData.builder().setRemoteConfig(remoteConfig).build());
}

@Test
Expand All @@ -214,7 +216,7 @@ void onSuccess_withNoChangesToReport_doNotNotifyCallbackOnMessage() {
// Giving some time for the callback to get called
await().during(Duration.ofSeconds(1));

verify(callbacks, never()).onMessage(any());
verify(callbacks, never()).onMessage(eq(client), any());
}

@Test
Expand Down Expand Up @@ -257,8 +259,8 @@ void onConnectionSuccessful_notifyCallback() {

await().atMost(Duration.ofSeconds(5)).until(() -> callbacks.onConnectCalls.get() == 1);

verify(callbacks).onConnect();
verify(callbacks, never()).onConnectFailed(any());
verify(callbacks).onConnect(client);
verify(callbacks, never()).onConnectFailed(eq(client), any());
}

@Test
Expand Down Expand Up @@ -301,8 +303,8 @@ void onFailedResponse_withServerErrorData_notifyCallback() {

await().atMost(Duration.ofSeconds(5)).until(() -> callbacks.onErrorResponseCalls.get() == 1);

verify(callbacks).onErrorResponse(errorResponse);
verify(callbacks, never()).onMessage(any());
verify(callbacks).onErrorResponse(client, errorResponse);
verify(callbacks, never()).onMessage(eq(client), any());
}

@Test
Expand All @@ -312,7 +314,7 @@ void onConnectionFailed_notifyCallback() {

client.onConnectionFailed(throwable);

verify(callbacks).onConnectFailed(throwable);
verify(callbacks).onConnectFailed(client, throwable);
}

@Test
Expand Down Expand Up @@ -450,22 +452,22 @@ private static class TestCallbacks implements OpampClient.Callbacks {
private final AtomicInteger onMessageCalls = new AtomicInteger();

@Override
public void onConnect() {
public void onConnect(OpampClient client) {
onConnectCalls.incrementAndGet();
}

@Override
public void onConnectFailed(@Nullable Throwable throwable) {
public void onConnectFailed(OpampClient client, @Nullable Throwable throwable) {
onConnectFailedCalls.incrementAndGet();
}

@Override
public void onErrorResponse(ServerErrorResponse errorResponse) {
public void onErrorResponse(OpampClient client, ServerErrorResponse errorResponse) {
onErrorResponseCalls.incrementAndGet();
}

@Override
public void onMessage(MessageData messageData) {
public void onMessage(OpampClient client, MessageData messageData) {
onMessageCalls.incrementAndGet();
}
}
Expand Down