Skip to content

Commit 3ecf573

Browse files
brendandburnsbrendanburns
authored andcommitted
Buffer writes > 16 MiB into multiple web socket messages.
1 parent e5e6468 commit 3ecf573

File tree

2 files changed

+99
-25
lines changed

2 files changed

+99
-25
lines changed

util/src/main/java/io/kubernetes/client/util/WebSocketStreamHandler.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,20 @@ public void write(byte[] b, int offset, int length) throws IOException {
241241
}
242242
}
243243
}
244-
byte[] buffer = new byte[length + 1];
245-
buffer[0] = stream;
246-
System.arraycopy(b, offset, buffer, 1, length);
247-
WebSocketStreamHandler.this.socket.send(ByteString.of(buffer));
244+
int bytesWritten = 0;
245+
int remaining = length;
246+
while (bytesWritten < length) {
247+
// OkHTTP3 Web Sockets limits buffer size to 16MiB, so cap buffer at 15MiB
248+
int bufferSize = Math.min(remaining, 15 * 1024 * 1024);
249+
byte[] buffer = new byte[bufferSize + 1];
250+
buffer[0] = stream;
251+
System.arraycopy(b, offset + bytesWritten, buffer, 1, bufferSize);
252+
if (!WebSocketStreamHandler.this.socket.send(ByteString.of(buffer))) {
253+
throw new IOException("WebSocket has closed.");
254+
}
255+
bytesWritten += bufferSize;
256+
remaining -= bytesWritten;
257+
}
248258
}
249259
}
250260
}

util/src/test/java/io/kubernetes/client/WebsocketStreamHandlerTest.java

Lines changed: 85 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,19 @@
1212
*/
1313
package io.kubernetes.client;
1414

15+
import static org.junit.Assert.assertArrayEquals;
1516
import static org.junit.Assert.assertEquals;
1617
import static org.junit.Assert.assertTrue;
1718

1819
import io.kubernetes.client.util.WebSocketStreamHandler;
1920
import java.io.ByteArrayInputStream;
20-
import java.io.ByteArrayOutputStream;
2121
import java.io.IOException;
2222
import java.io.InputStream;
23+
import java.io.OutputStream;
2324
import java.nio.charset.Charset;
2425
import okhttp3.Request;
2526
import okhttp3.WebSocket;
26-
import okio.BufferedSink;
2727
import okio.ByteString;
28-
import okio.Okio;
2928
import org.junit.Test;
3029

3130
public class WebsocketStreamHandlerTest {
@@ -59,34 +58,99 @@ public void testHandlerReceivingData() throws IOException {
5958
assertTrue(mockWebSocket.closed);
6059
}
6160

61+
@Test
62+
public void testHandlerSendingData() throws IOException {
63+
int testStreamId = 0;
64+
byte testData = 1;
65+
byte[] testDatas =
66+
new byte[] {(byte) testStreamId, testData, testData}; // first byte stands for stream id,
67+
ByteArrayInputStream testBytesInputStream = new ByteArrayInputStream(testDatas);
68+
69+
WebSocketStreamHandler handler = new WebSocketStreamHandler();
70+
MockWebSocket mockWebSocket = new MockWebSocket();
71+
72+
handler.open(testProtocol, mockWebSocket);
73+
74+
OutputStream outputStream = handler.getOutputStream(testStreamId);
75+
76+
byte[] bytes = "This is a test string".getBytes("UTF-8");
77+
byte[] output = new byte[bytes.length + 1];
78+
output[0] = 0;
79+
for (int i = 0; i < bytes.length; i++) {
80+
output[i + 1] = bytes[i];
81+
}
82+
outputStream.write(bytes);
83+
outputStream.flush();
84+
85+
assertArrayEquals(output, mockWebSocket.data);
86+
}
87+
88+
@Test
89+
public void testHandlerSendingLargeData() throws IOException {
90+
int testStreamId = 0;
91+
byte testData = 1;
92+
byte[] testDatas =
93+
new byte[] {(byte) testStreamId, testData, testData}; // first byte stands for stream id,
94+
ByteArrayInputStream testBytesInputStream = new ByteArrayInputStream(testDatas);
95+
96+
WebSocketStreamHandler handler = new WebSocketStreamHandler();
97+
MockWebSocket mockWebSocket = new MockWebSocket();
98+
99+
handler.open(testProtocol, mockWebSocket);
100+
101+
OutputStream outputStream = handler.getOutputStream(testStreamId);
102+
103+
byte[] bytes = new byte[20 * 1024 * 1024];
104+
byte[] output = new byte[bytes.length + 2];
105+
106+
for (int i = 0; i < bytes.length; i++) {
107+
bytes[i] = (byte) i;
108+
}
109+
// First stream header
110+
output[0] = 0;
111+
for (int i = 0; i < 15 * 1024 * 1024; i++) {
112+
output[i + 1] = bytes[i];
113+
}
114+
// Second stream header
115+
output[15 * 1024 * 1024 + 1] = 0;
116+
for (int i = 15 * 1024 * 1024; i < bytes.length; i++) {
117+
output[i + 2] = bytes[i];
118+
}
119+
outputStream.write(bytes);
120+
outputStream.flush();
121+
122+
assertArrayEquals(output, mockWebSocket.data);
123+
}
124+
62125
private class MockWebSocket implements WebSocket {
63-
private byte[] data;
126+
byte[] data;
64127
private boolean closed = false;
65128

129+
private byte[] append(byte[] one, byte[] two) {
130+
if (one == null || one.length == 0) {
131+
return two;
132+
}
133+
134+
byte[] result = new byte[one.length + two.length];
135+
for (int i = 0; i < one.length; i++) {
136+
result[i] = one[i];
137+
}
138+
for (int i = 0; i < two.length; i++) {
139+
result[i + one.length] = two[i];
140+
}
141+
return result;
142+
}
143+
66144
@Override
67145
public boolean send(String s) {
68-
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
69-
BufferedSink sink = Okio.buffer(Okio.sink(outputStream));
70-
try {
71-
sink.writeString(s, Charset.defaultCharset());
72-
} catch (IOException e) {
73-
throw new RuntimeException(e);
74-
}
75-
this.data = outputStream.toByteArray();
146+
this.data = append(data, s.getBytes(Charset.defaultCharset()));
76147
return true;
77148
}
78149

79150
@Override
80151
public boolean send(ByteString byteString) {
81-
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
82-
BufferedSink sink = Okio.buffer(Okio.sink(outputStream));
83-
try {
84-
sink.write(byteString);
85-
} catch (IOException e) {
86-
throw new RuntimeException(e);
87-
}
88-
this.data = outputStream.toByteArray();
89-
return false;
152+
this.data = append(data, byteString.toByteArray());
153+
return true;
90154
}
91155

92156
@Override

0 commit comments

Comments
 (0)