Skip to content

Commit 81c2554

Browse files
committed
[SCB-2894] optimize WebSocket closing race condition and add WebSocket status
1 parent 5aed958 commit 81c2554

File tree

3 files changed

+67
-3
lines changed

3 files changed

+67
-3
lines changed

swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/AbstractBaseWebSocket.java

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,22 @@
1919

2020
import java.util.concurrent.CompletableFuture;
2121

22+
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
23+
2224
/**
2325
* AbstractBaseWebSocket
2426
*/
2527
public abstract class AbstractBaseWebSocket implements WebSocket {
2628
private WebSocketAdapter webSocketAdapter;
2729

30+
private Status status = Status.CREATED;
31+
32+
private CompletableFuture<Void> closeFuture;
33+
34+
private Short closeStatusCode;
35+
36+
private String closeReason;
37+
2838
@Override
2939
public CompletableFuture<Void> sendMessage(WebSocketMessage<?> message) {
3040
return webSocketAdapter.sendMessage(message);
@@ -37,21 +47,39 @@ public CompletableFuture<Void> sendFrame(WebSocketFrame frame) {
3747

3848
@Override
3949
public CompletableFuture<Void> close() {
40-
return webSocketAdapter.close((short) 1000, "NORMAL");
50+
return this.close((short) WebSocketCloseStatus.NORMAL_CLOSURE.code(),
51+
WebSocketCloseStatus.NORMAL_CLOSURE.reasonText());
4152
}
4253

4354
@Override
4455
public CompletableFuture<Void> close(Short closeStatusCode, String closeReason) {
45-
return webSocketAdapter.close(closeStatusCode, closeReason);
56+
synchronized (this) {
57+
if (status == Status.WAITING_TO_CLOSE || status == Status.CLOSING || status == Status.CLOSED) {
58+
return CompletableFuture.completedFuture(null);
59+
}
60+
status = Status.WAITING_TO_CLOSE;
61+
this.closeStatusCode = closeStatusCode;
62+
this.closeReason = closeReason;
63+
if (webSocketAdapter == null) {
64+
// the case that close when WebSocket still not complete handshake
65+
closeFuture = new CompletableFuture<>();
66+
return closeFuture;
67+
}
68+
}
69+
status = Status.CLOSING;
70+
return webSocketAdapter.close(closeStatusCode, closeReason)
71+
.whenComplete((v, t) -> status = Status.CLOSED);
4672
}
4773

4874
@Override
4975
public void pause() {
76+
status = Status.PAUSED;
5077
webSocketAdapter.pause();
5178
}
5279

5380
@Override
5481
public void resume() {
82+
status = Status.RUNNING;
5583
webSocketAdapter.resume();
5684
}
5785

@@ -78,4 +106,29 @@ public void onWriteQueueDrain() {
78106
public void setWebSocketAdapter(WebSocketAdapter webSocketAdapter) {
79107
this.webSocketAdapter = webSocketAdapter;
80108
}
109+
110+
public void startWorking() {
111+
synchronized (this) {
112+
if (status == Status.WAITING_TO_CLOSE) {
113+
status = Status.CLOSING;
114+
webSocketAdapter.close(closeStatusCode, closeReason)
115+
.whenComplete((v, t) -> {
116+
status = Status.CLOSED;
117+
if (t != null) {
118+
closeFuture.completeExceptionally(t);
119+
} else {
120+
closeFuture.complete(null);
121+
}
122+
});
123+
return;
124+
}
125+
status = Status.RUNNING;
126+
}
127+
onConnectionReady();
128+
}
129+
130+
@Override
131+
public Status getStatus() {
132+
return status;
133+
}
81134
}

swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocket.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,15 @@ default void onFrame(WebSocketFrame frame) {
5656
boolean writeQueueFull();
5757

5858
void onWriteQueueDrain();
59+
60+
Status getStatus();
61+
62+
enum Status {
63+
CREATED,
64+
RUNNING,
65+
PAUSED,
66+
WAITING_TO_CLOSE,
67+
CLOSING,
68+
CLOSED
69+
}
5970
}

transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ private void linkVertxDrainHandler() {
162162

163163
private void startWorking() {
164164
scheduleTask(
165-
bizWebSocket::onConnectionReady);
165+
bizWebSocket::startWorking);
166166
}
167167

168168
private void scheduleTask(Runnable task) {

0 commit comments

Comments
 (0)