-
Notifications
You must be signed in to change notification settings - Fork 169
OpAMP WebSocket service #1969
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
OpAMP WebSocket service #1969
Changes from 20 commits
1fb5d4f
52476f7
2a0064a
a8a0cfd
4a9332a
0e0b982
65156f8
a6bf6e0
228a010
c7c74ca
cd80847
ac84012
ba37adc
b392923
c9fba42
522587d
4624809
b371629
dc7f2a9
f08736a
defd60b
061e743
560c92a
950d7c3
cf26a64
83b4c54
3f7f483
3728857
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package io.opentelemetry.opamp.client.internal.connectivity.websocket; | ||
|
|
||
| import java.util.Objects; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import javax.annotation.Nonnull; | ||
| import javax.annotation.Nullable; | ||
| import okhttp3.OkHttpClient; | ||
| import okhttp3.Response; | ||
| import okhttp3.WebSocketListener; | ||
| import okio.ByteString; | ||
|
|
||
| public class OkHttpWebSocket implements WebSocket { | ||
| private final String url; | ||
| private final OkHttpClient client; | ||
| private final AtomicReference<Status> status = new AtomicReference<>(Status.NOT_RUNNING); | ||
| private final AtomicReference<okhttp3.WebSocket> webSocket = new AtomicReference<>(); | ||
|
|
||
| public static OkHttpWebSocket create(String url) { | ||
| return create(url, new OkHttpClient()); | ||
| } | ||
|
|
||
| public static OkHttpWebSocket create(String url, OkHttpClient client) { | ||
| return new OkHttpWebSocket(url, client); | ||
| } | ||
|
|
||
| private OkHttpWebSocket(String url, OkHttpClient client) { | ||
| this.url = url; | ||
| this.client = client; | ||
| } | ||
|
|
||
| @Override | ||
| public void open(Listener listener) { | ||
| if (status.compareAndSet(Status.NOT_RUNNING, Status.STARTING)) { | ||
| okhttp3.Request request = new okhttp3.Request.Builder().url(url).build(); | ||
| webSocket.set(client.newWebSocket(request, new ListenerAdapter(listener))); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public boolean send(byte[] request) { | ||
| if (status.get() != Status.RUNNING) { | ||
| return false; | ||
| } | ||
| return getWebSocket().send(ByteString.of(request)); | ||
| } | ||
|
|
||
| @Override | ||
| public void close(int code, @Nullable String reason) { | ||
| if (status.compareAndSet(Status.RUNNING, Status.CLOSING)) { | ||
| try { | ||
| if (!getWebSocket().close(code, reason)) { | ||
| status.set(Status.NOT_RUNNING); | ||
| } | ||
| } catch (IllegalArgumentException e) { | ||
| status.set(Status.RUNNING); | ||
| throw e; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private okhttp3.WebSocket getWebSocket() { | ||
| return Objects.requireNonNull(webSocket.get()); | ||
| } | ||
|
|
||
| private class ListenerAdapter extends WebSocketListener { | ||
| private final Listener delegate; | ||
|
|
||
| private ListenerAdapter(Listener delegate) { | ||
| this.delegate = delegate; | ||
| } | ||
|
|
||
| @Override | ||
| public void onOpen(@Nonnull okhttp3.WebSocket webSocket, @Nonnull Response response) { | ||
| status.set(Status.RUNNING); | ||
| delegate.onOpen(); | ||
| } | ||
|
|
||
| @Override | ||
| public void onClosing(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) { | ||
| status.set(Status.CLOSING); | ||
| delegate.onClosing(); | ||
| } | ||
|
|
||
| @Override | ||
| public void onClosed(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) { | ||
| status.set(Status.NOT_RUNNING); | ||
| delegate.onClosed(); | ||
| } | ||
|
|
||
| @Override | ||
| public void onFailure( | ||
| @Nonnull okhttp3.WebSocket webSocket, @Nonnull Throwable t, @Nullable Response response) { | ||
| status.set(Status.NOT_RUNNING); | ||
| delegate.onFailure(t); | ||
| } | ||
|
|
||
| @Override | ||
| public void onMessage(@Nonnull okhttp3.WebSocket webSocket, @Nonnull ByteString bytes) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there is also
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Binary format should be the only one we need, so yeah I think it's fine to leave the utf-8 message unhandled. |
||
| delegate.onMessage(bytes.toByteArray()); | ||
| } | ||
| } | ||
|
|
||
| enum Status { | ||
| NOT_RUNNING, | ||
| STARTING, | ||
| CLOSING, | ||
| RUNNING | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| /* | ||
| * Copyright The OpenTelemetry Authors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package io.opentelemetry.opamp.client.internal.connectivity.websocket; | ||
|
|
||
| import javax.annotation.Nullable; | ||
|
|
||
| public interface WebSocket { | ||
| /** | ||
| * Starts the websocket connection if it's not yet started or if it has been closed. | ||
| * | ||
| * @param listener Will receive events from the websocket connection. | ||
| */ | ||
| void open(Listener listener); | ||
|
|
||
| /** | ||
| * Stops the websocket connection if running. Nothing will happen if it's already stopped. | ||
| * | ||
| * @param code Status code as defined by <a | ||
| * href="http://tools.ietf.org/html/rfc6455#section-7.4">Section 7.4 of RFC 6455</a> | ||
| * @param reason Reason for shutting down, as explained in <a | ||
| * href="https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.1">Section 5.5.1 of RFC | ||
| * 6455</a> | ||
| */ | ||
| void close(int code, @Nullable String reason); | ||
|
|
||
| /** | ||
| * Sends a message via the websocket connection. | ||
| * | ||
| * @param request The message payload. | ||
| * @return {@code false} If the message can't be dispatched for any reason, whether the websocket | ||
| * isn't running, or the connection isn't established, or it's terminated. {@code true} if the | ||
| * message can get sent. Returning {@code true} doesn't guarantee that the message will arrive | ||
| * at the remote peer. | ||
| */ | ||
| boolean send(byte[] request); | ||
|
|
||
| interface Listener { | ||
|
|
||
| /** | ||
| * Called when the websocket connection is successfully established with the remote peer. The | ||
| * client may start sending messages after this method is called. | ||
| */ | ||
| void onOpen(); | ||
|
|
||
| /** | ||
| * Called when the closing handshake has started. No further messages will be sent after this | ||
| * method call. | ||
| */ | ||
| void onClosing(); | ||
|
|
||
| /** Called when the connection is terminated and no further messages can be transmitted. */ | ||
| void onClosed(); | ||
|
|
||
| /** | ||
| * Called when receiving a message from the remote peer. | ||
| * | ||
| * @param data The payload sent by the remote peer. | ||
| */ | ||
| void onMessage(byte[] data); | ||
|
|
||
| /** | ||
| * Called when the connection is closed or cannot be established due to an error. No messages | ||
| * can be transmitted after this method is called. | ||
| * | ||
| * @param t The error. | ||
| */ | ||
| void onFailure(Throwable t); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want to keep it then there should be a comment explaining why this is needed. I think that we don't really need to handle this since using an invalid code or reason that is too long is a caller error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I've added a comment.