Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1fb5d4f
Creating WebSocket contract
LikeTheSalad Jun 18, 2025
52476f7
Updating tests to reuse tools and adding javadocs
LikeTheSalad Jun 19, 2025
2a0064a
Adding tests for OkHttpWebSocket
LikeTheSalad Jun 19, 2025
a8a0cfd
Updating tests
LikeTheSalad Jun 19, 2025
4a9332a
Adding tests to WebSocketRequestService
LikeTheSalad Jun 20, 2025
0e0b982
Adding test dependency
LikeTheSalad Jun 20, 2025
65156f8
Clean up
LikeTheSalad Jun 20, 2025
a6bf6e0
Spotless
LikeTheSalad Jun 20, 2025
228a010
Using enum for OkHttpWebSocket statuses
LikeTheSalad Jun 26, 2025
c7c74ca
Using protobuf bom
LikeTheSalad Jun 26, 2025
cd80847
Validating illegal argument exception when closing
LikeTheSalad Jun 26, 2025
ac84012
Updating javadoc wording
LikeTheSalad Jun 26, 2025
ba37adc
Synchronizing access to hasPendingRequest
LikeTheSalad Jun 26, 2025
b392923
Update opamp-client/src/main/java/io/opentelemetry/opamp/client/inter…
LikeTheSalad Jun 27, 2025
c9fba42
Update opamp-client/src/main/java/io/opentelemetry/opamp/client/inter…
LikeTheSalad Jun 27, 2025
522587d
Update opamp-client/src/main/java/io/opentelemetry/opamp/client/inter…
LikeTheSalad Jun 27, 2025
4624809
Merge remote-tracking branch 'origin/opamp-websocket-service' into op…
LikeTheSalad Jun 27, 2025
b371629
Adding onClosing method
LikeTheSalad Jun 27, 2025
dc7f2a9
Using onClosing
LikeTheSalad Jun 27, 2025
f08736a
Merge branch 'main' into opamp-websocket-service
LikeTheSalad Jun 27, 2025
defd60b
Merge branch 'main' into opamp-websocket-service
LikeTheSalad Jul 1, 2025
061e743
Adding comment explaining error rethrowing
LikeTheSalad Jul 1, 2025
560c92a
Using daemon thread factory
LikeTheSalad Jul 1, 2025
950d7c3
./gradlew spotlessApply
otelbot[bot] Jul 1, 2025
cf26a64
Merge branch 'main' into opamp-websocket-service
LikeTheSalad Jul 8, 2025
83b4c54
Describing websocket close code
LikeTheSalad Jul 8, 2025
3f7f483
Adding comment to explain last message when closing the websocket ser…
LikeTheSalad Jul 8, 2025
3728857
Applying changes suggested from reviews
LikeTheSalad Jul 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dependencyManagement/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,6 @@ dependencies {
api("tools.profiler:async-profiler:4.0")
api("com.blogspot.mydailyjava:weak-lock-free:0.18")
api("org.agrona:agrona:1.22.0")
api("com.google.protobuf:protobuf-java-util:4.29.1")
}
}
1 change: 1 addition & 0 deletions opamp-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies {
annotationProcessor("com.google.auto.value:auto-value")
compileOnly("com.google.auto.value:auto-value-annotations")
testImplementation("org.mockito:mockito-inline")
testImplementation("com.google.protobuf:protobuf-java-util")
}

val opampReleaseInfo = tasks.register<Download>("opampLastReleaseInfo") {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.opamp.client.internal.connectivity.websocket;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import okhttp3.OkHttpClient;
import okhttp3.Response;
import okhttp3.WebSocketListener;
import okio.ByteString;

public class OkHttpWebSocket implements WebSocket {
private final String url;
private final OkHttpClient client;
private final AtomicBoolean starting = new AtomicBoolean(false);
private final AtomicBoolean closing = new AtomicBoolean(false);
private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicReference<okhttp3.WebSocket> webSocket = new AtomicReference<>();

public static OkHttpWebSocket create(String url) {
return create(url, new OkHttpClient());
}

public static OkHttpWebSocket create(String url, OkHttpClient client) {
return new OkHttpWebSocket(url, client);
}

private OkHttpWebSocket(String url, OkHttpClient client) {
this.url = url;
this.client = client;
}

@Override
public void open(Listener listener) {
if (running.get()) {
return;
}
if (starting.compareAndSet(false, true)) {
okhttp3.Request request = new okhttp3.Request.Builder().url(url).build();
webSocket.set(client.newWebSocket(request, new ListenerAdapter(listener)));
}
}

@Override
public boolean send(byte[] request) {
if (!running.get()) {
return false;
}
return getWebSocket().send(ByteString.of(request));
}

@Override
public void close(int code, @Nullable String reason) {
if (!running.get()) {
return;
}
if (closing.compareAndSet(false, true)) {
if (!getWebSocket().close(code, reason)) {
closing.set(false);
running.set(false);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets say there is a failure at the same time when close is called. Then it is probably possible to reach a situation where the closing flag remains set to true after the close completes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I missed the "IllegalArgumentException" warning from the okhttp method. I've just updated it to avoid this issue.

}
}

private okhttp3.WebSocket getWebSocket() {
return Objects.requireNonNull(webSocket.get());
}

private class ListenerAdapter extends WebSocketListener {
private final Listener delegate;

private ListenerAdapter(Listener delegate) {
this.delegate = delegate;
}

@Override
public void onOpen(@Nonnull okhttp3.WebSocket webSocket, @Nonnull Response response) {
running.set(true);
starting.set(false);
delegate.onOpen();
}

@Override
public void onClosing(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) {
running.set(false);
closing.set(true);
}

@Override
public void onClosed(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) {
running.set(false);
closing.set(false);
delegate.onClosed();
}

@Override
public void onFailure(
@Nonnull okhttp3.WebSocket webSocket, @Nonnull Throwable t, @Nullable Response response) {
running.set(false);
starting.set(false);
closing.set(false);
delegate.onFailure(t);
}

@Override
public void onMessage(@Nonnull okhttp3.WebSocket webSocket, @Nonnull ByteString bytes) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is also fun onMessage(webSocket: WebSocket, text: String) is it fine to leave that unhandled? I guess it is since it isn't used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Binary format should be the only one we need, so yeah I think it's fine to leave the utf-8 message unhandled.

delegate.onMessage(bytes.toByteArray());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.opamp.client.internal.connectivity.websocket;

import javax.annotation.Nullable;

public interface WebSocket {
/**
* Starts the websocket connection if it's not yet started or if it has been closed.
*
* @param listener Will receive events from the websocket connection.
*/
void open(Listener listener);

/**
* Stops the websocket connection if running. Nothing will happen if it's already stopped.
*
* @param code Status code as defined by <a
* href="http://tools.ietf.org/html/rfc6455#section-7.4">Section 7.4 of RFC 6455</a>
* @param reason Reason for shutting down, as explained in <a
* href="https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.1">Section 5.5.1 of RFC
* 6455</a>
*/
void close(int code, @Nullable String reason);

/**
* Sends a message via the websocket connection.
*
* @param request The message payload.
* @return {@link Boolean#FALSE} If the message can't get dispatched for any reason, whether the
* websocket isn't running, or the connection isn't established, or it's terminated. {@link
* Boolean#TRUE} if the message can get sent. Returning {@link Boolean#TRUE} doesn't guarantee
* that the message will arrive at the remote peer.
*/
boolean send(byte[] request);

interface Listener {

/**
* Called when the websocket connection is successfully established with the remote peer and may
* start sending messages.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels to me that there is a word missing before may start sending messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've rephrased it a bit, let me know what you think.

*/
void onOpen();

/** Called when the connection is terminated and no further messages can get transmitted. */
void onClosed();

/**
* Called when receiving a message from the remote peer.
*
* @param data The payload sent by the remote peer.
*/
void onMessage(byte[] data);

/**
* Called when the connection is closed or cannot be established due to an error. No messages
* can get transmitted after this method is called.
*
* @param t The error.
*/
void onFailure(Throwable t);
}
}
Loading
Loading