@@ -16,7 +16,7 @@ import scala.concurrent.{ExecutionException, Future}
1616
1717import javax .net .ssl .SSLContext
1818
19- trait BaseSession {
19+ trait BaseSession extends AutoCloseable {
2020 def headers : Map [String , String ]
2121 def cookies : mutable.Map [String , HttpCookie ]
2222 def readTimeout : Int
@@ -42,6 +42,21 @@ trait BaseSession {
4242 lazy val patch = Requester (" PATCH" , this )
4343
4444 def send (method : String ) = Requester (method, this )
45+
46+ // Executor and HttpClient for this session, lazily initialized
47+ lazy val executor : ExecutorService = Executors .newCachedThreadPool()
48+ lazy val sharedHttpClient : HttpClient = BaseSession .buildHttpClient(
49+ proxy, cert, sslContext, verifySslCerts, connectTimeout, executor
50+ )
51+
52+ /**
53+ * Closes the shared HttpClient and its executor. Call this when you're done
54+ * with the session to release resources and prevent thread leaks.
55+ */
56+ def close (): Unit = {
57+ BaseSession .closeHttpClient(sharedHttpClient)
58+ executor.shutdown()
59+ }
4560}
4661
4762object BaseSession {
@@ -50,6 +65,74 @@ object BaseSession {
5065 " Accept-Encoding" -> " gzip, deflate" ,
5166 " Accept" -> " */*" ,
5267 )
68+
69+ def buildHttpClient (
70+ proxy : (String , Int ),
71+ cert : Cert ,
72+ sslContext : SSLContext ,
73+ verifySslCerts : Boolean ,
74+ connectTimeout : Int ,
75+ executor : ExecutorService
76+ ): HttpClient = {
77+ val builder = HttpClient
78+ .newBuilder()
79+ .executor(executor)
80+ .followRedirects(HttpClient .Redirect .NEVER )
81+ .proxy(proxy match {
82+ case null => ProxySelector .getDefault
83+ case (ip, port) => ProxySelector .of(new InetSocketAddress (ip, port))
84+ })
85+ .sslContext(
86+ if (cert != null )
87+ Util .clientCertSSLContext(cert, verifySslCerts)
88+ else if (sslContext != null )
89+ sslContext
90+ else if (! verifySslCerts)
91+ Util .noVerifySSLContext
92+ else
93+ SSLContext .getDefault,
94+ )
95+ .connectTimeout(Duration .ofMillis(connectTimeout))
96+
97+ builder.build()
98+ }
99+
100+ /**
101+ * Closes an HttpClient, using reflection to handle both Java 21+ (which has close())
102+ * and earlier versions (which require accessing internal selector).
103+ */
104+ def closeHttpClient (httpClient : HttpClient ): Unit = {
105+ try {
106+ val closeMethod = classOf [HttpClient ].getMethod(" close" )
107+ closeMethod.invoke(httpClient)
108+ } catch {
109+ case _ : NoSuchMethodException =>
110+ // Java < 21: use reflection to access internal selectorManager and close its selector
111+ // HttpClient.newBuilder().build() returns HttpClientFacade which wraps HttpClientImpl
112+ try {
113+ val facadeClass = httpClient.getClass
114+ val implField = facadeClass.getDeclaredField(" impl" )
115+ implField.setAccessible(true )
116+ val impl = implField.get(httpClient)
117+ val selectorManagerField = impl.getClass.getDeclaredField(" selmgr" )
118+ selectorManagerField.setAccessible(true )
119+ val selectorManager = selectorManagerField.get(impl)
120+ // SelectorManager has a 'selector' field we can close
121+ val selectorField = selectorManager.getClass.getDeclaredField(" selector" )
122+ selectorField.setAccessible(true )
123+ val selector = selectorField.get(selectorManager)
124+ val closeMethod = selector.getClass.getMethod(" close" )
125+ closeMethod.invoke(selector)
126+ } catch {
127+ case _ : Exception =>
128+ System .err.println(
129+ " requests: Unable to close HttpClient SelectorManager thread. " +
130+ " To fix thread leaks on Java <21, add JVM arg: " +
131+ " --add-opens java.net.http/jdk.internal.net.http=ALL-UNNAMED"
132+ )
133+ }
134+ }
135+ }
53136}
54137
55138object Requester {
@@ -59,6 +142,14 @@ object Requester {
59142 m.setAccessible(true )
60143 m
61144 }
145+
146+ /** Check if an exception's direct cause is a certificate-related error */
147+ def causedByCertificateError (e : Throwable ): Boolean = {
148+ e.getCause match {
149+ case _ : java.security.cert.CertificateException | _ : java.security.cert.CertPathValidatorException => true
150+ case _ => false
151+ }
152+ }
62153}
63154
64155case class Requester (verb : String , sess : BaseSession ) {
@@ -211,28 +302,18 @@ case class Requester(verb: String, sess: BaseSession) {
211302 new java.net.URL (url + firstSep + encodedParams)
212303 } else url0
213304
214- val executor : ExecutorService = Executors .newSingleThreadExecutor()
305+ // Check if we can reuse the session's shared HttpClient
306+ val useSharedClient =
307+ proxy == sess.proxy &&
308+ cert == sess.cert &&
309+ sslContext == sess.sslContext &&
310+ verifySslCerts == sess.verifySslCerts &&
311+ connectTimeout == sess.connectTimeout
312+
215313 val httpClient : HttpClient =
216- HttpClient
217- .newBuilder()
218- .executor(executor)
219- .followRedirects(HttpClient .Redirect .NEVER )
220- .proxy(proxy match {
221- case null => ProxySelector .getDefault
222- case (ip, port) => ProxySelector .of(new InetSocketAddress (ip, port))
223- })
224- .sslContext(
225- if (cert != null )
226- Util .clientCertSSLContext(cert, verifySslCerts)
227- else if (sslContext != null )
228- sslContext
229- else if (! verifySslCerts)
230- Util .noVerifySSLContext
231- else
232- SSLContext .getDefault,
233- )
234- .connectTimeout(Duration .ofMillis(connectTimeout))
235- .build()
314+ if (useSharedClient) sess.sharedHttpClient
315+ else BaseSession .buildHttpClient(proxy, cert, sslContext, verifySslCerts, connectTimeout, sess.executor)
316+
236317 try {
237318
238319 val sessionCookieValues = for {
@@ -268,51 +349,42 @@ case class Requester(verb: String, sess: BaseSession) {
268349 val headersKeyValueAlternating = lastOfEachHeader.values.toList.flatMap {
269350 case (k, v) => Seq (k, v)
270351 }
271-
272- val requestBodyInputStream = new PipedInputStream ()
273- val requestBodyOutputStream = new PipedOutputStream (requestBodyInputStream)
274-
352+
353+ // Buffer the request body
354+ val requestBodyBuffer = new ByteArrayOutputStream ()
355+ usingOutputStream(compress.wrap(requestBodyBuffer)) { os => data.write(os) }
356+ val requestBodyBytes = requestBodyBuffer.toByteArray
357+
275358 val bodyPublisher : HttpRequest .BodyPublisher =
276- HttpRequest .BodyPublishers .ofInputStream(new Supplier [InputStream ] {
277- override def get () = requestBodyInputStream
278- })
279-
359+ if (requestBodyBytes.isEmpty) HttpRequest .BodyPublishers .noBody()
360+ else HttpRequest .BodyPublishers .ofByteArray(requestBodyBytes)
361+
280362 val requestBuilder =
281363 HttpRequest
282364 .newBuilder()
283365 .uri(url1.toURI)
284366 .timeout(Duration .ofMillis(readTimeout))
285367 .headers(headersKeyValueAlternating : _* )
286- .method(
287- upperCaseVerb,
288- (contentLengthHeader.headOption.map(_._2), compress) match {
289- case (Some (" 0" ), _) => HttpRequest .BodyPublishers .noBody()
290- case (Some (n), Compress .None ) =>
291- HttpRequest .BodyPublishers .fromPublisher(bodyPublisher, n.toInt)
292- case _ => bodyPublisher
293- },
294- )
295-
296- val fut = httpClient.sendAsync(
297- requestBuilder.build(),
298- HttpResponse .BodyHandlers .ofInputStream(),
299- )
300-
301- usingOutputStream(compress.wrap(requestBodyOutputStream)) { os => data.write(os) }
302-
368+ .method(upperCaseVerb, bodyPublisher)
369+
370+ def wrapError : PartialFunction [Throwable , Nothing ] = {
371+ case e : javax.net.ssl.SSLException => throw new InvalidCertException (url, e)
372+ case _ : HttpConnectTimeoutException | _ : HttpTimeoutException =>
373+ throw new TimeoutException (url, readTimeout, connectTimeout)
374+ case e : java.net.UnknownHostException => throw new UnknownHostException (url, e.getMessage)
375+ case e : java.net.ConnectException => throw new UnknownHostException (url, e.getMessage)
376+ case e : IOException if Requester .causedByCertificateError(e) => throw new InvalidCertException (url, e)
377+ }
378+
303379 val response =
304- try
305- fut.get()
380+ try httpClient.send(requestBuilder.build(), HttpResponse .BodyHandlers .ofInputStream())
306381 catch {
307- case e : ExecutionException =>
308- throw e.getCause match {
309- case e : javax.net.ssl.SSLHandshakeException => new InvalidCertException (url, e)
310- case _ : HttpConnectTimeoutException | _ : HttpTimeoutException =>
311- new TimeoutException (url, readTimeout, connectTimeout)
312- case e : java.net.UnknownHostException => new UnknownHostException (url, e.getMessage)
313- case e : java.net.ConnectException => new UnknownHostException (url, e.getMessage)
314- case e => new RequestsException (e.getMessage, Some (e))
315- }
382+ case e : Throwable =>
383+ wrapError.lift(e)
384+ // Sometimes the error we care about is wrapped in an IOException
385+ // so check inside to see if there's something we want to handle
386+ .orElse(wrapError.lift(e.getCause))
387+ .getOrElse(throw new RequestsException (e.getMessage, Some (e)))
316388 }
317389
318390 val responseCode = response.statusCode()
@@ -337,7 +409,7 @@ case class Requester(verb: String, sess: BaseSession) {
337409 .flatten
338410 .exists(_.contains(" deflate" ))
339411 def persistCookies () = {
340- if (sess.persistCookies) {
412+ if (sess.persistCookies) sess.cookies. synchronized {
341413 headerFields
342414 .get(" set-cookie" )
343415 .iterator
@@ -439,38 +511,10 @@ case class Requester(verb: String, sess: BaseSession) {
439511 }
440512 }
441513 } finally {
442- // Try to close HttpClient if close() method exists (Java 21+)
443- try {
444- val closeMethod = classOf [HttpClient ].getMethod(" close" )
445- closeMethod.invoke(httpClient)
446- } catch {
447- case _ : NoSuchMethodException =>
448- // Java < 21: use reflection to access internal selectorManager and close its selector
449- // HttpClient.newBuilder().build() returns HttpClientFacade which wraps HttpClientImpl
450- try {
451- val facadeClass = httpClient.getClass
452- val implField = facadeClass.getDeclaredField(" impl" )
453- implField.setAccessible(true )
454- val impl = implField.get(httpClient)
455- val selectorManagerField = impl.getClass.getDeclaredField(" selmgr" )
456- selectorManagerField.setAccessible(true )
457- val selectorManager = selectorManagerField.get(impl)
458- // SelectorManager has a 'selector' field we can close
459- val selectorField = selectorManager.getClass.getDeclaredField(" selector" )
460- selectorField.setAccessible(true )
461- val selector = selectorField.get(selectorManager)
462- val closeMethod = selector.getClass.getMethod(" close" )
463- closeMethod.invoke(selector)
464- } catch {
465- case _ : Exception =>
466- System .err.println(
467- " requests: Unable to close HttpClient SelectorManager thread. " +
468- " To fix thread leaks on Java <21, add JVM arg: " +
469- " --add-opens java.net.http/jdk.internal.net.http=ALL-UNNAMED"
470- )
471- }
514+ // Only clean up if we created a temporary HttpClient (not using shared)
515+ if (! useSharedClient) {
516+ BaseSession .closeHttpClient(httpClient)
472517 }
473- executor.shutdown()
474518 }
475519 }
476520 }
0 commit comments