1+ /*
2+ * Copyright 2020 IEXEC BLOCKCHAIN TECH
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
117package com .iexec .worker .pubsub ;
218
319import java .util .Arrays ;
2440import org .springframework .scheduling .concurrent .ConcurrentTaskScheduler ;
2541import org .springframework .stereotype .Component ;
2642import org .springframework .web .client .RestTemplate ;
43+ import org .springframework .web .socket .WebSocketHandler ;
2744import org .springframework .web .socket .client .WebSocketClient ;
2845import org .springframework .web .socket .client .standard .StandardWebSocketClient ;
2946import org .springframework .web .socket .messaging .WebSocketStompClient ;
@@ -92,12 +109,34 @@ private void requestNewSession() {
92109 }
93110
94111 /**
95- * Listen to new session request events and refresh the websocket
112+ * Listen to session request events and refresh the websocket
96113 * connection by establishing a new STOMP session. Only one of
97114 * the received requests in a fixed time interval
98- * ( SESSION_REFRESH_DELAY) will be processed. We use @Scheduled
99- * to start the watcher asynchronously with an initial delay and
115+ * {@code SESSION_REFRESH_DELAY} will be processed. We use
116+ * {@code @Scheduled} to start the watcher asynchronously and
100117 * restart it in case a problem occurs.
118+ * <br>
119+ *
120+ * <p><b>Note:</b> the reason we use a queue is because the
121+ * method {@link SessionHandler#handleTransportError()} is called
122+ * two times to handle connectivity issues when the websocket
123+ * connection is, for whatever reason, brutally terminated while
124+ * a message is being transmitted.
125+ * The first call is to handle the incomplete body message parsing
126+ * problem ({@code Premature end of chunk coded message body:
127+ * closing chunk expected}).
128+ * The second call happens after the connection is closed in
129+ * {@link WebSocketHandler#afterConnectionClosed()}.
130+ * So trying to directly trigger new connection attempts from
131+ * {@link SessionHandler#handleTransportError()} would result
132+ * in parallel zombie threads trying to establish a new session
133+ * each.
134+ * Instead, each call to this method adds a
135+ * {@link SessionRequestEvent} to the queue. We wait for a short
136+ * period of time to collect all these requests coming from
137+ * different calls (and possibly different threads), then we send
138+ * only one request to the server. This process is repeated until
139+ * the websocket connection is reestablished again.
101140 *
102141 * @throws InterruptedException
103142 */
@@ -180,4 +219,4 @@ public void handleTransportError(StompSession session, Throwable exception) {
180219 private class SessionRequestEvent {
181220
182221 }
183- }
222+ }
0 commit comments