Skip to content

Commit b6bea4d

Browse files
committed
HTTPRequestProcessor: Don't allocate more than 20KB chunks
This is a tweak to the parsing logic for chunked encoding so that if a large chunk is sent it will only copy in 20KB at a time before distributing to the body listeners. This also is slightly more optimized in that it will reduce the data copies by one
1 parent 515955a commit b6bea4d

File tree

2 files changed

+43
-37
lines changed

2 files changed

+43
-37
lines changed

protocol/src/main/java/org/threadly/litesockets/protocols/http/request/HTTPRequestProcessor.java

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
public class HTTPRequestProcessor {
2424
public static final int MAX_HEADER_LENGTH = 1024*128;
2525
public static final int MAX_HEADER_ROW_LENGTH = 1024*8;
26+
private static final int MAX_CHUNK_SIZE = 1024*20;
2627

2728
private final ReuseableMergedByteBuffers pendingBuffers = new ReuseableMergedByteBuffers();
2829
private final ListenerHelper<HTTPRequestCallback> listeners = new ListenerHelper<>(HTTPRequestCallback.class);
@@ -69,15 +70,6 @@ public void removeHTTPRequestCallback(HTTPRequestCallback hrc) {
6970
listeners.removeListener(hrc);
7071
}
7172

72-
/**
73-
* byte[] to send through the processor.
74-
*
75-
* @param ba to send through the processor.
76-
*/
77-
public void processData(byte[] ba) {
78-
processData(ByteBuffer.wrap(ba));
79-
}
80-
8173
/**
8274
* {@link ByteBuffer} to send through the processor.
8375
*
@@ -147,7 +139,7 @@ private void runProcessData() {
147139
String upgrade = hh.getHeader(HTTPConstants.HTTP_KEY_UPGRADE);
148140
if(hh.isChunked()) {
149141
bodySize = -1;
150-
isChunked = true;
142+
isChunked = true;
151143
} else if(upgrade != null && upgrade.equals(HTTPConstants.WEBSOCKET)) {
152144
bodySize = -1;
153145
isWebsocket = true;
@@ -163,10 +155,8 @@ private void runProcessData() {
163155
} else {
164156
break;
165157
}
166-
} else {
167-
if(!processBody()) {
168-
return;
169-
}
158+
} else if(!processBody()) {
159+
return;
170160
}
171161
}
172162
}
@@ -211,12 +201,12 @@ private boolean parseStreamBody() {
211201
return false;
212202
} else {
213203
if(currentBodySize < bodySize) {
214-
ByteBuffer bb = pendingBuffers.pullBuffer((int)Math.min(pendingBuffers.remaining(), bodySize - currentBodySize));
204+
ByteBuffer bb = pendingBuffers.pullBuffer((int)Math.min(pendingBuffers.remaining(),
205+
bodySize - currentBodySize));
215206
currentBodySize+=bb.remaining();
216207
sendDuplicateBBtoListeners(bb);
217208
if(currentBodySize == bodySize) {
218209
reset();
219-
return true;
220210
}
221211
return true;
222212
} else {
@@ -237,8 +227,8 @@ private boolean parseChunkData() {
237227
reset();
238228
return false;
239229
} else {
240-
// TODO - don't allocate a full chunk, chunks may be large
241-
chunkedBB = ByteBuffer.allocate((int)bodySize); // we can int cast safely due to int parse above
230+
// we can int cast safely due to int parse above
231+
chunkedBB = ByteBuffer.allocate(Math.min((int)bodySize, MAX_CHUNK_SIZE));
242232
return true;
243233
}
244234
} else {
@@ -248,23 +238,40 @@ private boolean parseChunkData() {
248238
listeners.call().hasError(new HTTPParsingException("Problem reading chunk size!", e));
249239
return false;
250240
}
251-
} else {
252-
if(currentBodySize == bodySize && pendingBuffers.remaining() >=2) {
253-
chunkedBB.flip();
254-
sendDuplicateBBtoListeners(chunkedBB);
255-
chunkedBB = null;
241+
} // if not returned we can now try to read
242+
243+
if (currentBodySize == bodySize) {
244+
if(pendingBuffers.remaining() >=2) {
245+
if (chunkedBB != null) {
246+
if (chunkedBB.position() > 0) {
247+
chunkedBB.flip();
248+
sendDuplicateBBtoListeners(chunkedBB);
249+
}
250+
chunkedBB = null;
251+
}
256252
pendingBuffers.discard(2);
257253
bodySize = -1;
258254
currentBodySize = 0;
259-
} else if(currentBodySize == bodySize && pendingBuffers.remaining() < 2) {
255+
} else { // waiting for chunk termination
260256
return false;
261-
} else {
262-
ByteBuffer bb = pendingBuffers.pullBuffer((int)Math.min(pendingBuffers.remaining(), bodySize - currentBodySize));
263-
currentBodySize+=bb.remaining();
264-
chunkedBB.put(bb);
265257
}
266-
return true;
258+
} else {
259+
int read = pendingBuffers.get(chunkedBB.array(), chunkedBB.position(), chunkedBB.remaining());
260+
chunkedBB.position(chunkedBB.position() + read);
261+
currentBodySize += read;
262+
263+
if (! chunkedBB.hasRemaining()) {
264+
chunkedBB.flip();
265+
sendDuplicateBBtoListeners(chunkedBB);
266+
int remaining = Math.min(((int)bodySize) - currentBodySize, MAX_CHUNK_SIZE);
267+
if (remaining > 0) {
268+
chunkedBB = ByteBuffer.allocate(remaining);
269+
} else {
270+
chunkedBB = null;
271+
}
272+
}
267273
}
274+
return true;
268275
}
269276

270277
private void sendDuplicateBBtoListeners(ByteBuffer bb) {
@@ -291,11 +298,10 @@ public void reset() {
291298
* NOTE: any currently unprocessed buffer will remain! see {@link #clearBuffer()}
292299
*/
293300
public void reset(Throwable t) {
294-
if(this.request != null && t == null) {
295-
this.listeners.call().finished();
296-
}
297301
if(t != null) {
298302
this.listeners.call().hasError(t);
303+
} else if(this.request != null) {
304+
this.listeners.call().finished();
299305
}
300306
this.request = null;
301307
this.currentBodySize = 0;

protocol/src/test/java/org/threadly/litesockets/protocols/http/RequestTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public void basicBuildAndParsingWithDataTest() throws MalformedURLException {
149149
assertEquals("/test12334", cb.request.getHTTPRequestHeader().getRequestPath());
150150
assertEquals(HTTPRequestMethod.GET.toString(), cb.request.getHTTPRequestHeader().getRequestMethod());
151151
assertEquals("1", cb.request.getHTTPRequestHeader().getRequestQueryValue("query"));
152-
hrp.processData(DATA_BA);
152+
hrp.processData(ByteBuffer.wrap(DATA_BA));
153153
assertTrue(cb.finished);
154154
assertEquals(1, cb.bbs.size());
155155
assertEquals(DATA, bbToString(cb.bbs.get(0).duplicate()));
@@ -216,7 +216,7 @@ public void basicParsingChunkedBadChunkSize() throws MalformedURLException {
216216
hrp.addHTTPRequestCallback(cb);
217217
HTTPRequest hr = hrb.buildHTTPRequest();
218218
hrp.processData(hr.getMergedByteBuffers());
219-
hrp.processData("TRE\r\n".getBytes());
219+
hrp.processData(ByteBuffer.wrap("TRE\r\n".getBytes()));
220220
assertTrue(cb.error != null);
221221
assertTrue(cb.error instanceof HTTPParsingException);
222222
}
@@ -233,7 +233,7 @@ public void basicParsingChunkedManyReads() throws MalformedURLException {
233233
for(int i = 0; i<ba.length; i++) {
234234
byte[] nba = new byte[1];
235235
nba[0] = ba[i];
236-
hrp.processData(nba);
236+
hrp.processData(ByteBuffer.wrap(nba));
237237
}
238238
hrp.processData(HTTPUtils.wrapInChunk(ByteBuffer.allocate(0)));
239239
assertTrue(cb.finished);
@@ -255,10 +255,10 @@ public void basicBuildAndParsingChunked() throws MalformedURLException {
255255
assertEquals("/test12334", cb.request.getHTTPRequestHeader().getRequestPath());
256256
assertEquals(HTTPRequestMethod.GET.toString(), cb.request.getHTTPRequestHeader().getRequestMethod());
257257
assertEquals("1", cb.request.getHTTPRequestHeader().getRequestQueryValue("query"));
258-
hrp.processData(wrapInChunk(DATA_BA));
258+
hrp.processData(ByteBuffer.wrap(wrapInChunk(DATA_BA)));
259259
assertEquals(1, cb.bbs.size());
260260
assertEquals(DATA, bbToString(cb.bbs.get(0).duplicate()));
261-
hrp.processData(wrapInChunk(DATA_BA));
261+
hrp.processData(ByteBuffer.wrap(wrapInChunk(DATA_BA)));
262262
assertFalse(cb.finished);
263263
assertEquals(2, cb.bbs.size());
264264
assertEquals(DATA, bbToString(cb.bbs.get(1).duplicate()));

0 commit comments

Comments
 (0)