Skip to content

Commit 4a73aec

Browse files
committed
Add note to explain the use of the queue.
1 parent 4e43370 commit 4a73aec

File tree

2 files changed

+58
-3
lines changed

2 files changed

+58
-3
lines changed

src/main/java/com/iexec/worker/pubsub/SessionCreatedEvent.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright 2020 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package com.iexec.worker.pubsub;
218

319
import lombok.NoArgsConstructor;

src/main/java/com/iexec/worker/pubsub/StompClient.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright 2020 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package com.iexec.worker.pubsub;
218

319
import java.util.Arrays;
@@ -24,6 +40,7 @@
2440
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
2541
import org.springframework.stereotype.Component;
2642
import org.springframework.web.client.RestTemplate;
43+
import org.springframework.web.socket.WebSocketHandler;
2744
import org.springframework.web.socket.client.WebSocketClient;
2845
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
2946
import org.springframework.web.socket.messaging.WebSocketStompClient;
@@ -92,12 +109,34 @@ private void requestNewSession() {
92109
}
93110

94111
/**
95-
* Listen to new session request events and refresh the websocket
112+
* Listen to session request events and refresh the websocket
96113
* connection by establishing a new STOMP session. Only one of
97114
* the received requests in a fixed time interval
98-
* (SESSION_REFRESH_DELAY) will be processed. We use @Scheduled
99-
* to start the watcher asynchronously with an initial delay and
115+
* {@code SESSION_REFRESH_DELAY} will be processed. We use
116+
* {@code @Scheduled} to start the watcher asynchronously and
100117
* restart it in case a problem occurs.
118+
* <br>
119+
*
120+
* <p><b>Note:</b> the reason we use a queue is because the
121+
* method {@link SessionHandler#handleTransportError()} is called,
122+
* in some cases, two times to handle two different exceptions.
123+
* This occurs when the connection is, for whatever reason,
124+
* brutally terminated while a message is being transmitted.
125+
* The first call is to handle the incomplete body message parsing
126+
* problem ({@code Premature end of chunk coded message body:
127+
* closing chunk expected}).
128+
* The second call happens after the connection is closed in
129+
* {@link WebSocketHandler#afterConnectionClosed()}.
130+
* So trying to directly trigger new connection attempts from
131+
* {@link SessionHandler#handleTransportError()} would result
132+
* in parallel zombie threads trying to establish a new session
133+
* each.
134+
* Instead, each call to this method adds a
135+
* {@link SessionRequestEvent} to the queue. We wait for a short
136+
* period of time to collect all these requests coming from
137+
* different calls (and possibly different threads), then we send
138+
* only one request to the server. This process is repeated until
139+
* the websocket connection is reestablished again.
101140
*
102141
* @throws InterruptedException
103142
*/

0 commit comments

Comments
 (0)