Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1fb5d4f
Creating WebSocket contract
LikeTheSalad Jun 18, 2025
52476f7
Updating tests to reuse tools and adding javadocs
LikeTheSalad Jun 19, 2025
2a0064a
Adding tests for OkHttpWebSocket
LikeTheSalad Jun 19, 2025
a8a0cfd
Updating tests
LikeTheSalad Jun 19, 2025
4a9332a
Adding tests to WebSocketRequestService
LikeTheSalad Jun 20, 2025
0e0b982
Adding test dependency
LikeTheSalad Jun 20, 2025
65156f8
Clean up
LikeTheSalad Jun 20, 2025
a6bf6e0
Spotless
LikeTheSalad Jun 20, 2025
228a010
Using enum for OkHttpWebSocket statuses
LikeTheSalad Jun 26, 2025
c7c74ca
Using protobuf bom
LikeTheSalad Jun 26, 2025
cd80847
Validating illegal argument exception when closing
LikeTheSalad Jun 26, 2025
ac84012
Updating javadoc wording
LikeTheSalad Jun 26, 2025
ba37adc
Synchronizing access to hasPendingRequest
LikeTheSalad Jun 26, 2025
b392923
Update opamp-client/src/main/java/io/opentelemetry/opamp/client/inter…
LikeTheSalad Jun 27, 2025
c9fba42
Update opamp-client/src/main/java/io/opentelemetry/opamp/client/inter…
LikeTheSalad Jun 27, 2025
522587d
Update opamp-client/src/main/java/io/opentelemetry/opamp/client/inter…
LikeTheSalad Jun 27, 2025
4624809
Merge remote-tracking branch 'origin/opamp-websocket-service' into op…
LikeTheSalad Jun 27, 2025
b371629
Adding onClosing method
LikeTheSalad Jun 27, 2025
dc7f2a9
Using onClosing
LikeTheSalad Jun 27, 2025
f08736a
Merge branch 'main' into opamp-websocket-service
LikeTheSalad Jun 27, 2025
defd60b
Merge branch 'main' into opamp-websocket-service
LikeTheSalad Jul 1, 2025
061e743
Adding comment explaining error rethrowing
LikeTheSalad Jul 1, 2025
560c92a
Using daemon thread factory
LikeTheSalad Jul 1, 2025
950d7c3
./gradlew spotlessApply
otelbot[bot] Jul 1, 2025
cf26a64
Merge branch 'main' into opamp-websocket-service
LikeTheSalad Jul 8, 2025
83b4c54
Describing websocket close code
LikeTheSalad Jul 8, 2025
3f7f483
Adding comment to explain last message when closing the websocket ser…
LikeTheSalad Jul 8, 2025
3728857
Applying changes suggested from reviews
LikeTheSalad Jul 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dependencyManagement/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies {
// as runtime dependencies if they are actually used as runtime dependencies)
api(enforcedPlatform("io.opentelemetry.instrumentation:opentelemetry-instrumentation-bom-alpha:${otelInstrumentationVersion}"))
api(enforcedPlatform("com.fasterxml.jackson:jackson-bom:2.19.1"))
api(enforcedPlatform("com.google.protobuf:protobuf-bom:4.31.1"))

constraints {
api("io.opentelemetry.semconv:opentelemetry-semconv:${semconvVersion}")
Expand Down
1 change: 1 addition & 0 deletions opamp-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies {
annotationProcessor("com.google.auto.value:auto-value")
compileOnly("com.google.auto.value:auto-value-annotations")
testImplementation("org.mockito:mockito-inline")
testImplementation("com.google.protobuf:protobuf-java-util")
}

val opampProtos = tasks.register<DownloadOpampProtos>("opampProtoDownload", download)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.opamp.client.internal.connectivity.websocket;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import okhttp3.OkHttpClient;
import okhttp3.Response;
import okhttp3.WebSocketListener;
import okio.ByteString;

public class OkHttpWebSocket implements WebSocket {
private final String url;
private final OkHttpClient client;
private final AtomicReference<Status> status = new AtomicReference<>(Status.NOT_RUNNING);
private final AtomicReference<okhttp3.WebSocket> webSocket = new AtomicReference<>();

public static OkHttpWebSocket create(String url) {
return create(url, new OkHttpClient());
}

public static OkHttpWebSocket create(String url, OkHttpClient client) {
return new OkHttpWebSocket(url, client);
}

private OkHttpWebSocket(String url, OkHttpClient client) {
this.url = url;
this.client = client;
}

@Override
public void open(Listener listener) {
if (status.compareAndSet(Status.NOT_RUNNING, Status.STARTING)) {
okhttp3.Request request = new okhttp3.Request.Builder().url(url).build();
webSocket.set(client.newWebSocket(request, new ListenerAdapter(listener)));
}
}

@Override
public boolean send(byte[] request) {
if (status.get() != Status.RUNNING) {
return false;
}
return getWebSocket().send(ByteString.of(request));
}

@Override
public void close(int code, @Nullable String reason) {
if (status.compareAndSet(Status.RUNNING, Status.CLOSING)) {
try {
if (!getWebSocket().close(code, reason)) {
status.set(Status.NOT_RUNNING);
}
} catch (IllegalArgumentException e) {
status.set(Status.RUNNING);
// Re-throwing as this error happens due to a caller error.
throw e;
}
}
}

private okhttp3.WebSocket getWebSocket() {
return Objects.requireNonNull(webSocket.get());
}

private class ListenerAdapter extends WebSocketListener {
private final Listener delegate;

private ListenerAdapter(Listener delegate) {
this.delegate = delegate;
}

@Override
public void onOpen(@Nonnull okhttp3.WebSocket webSocket, @Nonnull Response response) {
status.set(Status.RUNNING);
delegate.onOpen();
}

@Override
public void onClosing(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) {
status.set(Status.CLOSING);
delegate.onClosing();
}

@Override
public void onClosed(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) {
status.set(Status.NOT_RUNNING);
delegate.onClosed();
}

@Override
public void onFailure(
@Nonnull okhttp3.WebSocket webSocket, @Nonnull Throwable t, @Nullable Response response) {
status.set(Status.NOT_RUNNING);
delegate.onFailure(t);
}

@Override
public void onMessage(@Nonnull okhttp3.WebSocket webSocket, @Nonnull ByteString bytes) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is also fun onMessage(webSocket: WebSocket, text: String) is it fine to leave that unhandled? I guess it is since it isn't used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Binary format should be the only one we need, so yeah I think it's fine to leave the utf-8 message unhandled.

delegate.onMessage(bytes.toByteArray());
}
}

enum Status {
NOT_RUNNING,
STARTING,
CLOSING,
RUNNING
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.opamp.client.internal.connectivity.websocket;

import javax.annotation.Nullable;

public interface WebSocket {
/**
* Starts the websocket connection if it's not yet started or if it has been closed.
*
* @param listener Will receive events from the websocket connection.
*/
void open(Listener listener);

/**
* Stops the websocket connection if running. Nothing will happen if it's already stopped.
*
* @param code Status code as defined by <a
* href="http://tools.ietf.org/html/rfc6455#section-7.4">Section 7.4 of RFC 6455</a>
* @param reason Reason for shutting down, as explained in <a
* href="https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.1">Section 5.5.1 of RFC
* 6455</a>
*/
void close(int code, @Nullable String reason);

/**
* Sends a message via the websocket connection.
*
* @param request The message payload.
* @return {@code false} If the message can't be dispatched for any reason, whether the websocket
* isn't running, or the connection isn't established, or it's terminated. {@code true} if the
* message can get sent. Returning {@code true} doesn't guarantee that the message will arrive
* at the remote peer.
*/
boolean send(byte[] request);

interface Listener {

/**
* Called when the websocket connection is successfully established with the remote peer. The
* client may start sending messages after this method is called.
*/
void onOpen();

/**
* Called when the closing handshake has started. No further messages will be sent after this
* method call.
*/
void onClosing();

/** Called when the connection is terminated and no further messages can be transmitted. */
void onClosed();

/**
* Called when receiving a message from the remote peer.
*
* @param data The payload sent by the remote peer.
*/
void onMessage(byte[] data);

/**
* Called when the connection is closed or cannot be established due to an error. No messages
* can be transmitted after this method is called.
*
* @param t The error.
*/
void onFailure(Throwable t);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.opamp.client.internal.request.service;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nonnull;

final class DaemonThreadFactory implements ThreadFactory {
private final ThreadFactory delegate = Executors.defaultThreadFactory();

@Override
public Thread newThread(@Nonnull Runnable r) {
Thread t = delegate.newThread(r);
try {
t.setDaemon(true);
} catch (SecurityException e) {
// Well, we tried.
}
return t;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import opamp.proto.AgentToServer;
import opamp.proto.ServerErrorResponse;
Expand Down Expand Up @@ -255,19 +253,4 @@ Duration getNextDelay() {
return currentDelay.getNextDelay();
}
}

private static class DaemonThreadFactory implements ThreadFactory {
private final ThreadFactory delegate = Executors.defaultThreadFactory();

@Override
public Thread newThread(@Nonnull Runnable r) {
Thread t = delegate.newThread(r);
try {
t.setDaemon(true);
} catch (SecurityException e) {
// Well, we tried.
}
return t;
}
}
}
Loading
Loading