2929import opamp .proto .ServerToAgent ;
3030
3131public final class WebSocketRequestService implements RequestService , WebSocket .Listener {
32+ private static final PeriodicDelay DEFAULT_DELAY_BETWEEN_RETRIES =
33+ PeriodicDelay .ofFixedDuration (Duration .ofSeconds (30 ));
34+
3235 private final WebSocket webSocket ;
33- private final PeriodicDelay periodicRetryDelay ;
34- private final AtomicBoolean retryingConnection = new AtomicBoolean (false );
35- private final AtomicBoolean nextRetryScheduled = new AtomicBoolean (false );
3636 private final AtomicBoolean isRunning = new AtomicBoolean (false );
3737 private final AtomicBoolean hasStopped = new AtomicBoolean (false );
38+ private final ConnectionStatus connectionStatus ;
3839 private final ScheduledExecutorService executorService ;
39- public static final PeriodicDelay DEFAULT_DELAY_BETWEEN_RETRIES =
40- PeriodicDelay .ofFixedDuration (Duration .ofSeconds (30 ));
4140
4241 /** Defined <a href="https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1">here</a>. */
4342 private static final int WEBSOCKET_NORMAL_CLOSURE_CODE = 1000 ;
@@ -77,8 +76,8 @@ public static WebSocketRequestService create(
7776 PeriodicDelay periodicRetryDelay ,
7877 ScheduledExecutorService executorService ) {
7978 this .webSocket = webSocket ;
80- this .periodicRetryDelay = periodicRetryDelay ;
8179 this .executorService = executorService ;
80+ this .connectionStatus = new ConnectionStatus (periodicRetryDelay );
8281 }
8382
8483 @ Override
@@ -154,7 +153,7 @@ public void stop() {
154153
155154 @ Override
156155 public void onOpen () {
157- retryingConnection . set ( false );
156+ connectionStatus . success ( );
158157 getCallback ().onConnectionSuccess ();
159158 synchronized (hasPendingRequestLock ) {
160159 if (hasPendingRequest ) {
@@ -200,36 +199,14 @@ private void handleServerError(ServerErrorResponse errorResponse) {
200199 }
201200
202201 webSocket .close (WEBSOCKET_NORMAL_CLOSURE_CODE , null );
203- scheduleConnectionRetry (retryAfter );
202+ connectionStatus . retryAfter (retryAfter );
204203 }
205204 }
206205
207206 private static boolean serverIsUnavailable (ServerErrorResponse errorResponse ) {
208207 return errorResponse .type .equals (ServerErrorResponseType .ServerErrorResponseType_Unavailable );
209208 }
210209
211- @ SuppressWarnings ("FutureReturnValueIgnored" )
212- private void scheduleConnectionRetry (@ Nullable Duration retryAfter ) {
213- if (hasStopped .get ()) {
214- return ;
215- }
216- if (retryingConnection .compareAndSet (false , true )) {
217- periodicRetryDelay .reset ();
218- if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion ) {
219- ((AcceptsDelaySuggestion ) periodicRetryDelay ).suggestDelay (retryAfter );
220- }
221- }
222- if (nextRetryScheduled .compareAndSet (false , true )) {
223- executorService .schedule (
224- this ::retryConnection , periodicRetryDelay .getNextDelay ().toNanos (), TimeUnit .NANOSECONDS );
225- }
226- }
227-
228- private void retryConnection () {
229- nextRetryScheduled .set (false );
230- startConnection ();
231- }
232-
233210 @ Override
234211 public void onClosing () {
235212 // Noop
@@ -238,17 +215,57 @@ public void onClosing() {
238215 @ Override
239216 public void onClosed () {
240217 // If this service isn't stopped, we should retry connecting.
241- scheduleConnectionRetry (null );
218+ connectionStatus . retryAfter (null );
242219 }
243220
244221 @ Override
245222 public void onFailure (Throwable t ) {
246223 getCallback ().onConnectionFailed (t );
247- scheduleConnectionRetry (null );
224+ connectionStatus . retryAfter (null );
248225 }
249226
250227 @ Nonnull
251228 private Callback getCallback () {
252229 return Objects .requireNonNull (callback );
253230 }
231+
232+ private class ConnectionStatus {
233+ private final PeriodicDelay periodicRetryDelay ;
234+ private final AtomicBoolean retryingConnection = new AtomicBoolean (false );
235+ private final AtomicBoolean nextRetryScheduled = new AtomicBoolean (false );
236+
237+ ConnectionStatus (PeriodicDelay periodicRetryDelay ) {
238+ this .periodicRetryDelay = periodicRetryDelay ;
239+ }
240+
241+ void success () {
242+ retryingConnection .set (false );
243+ }
244+
245+ @ SuppressWarnings ("FutureReturnValueIgnored" )
246+ void retryAfter (@ Nullable Duration retryAfter ) {
247+ if (hasStopped .get ()) {
248+ return ;
249+ }
250+
251+ if (retryingConnection .compareAndSet (false , true )) {
252+ periodicRetryDelay .reset ();
253+ if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion ) {
254+ ((AcceptsDelaySuggestion ) periodicRetryDelay ).suggestDelay (retryAfter );
255+ }
256+ }
257+
258+ if (nextRetryScheduled .compareAndSet (false , true )) {
259+ executorService .schedule (
260+ this ::retryConnection ,
261+ periodicRetryDelay .getNextDelay ().toNanos (),
262+ TimeUnit .NANOSECONDS );
263+ }
264+ }
265+
266+ private void retryConnection () {
267+ nextRetryScheduled .set (false );
268+ startConnection ();
269+ }
270+ }
254271}
0 commit comments