Skip to content

Commit 80e4d8a

Browse files
authored
HTTPClient: Fix condition after timeout where client may remain queued or in-progress (#31)
This adds unit tests which originally failed due to the client remaining in-progress or in queue. The solution is to add a failure listener which will attempt to remove from queue or in-progress if the client is still active or queued.
1 parent 4728cc3 commit 80e4d8a

File tree

3 files changed

+77
-23
lines changed

3 files changed

+77
-23
lines changed

client/src/main/java/org/threadly/litesockets/client/http/HTTPClient.java

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ public ListenableFuture<HTTPResponseData> requestAsync(final ClientHTTPRequest r
322322
return hrw.slf;
323323
}
324324

325-
private void processQueue() {
325+
protected void processQueue() {
326326
//This should be done after we do a .select on the ntse to check for more jobs before it exits.
327327
HTTPRequestWrapper hrw;
328328
while(maxConcurrent > inProcess.size() && (hrw = queue.poll()) != null) {
@@ -337,8 +337,16 @@ private void process(HTTPRequestWrapper hrw) {
337337
}
338338
try {
339339
hrw.requestStarting();
340-
hrw.client = getTCPClient(hrw.chr.getHTTPAddress());
341-
inProcess.put(hrw.client, hrw);
340+
TCPClient freshClient = getTCPClient(hrw.chr.getHTTPAddress());
341+
hrw.client = freshClient;
342+
inProcess.put(freshClient, hrw);
343+
if (hrw.slf.isDone()) { // check if completed early, likely a timeout
344+
// since timeout logic may have missed cleanup we must ensure cleanup is done now
345+
// TODO - see if we can improve this by avoiding the extra check
346+
inProcess.remove(freshClient, hrw);
347+
addBackTCPClient(hrw.chr.getHTTPAddress(), freshClient); // if client is cleaned up this will ignore
348+
return;
349+
}
342350
SimpleMergedByteBuffers writeBuffer;
343351
if (hrw.chr.getBodyBuffer() == null) {
344352
writeBuffer = new SimpleMergedByteBuffers(false,
@@ -487,20 +495,18 @@ public void run() {
487495
private class MainClientProcessor implements Reader, ClientCloseListener {
488496
@Override
489497
public void onClose(Client client) {
490-
HTTPRequestWrapper hrw = inProcess.get(client);
498+
HTTPRequestWrapper hrw = inProcess.remove(client);
491499

492500
client.close();
493501
if(hrw != null) {
494502
boolean wasProcessing = hrw.hrp.isProcessing();
495503
hrw.hrp.connectionClosed();
496504
if(! hrw.slf.isDone() && ! wasProcessing) {
497-
hrw.client = null;
498-
process(hrw);
505+
process(hrw);
499506
} else {
500507
hrw.slf.setFailure(new HTTPParsingException("Did not get complete body!"));
501508
}
502509
}
503-
inProcess.remove(client);
504510
tcpClients.remove(client);
505511
}
506512

@@ -544,6 +550,18 @@ private HTTPRequestWrapper(ClientHTTPRequest chr) {
544550
this.chr = chr;
545551

546552
sei.watchFuture(slf, chr.getTimeoutMS());
553+
slf.failureCallback((t) -> {
554+
if (queue.remove(this)) {
555+
// was likely a timeout, avoid leaving the request suck in the queue
556+
} else {
557+
// since it was not in the queue, we need to release the client from being in-process
558+
TCPClient client = this.client;
559+
this.client = null;
560+
if (client != null) { // may have already been cleaned up
561+
client.close();
562+
}
563+
}
564+
});
547565
}
548566

549567
public void requestStarting() {
@@ -573,6 +591,7 @@ public void headersFinished(HTTPResponse hr) {
573591
public void bodyData(ByteBuffer bb) {
574592
responseMBB.add(bb);
575593
if(responseMBB.remaining() > maxResponseSize) {
594+
TCPClient client = this.client; // will be `null` after error
576595
slf.setFailure(new HTTPParsingException("Response Body to large!"));
577596
client.close();
578597
}
@@ -584,17 +603,26 @@ public void finished() {
584603
slf.setResult(new HTTPResponseData(HTTPClient.this, chr.getHTTPRequest(), response,
585604
responseMBB.duplicateAndClean()));
586605
hrp.removeHTTPResponseCallback(this);
587-
inProcess.remove(client);
588-
addBackTCPClient(chr.getHTTPAddress(), client);
606+
TCPClient client = this.client;
607+
this.client = null;
608+
if (client != null && inProcess.remove(client, this)) {
609+
addBackTCPClient(chr.getHTTPAddress(), client);
610+
}
589611
processQueue();
590612
}
591613

592614
@Override
593615
public void hasError(Throwable t) {
594-
if (hrp.isProcessing()) {
595-
slf.setFailure(t);
596-
} // if not processing we likely got a close that can work after a retry
597-
client.close();
616+
// since it was not in the queue, we need to release the client from being in-process
617+
TCPClient client = this.client;
618+
if (client != null) { // may have already been cleaned up
619+
this.client = null;
620+
621+
if (hrp.isProcessing()) {
622+
slf.setFailure(t);
623+
} // if not processing we likely got a close that can work after a retry
624+
client.close();
625+
}
598626
}
599627

600628
@Override

client/src/test/java/org/threadly/litesockets/client/http/HTTPClientTests.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,6 @@ public void handleFailure(Throwable t) {
165165
for(int i=0; i<number; i++) {
166166
CLIENT_PS.execute(run);
167167
}
168-
Thread.sleep(1000);
169-
System.out.println(count.get());
170168
new TestCondition(){
171169
@Override
172170
public boolean get() {
@@ -261,7 +259,7 @@ public void noContentLengthWithBody() throws IOException, HTTPParsingException {
261259
final HTTPClient httpClient = new HTTPClient();
262260
httpClient.start();
263261
HTTPResponseData hrs = httpClient.request(hrb.buildClientHTTPRequest());
264-
System.out.println(hrs.getResponse());
262+
//System.out.println(hrs.getResponse());
265263
assertEquals("TEST123", hrs.getBodyAsString());
266264
}
267265

@@ -275,7 +273,7 @@ public void contentLengthOnHeadRequest() throws IOException, HTTPParsingExceptio
275273
final HTTPClient httpClient = new HTTPClient();
276274
httpClient.start();
277275
HTTPResponseData hrs = httpClient.request(hrb.buildClientHTTPRequest());
278-
System.out.println(hrs.getResponse());
276+
//System.out.println(hrs.getResponse());
279277
assertEquals(CONTENT.length(), hrs.getContentLength());
280278
assertEquals("", hrs.getBodyAsString());
281279
}
@@ -305,14 +303,43 @@ public void timeoutRequest() throws IOException, HTTPParsingException {
305303
server.start();
306304
final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("http://localhost:"+port));
307305
hrb.setBody(IOUtils.EMPTY_BYTEBUFFER);
308-
hrb.setTimeout(500, TimeUnit.MILLISECONDS);
306+
hrb.setTimeout(200, TimeUnit.MILLISECONDS);
309307
final HTTPClient httpClient = new HTTPClient();
310308
httpClient.start();
311309
try{
312310
httpClient.request(hrb.buildClientHTTPRequest());
313311
fail();
314312
} catch(HTTPParsingException e) {
315313
assertEquals("HTTP Timeout!", e.getMessage());
314+
// below conditions may be slightly async due to future getting a result before listeners are invoked
315+
new TestCondition(() -> httpClient.getRequestQueueSize() == 0).blockTillTrue(1_000);
316+
new TestCondition(() -> httpClient.getInProgressSize() == 0).blockTillTrue(1_000);
317+
}
318+
}
319+
320+
@Test
321+
public void timeoutQueuedRequest() throws IOException, HTTPParsingException {
322+
int port = PortUtils.findTCPPort();
323+
TCPServer server = SEI.createTCPServer("localhost", port);
324+
server.start();
325+
final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("http://localhost:"+port));
326+
hrb.setBody(IOUtils.EMPTY_BYTEBUFFER);
327+
hrb.setTimeout(200, TimeUnit.MILLISECONDS);
328+
final HTTPClient httpClient = new HTTPClient() {
329+
@Override
330+
protected void processQueue() {
331+
// queue is never processed
332+
}
333+
};
334+
httpClient.start();
335+
try{
336+
httpClient.request(hrb.buildClientHTTPRequest());
337+
fail();
338+
} catch(HTTPParsingException e) {
339+
assertEquals("HTTP Timeout!", e.getMessage());
340+
// below conditions may be slightly async due to future getting a result before listeners are invoked
341+
new TestCondition(() -> httpClient.getRequestQueueSize() == 0).blockTillTrue(1_000);
342+
new TestCondition(() -> httpClient.getInProgressSize() == 0).blockTillTrue(1_000);
316343
}
317344
}
318345

@@ -412,7 +439,7 @@ public void timeOut() throws IOException, InterruptedException, ExecutionExcepti
412439
server.setClientAcceptor(new ClientAcceptor() {
413440
@Override
414441
public void accept(Client c) {
415-
System.out.println("new Client!");
442+
//System.out.println("new Client!");
416443
}});
417444
server.start();
418445
final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("http://localhost:"+port));
@@ -427,7 +454,6 @@ public void accept(Client c) {
427454
try{
428455
httpClient.request(chr);
429456
} catch(Exception e) {
430-
System.out.println(System.currentTimeMillis() - start);
431457
assertTrue(System.currentTimeMillis() - start < 700);
432458
return;
433459
}

gradle.properties

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
group = org.threadly
2-
version = 0.23-SNAPSHOT
3-
threadlyVersion = 5.34
4-
litesocketsVersion = 4.9
2+
version = 0.23
3+
threadlyVersion = 5.37
4+
litesocketsVersion = 4.10
55
org.gradle.parallel=false
66
junitVersion = 4.12
77

0 commit comments

Comments
 (0)