Skip to content

Commit 6f3a18a

Browse files
LikeTheSaladlauritotelbot[bot]
authored
OpAMP WebSocket service (#1969)
Co-authored-by: Lauri Tulmin <[email protected]> Co-authored-by: otelbot <[email protected]>
1 parent 83d5cd8 commit 6f3a18a

File tree

12 files changed

+1151
-153
lines changed

12 files changed

+1151
-153
lines changed

dependencyManagement/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ dependencies {
1616
// as runtime dependencies if they are actually used as runtime dependencies)
1717
api(enforcedPlatform("io.opentelemetry.instrumentation:opentelemetry-instrumentation-bom-alpha:${otelInstrumentationVersion}"))
1818
api(enforcedPlatform("com.fasterxml.jackson:jackson-bom:2.19.1"))
19+
api(enforcedPlatform("com.google.protobuf:protobuf-bom:4.31.1"))
1920

2021
constraints {
2122
api("io.opentelemetry.semconv:opentelemetry-semconv:${semconvVersion}")

opamp-client/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ dependencies {
1616
annotationProcessor("com.google.auto.value:auto-value")
1717
compileOnly("com.google.auto.value:auto-value-annotations")
1818
testImplementation("org.mockito:mockito-inline")
19+
testImplementation("com.google.protobuf:protobuf-java-util")
1920
}
2021

2122
val opampProtos = tasks.register<DownloadOpampProtos>("opampProtoDownload", download)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.opamp.client.internal.connectivity.websocket;
7+
8+
import java.util.Objects;
9+
import java.util.concurrent.atomic.AtomicReference;
10+
import javax.annotation.Nonnull;
11+
import javax.annotation.Nullable;
12+
import okhttp3.OkHttpClient;
13+
import okhttp3.Response;
14+
import okhttp3.WebSocketListener;
15+
import okio.ByteString;
16+
17+
public class OkHttpWebSocket implements WebSocket {
18+
private final String url;
19+
private final OkHttpClient client;
20+
private final AtomicReference<Status> status = new AtomicReference<>(Status.NOT_RUNNING);
21+
private final AtomicReference<okhttp3.WebSocket> webSocket = new AtomicReference<>();
22+
23+
public static OkHttpWebSocket create(String url) {
24+
return create(url, new OkHttpClient());
25+
}
26+
27+
public static OkHttpWebSocket create(String url, OkHttpClient client) {
28+
return new OkHttpWebSocket(url, client);
29+
}
30+
31+
private OkHttpWebSocket(String url, OkHttpClient client) {
32+
this.url = url;
33+
this.client = client;
34+
}
35+
36+
@Override
37+
public void open(Listener listener) {
38+
if (status.compareAndSet(Status.NOT_RUNNING, Status.STARTING)) {
39+
okhttp3.Request request = new okhttp3.Request.Builder().url(url).build();
40+
webSocket.set(client.newWebSocket(request, new ListenerAdapter(listener)));
41+
}
42+
}
43+
44+
@Override
45+
public boolean send(byte[] request) {
46+
if (status.get() != Status.RUNNING) {
47+
return false;
48+
}
49+
return getWebSocket().send(ByteString.of(request));
50+
}
51+
52+
@Override
53+
public void close(int code, @Nullable String reason) {
54+
if (status.compareAndSet(Status.RUNNING, Status.CLOSING)) {
55+
try {
56+
if (!getWebSocket().close(code, reason)) {
57+
status.set(Status.NOT_RUNNING);
58+
}
59+
} catch (IllegalArgumentException e) {
60+
status.set(Status.RUNNING);
61+
// Re-throwing as this error happens due to a caller error.
62+
throw e;
63+
}
64+
}
65+
}
66+
67+
private okhttp3.WebSocket getWebSocket() {
68+
return Objects.requireNonNull(webSocket.get());
69+
}
70+
71+
private class ListenerAdapter extends WebSocketListener {
72+
private final Listener delegate;
73+
74+
private ListenerAdapter(Listener delegate) {
75+
this.delegate = delegate;
76+
}
77+
78+
@Override
79+
public void onOpen(@Nonnull okhttp3.WebSocket webSocket, @Nonnull Response response) {
80+
status.set(Status.RUNNING);
81+
delegate.onOpen();
82+
}
83+
84+
@Override
85+
public void onClosing(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) {
86+
status.set(Status.CLOSING);
87+
delegate.onClosing();
88+
}
89+
90+
@Override
91+
public void onClosed(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) {
92+
status.set(Status.NOT_RUNNING);
93+
delegate.onClosed();
94+
}
95+
96+
@Override
97+
public void onFailure(
98+
@Nonnull okhttp3.WebSocket webSocket, @Nonnull Throwable t, @Nullable Response response) {
99+
status.set(Status.NOT_RUNNING);
100+
delegate.onFailure(t);
101+
}
102+
103+
@Override
104+
public void onMessage(@Nonnull okhttp3.WebSocket webSocket, @Nonnull ByteString bytes) {
105+
delegate.onMessage(bytes.toByteArray());
106+
}
107+
}
108+
109+
enum Status {
110+
NOT_RUNNING,
111+
STARTING,
112+
CLOSING,
113+
RUNNING
114+
}
115+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.opamp.client.internal.connectivity.websocket;
7+
8+
import javax.annotation.Nullable;
9+
10+
public interface WebSocket {
11+
/**
12+
* Starts the websocket connection if it's not yet started or if it has been closed.
13+
*
14+
* @param listener Will receive events from the websocket connection.
15+
*/
16+
void open(Listener listener);
17+
18+
/**
19+
* Stops the websocket connection if running. Nothing will happen if it's already stopped.
20+
*
21+
* @param code Status code as defined by <a
22+
* href="http://tools.ietf.org/html/rfc6455#section-7.4">Section 7.4 of RFC 6455</a>
23+
* @param reason Reason for shutting down, as explained in <a
24+
* href="https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.1">Section 5.5.1 of RFC
25+
* 6455</a>
26+
*/
27+
void close(int code, @Nullable String reason);
28+
29+
/**
30+
* Sends a message via the websocket connection.
31+
*
32+
* @param request The message payload.
33+
* @return {@code false} If the message can't be dispatched for any reason, whether the websocket
34+
* isn't running, or the connection isn't established, or it's terminated. {@code true} if the
35+
* message can get sent. Returning {@code true} doesn't guarantee that the message will arrive
36+
* at the remote peer.
37+
*/
38+
boolean send(byte[] request);
39+
40+
interface Listener {
41+
42+
/**
43+
* Called when the websocket connection is successfully established with the remote peer. The
44+
* client may start sending messages after this method is called.
45+
*/
46+
void onOpen();
47+
48+
/**
49+
* Called when the closing handshake has started. No further messages will be sent after this
50+
* method call.
51+
*/
52+
void onClosing();
53+
54+
/** Called when the connection is terminated and no further messages can be transmitted. */
55+
void onClosed();
56+
57+
/**
58+
* Called when receiving a message from the remote peer.
59+
*
60+
* @param data The payload sent by the remote peer.
61+
*/
62+
void onMessage(byte[] data);
63+
64+
/**
65+
* Called when the connection is closed or cannot be established due to an error. No messages
66+
* can be transmitted after this method is called.
67+
*
68+
* @param t The error.
69+
*/
70+
void onFailure(Throwable t);
71+
}
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.opamp.client.internal.request.service;
7+
8+
import java.util.concurrent.Executors;
9+
import java.util.concurrent.ThreadFactory;
10+
import javax.annotation.Nonnull;
11+
12+
final class DaemonThreadFactory implements ThreadFactory {
13+
private final ThreadFactory delegate = Executors.defaultThreadFactory();
14+
15+
@Override
16+
public Thread newThread(@Nonnull Runnable r) {
17+
Thread t = delegate.newThread(r);
18+
try {
19+
t.setDaemon(true);
20+
} catch (SecurityException e) {
21+
// Well, we tried.
22+
}
23+
return t;
24+
}
25+
}

opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,11 @@
2222
import java.util.concurrent.Executors;
2323
import java.util.concurrent.ScheduledExecutorService;
2424
import java.util.concurrent.ScheduledFuture;
25-
import java.util.concurrent.ThreadFactory;
2625
import java.util.concurrent.TimeUnit;
2726
import java.util.concurrent.TimeoutException;
2827
import java.util.concurrent.atomic.AtomicBoolean;
2928
import java.util.concurrent.atomic.AtomicReference;
3029
import java.util.function.Supplier;
31-
import javax.annotation.Nonnull;
3230
import javax.annotation.Nullable;
3331
import opamp.proto.AgentToServer;
3432
import opamp.proto.ServerErrorResponse;
@@ -255,19 +253,4 @@ Duration getNextDelay() {
255253
return currentDelay.getNextDelay();
256254
}
257255
}
258-
259-
private static class DaemonThreadFactory implements ThreadFactory {
260-
private final ThreadFactory delegate = Executors.defaultThreadFactory();
261-
262-
@Override
263-
public Thread newThread(@Nonnull Runnable r) {
264-
Thread t = delegate.newThread(r);
265-
try {
266-
t.setDaemon(true);
267-
} catch (SecurityException e) {
268-
// Well, we tried.
269-
}
270-
return t;
271-
}
272-
}
273256
}

0 commit comments

Comments
 (0)