55import java .net .URL ;
66import java .nio .ByteBuffer ;
77import java .util .ArrayDeque ;
8+ import java .util .Iterator ;
89import java .util .concurrent .CancellationException ;
910import java .util .concurrent .ConcurrentHashMap ;
1011import java .util .concurrent .ConcurrentLinkedQueue ;
4243import org .threadly .litesockets .utils .SSLUtils ;
4344import org .threadly .util .AbstractService ;
4445import org .threadly .util .Clock ;
46+ import org .threadly .util .Pair ;
4547
4648/**
4749 * <p>This is a HTTPClient for doing many simple HTTPRequests. Every request will be make a new connection and requests
5153public class HTTPClient extends AbstractService {
5254 public static final int DEFAULT_CONCURRENT = 2 ;
5355 public static final int DEFAULT_TIMEOUT = 15000 ;
56+ public static final int DEFAULT_MAX_IDLE = 45000 ;
5457 public static final int MAX_HTTP_RESPONSE = 1048576 ; //1MB
5558
5659 private final int maxResponseSize ;
5760 private final SubmitterScheduler ssi ;
5861 private final SocketExecuter sei ;
5962 private final ConcurrentLinkedQueue <HTTPRequestWrapper > queue = new ConcurrentLinkedQueue <>();
6063 private final ConcurrentHashMap <TCPClient , HTTPRequestWrapper > inProcess = new ConcurrentHashMap <>();
61- private final ConcurrentHashMap <HTTPAddress , ArrayDeque <TCPClient >> sockets = new ConcurrentHashMap <>();
64+ private final ConcurrentHashMap <HTTPAddress , ArrayDeque <Pair < Long , TCPClient > >> sockets = new ConcurrentHashMap <>();
6265 private final CopyOnWriteArraySet <TCPClient > tcpClients = new CopyOnWriteArraySet <>();
6366 private final MainClientProcessor mcp = new MainClientProcessor ();
6467 private final RunSocket runSocketTask ;
6568 private final int maxConcurrent ;
66- private volatile int defaultTimeoutMS = HTTPRequest .DEFAULT_TIMEOUT_MS ;
69+ private volatile Runnable checkIdle = null ;
70+ private volatile long defaultTimeoutMS = HTTPRequest .DEFAULT_TIMEOUT_MS ;
6771 private volatile SSLContext sslContext = SSLUtils .OPEN_SSL_CTX ;
72+ private volatile long maxIdleTime = DEFAULT_MAX_IDLE ;
6873
6974 private NoThreadSocketExecuter ntse = null ;
7075 private SingleThreadScheduler sts = null ;
@@ -96,7 +101,6 @@ public HTTPClient(int maxConcurrent, int maxResponseSize) {
96101 runSocketTask = new RunSocket (ssi );
97102 }
98103
99-
100104 /**
101105 * <p>This constructor will let you set the max Concurrent Requests and max Response Size
102106 * as well as your own {@link SocketExecuter} as the thread pool to use.</p>
@@ -168,8 +172,34 @@ public void closeAllClients() {
168172 *
169173 * @param timeout time in milliseconds to wait for HTTPRequests to finish.
170174 */
171- public void setTimeout (TimeUnit unit , int timeout ) {
172- this .defaultTimeoutMS = (int )Math .min (Math .max (unit .toMillis (timeout ),HTTPRequest .MIN_TIMEOUT_MS ), HTTPRequest .MAX_TIMEOUT_MS );
175+ public void setTimeout (long timeout , TimeUnit unit ) {
176+ this .defaultTimeoutMS = Math .min (Math .max (unit .toMillis (timeout ),HTTPRequest .MIN_TIMEOUT_MS ), HTTPRequest .MAX_TIMEOUT_MS );
177+ }
178+
179+ public long getMaxIdleTimeout () {
180+ return this .maxIdleTime ;
181+ }
182+
183+ /**
184+ * Sets the max amount of time we will hold onto idle connections. A 0 means we close connections when done, less
185+ * than zero means we will never expire connections.
186+ *
187+ * @param it the time in milliseconds to wait before timing out a connection.
188+ */
189+ public void setMaxIdleTimeout (long it , TimeUnit unit ) {
190+ this .maxIdleTime = unit .toMillis (it );
191+ if (this .maxIdleTime > 0 ) {
192+ this .checkIdle = new Runnable () {
193+ @ Override
194+ public void run () {
195+ if (checkIdle == this ) {
196+ checkIdleSockets ();
197+ ssi .schedule (this , Math .max (100 , maxIdleTime /2 ));
198+ }
199+ }
200+ };
201+ this .ssi .schedule (checkIdle , Math .max (100 , maxIdleTime /2 ));
202+ }
173203 }
174204
175205 /**
@@ -260,11 +290,10 @@ public ListenableFuture<HTTPResponseData> requestAsync(final URL url) {
260290 public ListenableFuture <HTTPResponseData > requestAsync (final URL url , final HTTPRequestType rt , final ByteBuffer bb ) {
261291 HTTPRequestBuilder hrb = new HTTPRequestBuilder (url );
262292 hrb .setRequestType (rt );
263- hrb .setTimeout (TimeUnit . MILLISECONDS , this . defaultTimeoutMS );
293+ hrb .setTimeout (this . defaultTimeoutMS , TimeUnit . MILLISECONDS );
264294 return requestAsync (hrb .buildClientHTTPRequest ());
265295 }
266296
267-
268297 /**
269298 * Sends an asynchronous HTTP request.
270299 *
@@ -319,6 +348,7 @@ protected void startupService() {
319348 if (ntse != null ) {
320349 ntse .start ();
321350 }
351+ setMaxIdleTimeout (this .maxIdleTime , TimeUnit .MILLISECONDS );
322352 }
323353
324354 @ Override
@@ -338,18 +368,18 @@ protected void shutdownService() {
338368 }
339369
340370 private TCPClient getTCPClient (final HTTPAddress ha ) throws IOException {
341- ArrayDeque <TCPClient > ll = sockets .get (ha );
371+ ArrayDeque <Pair < Long , TCPClient >> pl = sockets .get (ha );
342372 TCPClient tc = null ;
343- if (ll != null ) {
344- synchronized (ll ) {
345- while (ll .size () > 0 && tc == null ) {
346- if (ll .peek ().isClosed ()) {
347- ll .pop ();
373+ if (pl != null ) {
374+ synchronized (pl ) {
375+ while (pl .size () > 0 && tc == null ) {
376+ if (pl .peek (). getRight ().isClosed ()) {
377+ pl .pop ();
348378 } else {
349- tc = ll .pop ();
379+ tc = pl .pop (). getRight ();
350380 }
351381 }
352- if (ll .size () == 0 ) {
382+ if (pl .size () == 0 ) {
353383 sockets .remove (ha );
354384 }
355385 }
@@ -371,14 +401,35 @@ private TCPClient getTCPClient(final HTTPAddress ha) throws IOException {
371401 }
372402
373403 private void addBackTCPClient (final HTTPAddress ha , final TCPClient client ) {
404+ if (maxIdleTime == 0 ) {
405+ client .close ();
406+ return ;
407+ }
374408 if (!client .isClosed ()) {
375- ArrayDeque <TCPClient > ll = sockets .get (ha );
409+ ArrayDeque <Pair < Long , TCPClient > > ll = sockets .get (ha );
376410 if (ll == null ) {
377411 sockets .put (ha , new ArrayDeque <>(8 ));
378412 ll = sockets .get (ha );
379413 }
380414 synchronized (ll ) {
381- ll .add (client );
415+ ll .add (new Pair <>(Clock .lastKnownForwardProgressingMillis (), client ));
416+ }
417+ }
418+ }
419+
420+ private void checkIdleSockets () {
421+ if (maxIdleTime > 0 ) {
422+ for (ArrayDeque <Pair <Long ,TCPClient >> adq : sockets .values ()) {
423+ synchronized (adq ) {
424+ Iterator <Pair <Long ,TCPClient >> iter = adq .iterator ();
425+ while (iter .hasNext ()) {
426+ Pair <Long ,TCPClient > c = iter .next ();
427+ if (Clock .lastKnownForwardProgressingMillis () - c .getLeft () > maxIdleTime ) {
428+ iter .remove ();
429+ c .getRight ().close ();
430+ }
431+ }
432+ }
382433 }
383434 }
384435 }
@@ -417,7 +468,7 @@ public void onClose(Client client) {
417468 if (hrw != null ) {
418469 boolean wasProcessing = hrw .hrp .isProcessing ();
419470 hrw .hrp .connectionClosed ();
420- if (!hrw .slf .isDone () && !wasProcessing ) {
471+ if (! hrw .slf .isDone () && ! wasProcessing ) {
421472 hrw .client = null ;
422473 process (hrw );
423474 } else {
@@ -554,3 +605,4 @@ public String toString() {
554605 }
555606 }
556607}
608+
0 commit comments