Skip to content

Commit 3f4b222

Browse files
authored
Merge pull request #647 from yue9944882/bugfix/flush-websocket-before-closing
Release/flush websocket buffers on abnormal closing
2 parents 607a34b + f99bc24 commit 3f4b222

File tree

2 files changed

+88
-0
lines changed

2 files changed

+88
-0
lines changed

util/src/main/java/io/kubernetes/client/util/WebSocketStreamHandler.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,16 +92,36 @@ protected void handleMessage(int stream, InputStream inStream) throws IOExceptio
9292
public synchronized void close() {
9393
if (state != State.CLOSED) {
9494
state = State.CLOSED;
95+
try {
96+
if (null != socket) {
97+
// code 1000 means "Normal Closure"
98+
socket.close(1000, "Triggered client-side terminate");
99+
log.debug("Successfully closed socket.");
100+
}
101+
} catch (IOException ex) {
102+
log.error("Error on close socket", ex);
103+
// continue to flush and release the buffers
104+
}
95105
// Close all output streams. Caller of getInputStream(int) is responsible
96106
// for closing returned input streams
97107
for (PipedOutputStream out : pipedOutput.values()) {
108+
try {
109+
out.flush();
110+
} catch (IOException ex) {
111+
log.error("Error on flush", ex);
112+
}
98113
try {
99114
out.close();
100115
} catch (IOException ex) {
101116
log.error("Error on close", ex);
102117
}
103118
}
104119
for (OutputStream out : output.values()) {
120+
try {
121+
out.flush();
122+
} catch (IOException ex) {
123+
log.error("Error on flush", ex);
124+
}
105125
try {
106126
out.close();
107127
} catch (IOException ex) {
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package io.kubernetes.client;
2+
3+
import static org.junit.Assert.*;
4+
5+
import com.squareup.okhttp.RequestBody;
6+
import com.squareup.okhttp.ws.WebSocket;
7+
import io.kubernetes.client.util.WebSocketStreamHandler;
8+
import java.io.*;
9+
import okio.Buffer;
10+
import okio.BufferedSink;
11+
import okio.Okio;
12+
import org.junit.Test;
13+
14+
public class WebsocketStreamHandlerTest {
15+
16+
private static String testProtocol = "test-protocol";
17+
18+
@Test
19+
public void testHandlerReceivingData() throws IOException {
20+
int testStreamId = 0;
21+
byte testData = 1;
22+
byte[] testDatas =
23+
new byte[] {(byte) testStreamId, testData, testData}; // first byte stands for stream id,
24+
ByteArrayInputStream testBytesInputStream = new ByteArrayInputStream(testDatas);
25+
26+
WebSocketStreamHandler handler = new WebSocketStreamHandler();
27+
MockWebSocket mockWebSocket = new MockWebSocket();
28+
29+
handler.open(testProtocol, mockWebSocket);
30+
31+
InputStream inputStream = handler.getInputStream(testStreamId);
32+
33+
// handler receiving
34+
handler.bytesMessage(testBytesInputStream);
35+
36+
byte[] receivingData = new byte[16];
37+
inputStream.read(receivingData);
38+
handler.close();
39+
40+
assertEquals(testData, receivingData[0]);
41+
assertEquals(testData, receivingData[1]);
42+
assertTrue(mockWebSocket.closed);
43+
}
44+
45+
private class MockWebSocket implements WebSocket {
46+
private byte[] data;
47+
private boolean pinged = false;
48+
private boolean closed = false;
49+
50+
@Override
51+
public void sendMessage(RequestBody message) throws IOException {
52+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
53+
BufferedSink sink = Okio.buffer(Okio.sink(outputStream));
54+
message.writeTo(sink);
55+
this.data = outputStream.toByteArray();
56+
}
57+
58+
@Override
59+
public void sendPing(Buffer payload) throws IOException {
60+
this.pinged = true;
61+
}
62+
63+
@Override
64+
public void close(int code, String reason) throws IOException {
65+
this.closed = true;
66+
}
67+
}
68+
}

0 commit comments

Comments
 (0)