From 52bef07be315cc51dbfc219e1e63c17f137b0648 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Tue, 24 Sep 2024 15:11:20 +0100 Subject: [PATCH 1/4] withConnectionPoolSettingsBuilder --- .../matsluni/akkahttpspi/AkkaHttpClient.scala | 38 ++++++++-- .../akkahttpspi/AkkaHttpClientSpec.scala | 71 +++++++++++++++++++ .../akkahttpspi/BaseAwsClientTest.scala | 2 +- 3 files changed, 104 insertions(+), 7 deletions(-) diff --git a/src/main/scala/com/github/matsluni/akkahttpspi/AkkaHttpClient.scala b/src/main/scala/com/github/matsluni/akkahttpspi/AkkaHttpClient.scala index 8275892..a25dbd3 100644 --- a/src/main/scala/com/github/matsluni/akkahttpspi/AkkaHttpClient.scala +++ b/src/main/scala/com/github/matsluni/akkahttpspi/AkkaHttpClient.scala @@ -28,20 +28,21 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.{`Content-Length`, `Content-Type`} import akka.http.scaladsl.settings.ConnectionPoolSettings import akka.stream.scaladsl.Source -import akka.stream.{ActorMaterializer, Materializer, SystemMaterializer} +import akka.stream.{Materializer, SystemMaterializer} import akka.util.ByteString import org.slf4j.LoggerFactory import software.amazon.awssdk.http.async._ -import software.amazon.awssdk.http.SdkHttpRequest +import software.amazon.awssdk.http.{SdkHttpConfigurationOption, SdkHttpRequest} import software.amazon.awssdk.utils.AttributeMap import scala.collection.immutable import scala.jdk.CollectionConverters._ +import scala.compat.java8.DurationConverters._ import scala.compat.java8.OptionConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext} -class AkkaHttpClient(shutdownHandle: () => Unit, connectionSettings: ConnectionPoolSettings)(implicit actorSystem: ActorSystem, ec: ExecutionContext, mat: Materializer) extends SdkAsyncHttpClient { +class AkkaHttpClient(shutdownHandle: () => Unit, private[akkahttpspi] val connectionSettings: ConnectionPoolSettings)(implicit actorSystem: ActorSystem, ec: ExecutionContext, mat: Materializer) extends SdkAsyncHttpClient { import AkkaHttpClient._ lazy val runner = new RequestRunner() @@ -135,17 +136,39 @@ object AkkaHttpClient { else throw new RuntimeException(s"Could not parse custom content type '$contentTypeStr'.") } + // based on NettyNioAsyncHttpClient and ApacheHttpClient + // https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fnetty-nio-client%5C%2Fsrc%5C%2Fmain%2F&type=code + // https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fapache-client%5C%2Fsrc%5C%2Fmain%2F&type=code + private[akkahttpspi] def buildConnectionPoolSettings(base: ConnectionPoolSettings,attributeMap: AttributeMap): ConnectionPoolSettings = { + def zeroToInfinite(duration: java.time.Duration): scala.concurrent.duration.Duration = + if (duration.isZero) scala.concurrent.duration.Duration.Inf + else duration.toScala + + base + .withUpdatedConnectionSettings(s => + s.withConnectingTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).toScala) + .withIdleTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toScala) + ) + .withMaxConnections(attributeMap.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue()) + .withMaxConnectionLifetime(zeroToInfinite(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE))) + } + def builder() = AkkaHttpClientBuilder() case class AkkaHttpClientBuilder(private val actorSystem: Option[ActorSystem] = None, private val executionContext: Option[ExecutionContext] = None, - private val connectionPoolSettings: Option[ConnectionPoolSettings] = None) extends SdkAsyncHttpClient.Builder[AkkaHttpClientBuilder] { - def buildWithDefaults(attributeMap: AttributeMap): SdkAsyncHttpClient = { + private val connectionPoolSettings: Option[ConnectionPoolSettings] = None, + private val connectionPoolSettingsBuilder: (ConnectionPoolSettings, AttributeMap) => ConnectionPoolSettings = (c, _) => c + ) extends SdkAsyncHttpClient.Builder[AkkaHttpClientBuilder] { + + def buildWithDefaults(serviceDefaults: AttributeMap): SdkAsyncHttpClient = { implicit val as = actorSystem.getOrElse(ActorSystem("aws-akka-http")) implicit val ec = executionContext.getOrElse(as.dispatcher) val mat: Materializer = SystemMaterializer(as).materializer - val cps = connectionPoolSettings.getOrElse(ConnectionPoolSettings(as)) + val resolvedOptions = serviceDefaults.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS); + + val cps = connectionPoolSettingsBuilder(connectionPoolSettings.getOrElse(ConnectionPoolSettings(as)), resolvedOptions) val shutdownhandleF = () => { if (actorSystem.isEmpty) { Await.result(Http().shutdownAllConnectionPools().flatMap(_ => as.terminate()), Duration.apply(10, TimeUnit.SECONDS)) @@ -158,6 +181,9 @@ object AkkaHttpClient { def withActorSystem(actorSystem: ClassicActorSystemProvider): AkkaHttpClientBuilder = copy(actorSystem = Some(actorSystem.classicSystem)) def withExecutionContext(executionContext: ExecutionContext): AkkaHttpClientBuilder = copy(executionContext = Some(executionContext)) def withConnectionPoolSettings(connectionPoolSettings: ConnectionPoolSettings): AkkaHttpClientBuilder = copy(connectionPoolSettings = Some(connectionPoolSettings)) + def withConnectionPoolSettingsBuilder(connectionPoolSettingsBuilder: (ConnectionPoolSettings, AttributeMap) => ConnectionPoolSettings): AkkaHttpClientBuilder = + copy(connectionPoolSettingsBuilder = connectionPoolSettingsBuilder) + def withConnectionPoolSettingsBuilderFromAttributeMap(): AkkaHttpClientBuilder = copy(connectionPoolSettingsBuilder = buildConnectionPoolSettings) } lazy val xAmzJson = ContentType(MediaType.customBinary("application", "x-amz-json-1.0", Compressible)) diff --git a/src/test/scala/com/github/matsluni/akkahttpspi/AkkaHttpClientSpec.scala b/src/test/scala/com/github/matsluni/akkahttpspi/AkkaHttpClientSpec.scala index 15f6a2c..40830e4 100644 --- a/src/test/scala/com/github/matsluni/akkahttpspi/AkkaHttpClientSpec.scala +++ b/src/test/scala/com/github/matsluni/akkahttpspi/AkkaHttpClientSpec.scala @@ -18,11 +18,17 @@ package com.github.matsluni.akkahttpspi import akka.http.scaladsl.model.headers.`Content-Type` import akka.http.scaladsl.model.MediaTypes +import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} +import com.typesafe.config.ConfigFactory import org.scalatest.OptionValues import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec +import software.amazon.awssdk.http.SdkHttpConfigurationOption +import software.amazon.awssdk.utils.AttributeMap +import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ +import scala.jdk.DurationConverters._ class AkkaHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { @@ -46,5 +52,70 @@ class AkkaHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { contentTypeHeader.value.lowercaseName() shouldBe `Content-Type`.lowercaseName reqHeaders should have size 1 } + + "build() should use default ConnectionPoolSettings" in { + val akkaClient: AkkaHttpClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory() + .build() + .asInstanceOf[AkkaHttpClient] + + akkaClient.connectionSettings shouldBe ConnectionPoolSettings(ConfigFactory.load()) + } + + "withConnectionPoolSettingsBuilderFromAttributeMap().buildWithDefaults() should propagate configuration options" in { + val attributeMap = AttributeMap.builder() + .put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, 1.second.toJava) + .put(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT, 2.second.toJava) + .put(SdkHttpConfigurationOption.MAX_CONNECTIONS, 3) + .put(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE, 4.second.toJava) + .build() + val akkaClient: AkkaHttpClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory() + .withConnectionPoolSettingsBuilderFromAttributeMap() + .buildWithDefaults(attributeMap) + .asInstanceOf[AkkaHttpClient] + + akkaClient.connectionSettings.connectionSettings.connectingTimeout shouldBe 1.second + akkaClient.connectionSettings.connectionSettings.idleTimeout shouldBe 2.seconds + akkaClient.connectionSettings.maxConnections shouldBe 3 + akkaClient.connectionSettings.maxConnectionLifetime shouldBe 4.seconds + } + + "withConnectionPoolSettingsBuilderFromAttributeMap().build() should fallback to GLOBAL_HTTP_DEFAULTS" in { + val akkaClient: AkkaHttpClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory() + .withConnectionPoolSettingsBuilderFromAttributeMap() + .build() + .asInstanceOf[AkkaHttpClient] + + akkaClient.connectionSettings.connectionSettings.connectingTimeout shouldBe + SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).toScala + akkaClient.connectionSettings.connectionSettings.idleTimeout shouldBe + SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toScala + akkaClient.connectionSettings.maxConnections shouldBe + SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue() + infiniteToZero(akkaClient.connectionSettings.maxConnectionLifetime) shouldBe + SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE) + } + + "withConnectionPoolSettingsBuilder().build() should use passed connectionPoolSettings builder" in { + val connectionPoolSettings = ConnectionPoolSettings(ConfigFactory.load()) + .withConnectionSettings( + ClientConnectionSettings(ConfigFactory.load()) + .withConnectingTimeout(1.second) + .withIdleTimeout(2.seconds) + ) + .withMaxConnections(3) + .withMaxConnectionLifetime(4.seconds) + + val akkaClient: AkkaHttpClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory() + .withConnectionPoolSettingsBuilder((_, _) => connectionPoolSettings) + .build() + .asInstanceOf[AkkaHttpClient] + + akkaClient.connectionSettings shouldBe connectionPoolSettings + } + } + + private def infiniteToZero(duration: scala.concurrent.duration.Duration): java.time.Duration = duration match { + case _: scala.concurrent.duration.Duration.Infinite => java.time.Duration.ZERO + case duration: FiniteDuration => duration.toJava } } diff --git a/src/test/scala/com/github/matsluni/akkahttpspi/BaseAwsClientTest.scala b/src/test/scala/com/github/matsluni/akkahttpspi/BaseAwsClientTest.scala index a93ad50..94fbc03 100644 --- a/src/test/scala/com/github/matsluni/akkahttpspi/BaseAwsClientTest.scala +++ b/src/test/scala/com/github/matsluni/akkahttpspi/BaseAwsClientTest.scala @@ -53,7 +53,7 @@ trait LocalstackBaseAwsClientTest[C <: SdkClient] extends BaseAwsClientTest[C] { lazy val exposedServicePort: Int = 4566 private lazy val containerInstance = new GenericContainer( - dockerImage = "localstack/localstack", + dockerImage = "localstack/localstack:1.4.0", exposedPorts = Seq(exposedServicePort), env = Map("SERVICES" -> service), waitStrategy = Some(LocalStackReadyLogWaitStrategy) From e8ede5ea9a6b7f5ae8366317a6a7d09f69c0e678 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Tue, 24 Sep 2024 15:41:11 +0100 Subject: [PATCH 2/4] add test for withConnectionPoolSettings --- .../akkahttpspi/AkkaHttpClientSpec.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/test/scala/com/github/matsluni/akkahttpspi/AkkaHttpClientSpec.scala b/src/test/scala/com/github/matsluni/akkahttpspi/AkkaHttpClientSpec.scala index 40830e4..1fb8ec4 100644 --- a/src/test/scala/com/github/matsluni/akkahttpspi/AkkaHttpClientSpec.scala +++ b/src/test/scala/com/github/matsluni/akkahttpspi/AkkaHttpClientSpec.scala @@ -112,6 +112,23 @@ class AkkaHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { akkaClient.connectionSettings shouldBe connectionPoolSettings } + + "withConnectionPoolSettings().build() should use passed ConnectionPoolSettings" in { + val connectionPoolSettings = ConnectionPoolSettings(ConfigFactory.load()) + .withConnectionSettings( + ClientConnectionSettings(ConfigFactory.load()) + .withConnectingTimeout(1.second) + .withIdleTimeout(2.seconds) + ) + .withMaxConnections(3) + .withMaxConnectionLifetime(4.seconds) + val akkaClient: AkkaHttpClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory() + .withConnectionPoolSettings(connectionPoolSettings) + .build() + .asInstanceOf[AkkaHttpClient] + + akkaClient.connectionSettings shouldBe connectionPoolSettings + } } private def infiniteToZero(duration: scala.concurrent.duration.Duration): java.time.Duration = duration match { From e5ac1cae3470fc7ef952e534ac48debdb34cb665 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Tue, 24 Sep 2024 17:04:09 +0100 Subject: [PATCH 3/4] avoid DurationConverters --- .../github/matsluni/akkahttpspi/AkkaHttpClient.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/github/matsluni/akkahttpspi/AkkaHttpClient.scala b/src/main/scala/com/github/matsluni/akkahttpspi/AkkaHttpClient.scala index a25dbd3..f534440 100644 --- a/src/main/scala/com/github/matsluni/akkahttpspi/AkkaHttpClient.scala +++ b/src/main/scala/com/github/matsluni/akkahttpspi/AkkaHttpClient.scala @@ -37,7 +37,6 @@ import software.amazon.awssdk.utils.AttributeMap import scala.collection.immutable import scala.jdk.CollectionConverters._ -import scala.compat.java8.DurationConverters._ import scala.compat.java8.OptionConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext} @@ -142,12 +141,15 @@ object AkkaHttpClient { private[akkahttpspi] def buildConnectionPoolSettings(base: ConnectionPoolSettings,attributeMap: AttributeMap): ConnectionPoolSettings = { def zeroToInfinite(duration: java.time.Duration): scala.concurrent.duration.Duration = if (duration.isZero) scala.concurrent.duration.Duration.Inf - else duration.toScala + else toScala(duration) + + def toScala(duration: java.time.Duration): scala.concurrent.duration.FiniteDuration = + scala.concurrent.duration.Duration.fromNanos(duration.toNanos) base .withUpdatedConnectionSettings(s => - s.withConnectingTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).toScala) - .withIdleTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toScala) + s.withConnectingTimeout(toScala(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT))) + .withIdleTimeout(toScala(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT))) ) .withMaxConnections(attributeMap.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue()) .withMaxConnectionLifetime(zeroToInfinite(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE))) From 406d7257f2edce12ee02f6e2a79dc9676dc9183c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Tue, 24 Sep 2024 17:41:44 +0100 Subject: [PATCH 4/4] avoid DurationConverters in tests as well --- .../akkahttpspi/AkkaHttpClientSpec.scala | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/test/scala/com/github/matsluni/akkahttpspi/AkkaHttpClientSpec.scala b/src/test/scala/com/github/matsluni/akkahttpspi/AkkaHttpClientSpec.scala index 1fb8ec4..0b0a413 100644 --- a/src/test/scala/com/github/matsluni/akkahttpspi/AkkaHttpClientSpec.scala +++ b/src/test/scala/com/github/matsluni/akkahttpspi/AkkaHttpClientSpec.scala @@ -28,7 +28,6 @@ import software.amazon.awssdk.utils.AttributeMap import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ -import scala.jdk.DurationConverters._ class AkkaHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { @@ -63,10 +62,10 @@ class AkkaHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { "withConnectionPoolSettingsBuilderFromAttributeMap().buildWithDefaults() should propagate configuration options" in { val attributeMap = AttributeMap.builder() - .put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, 1.second.toJava) - .put(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT, 2.second.toJava) - .put(SdkHttpConfigurationOption.MAX_CONNECTIONS, 3) - .put(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE, 4.second.toJava) + .put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, toJava(1.second)) + .put(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT, toJava(2.second)) + .put(SdkHttpConfigurationOption.MAX_CONNECTIONS, Integer.valueOf(3)) + .put(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE, toJava(4.second)) .build() val akkaClient: AkkaHttpClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory() .withConnectionPoolSettingsBuilderFromAttributeMap() @@ -86,9 +85,9 @@ class AkkaHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { .asInstanceOf[AkkaHttpClient] akkaClient.connectionSettings.connectionSettings.connectingTimeout shouldBe - SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).toScala + toScala(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT)) akkaClient.connectionSettings.connectionSettings.idleTimeout shouldBe - SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toScala + toScala(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT)) akkaClient.connectionSettings.maxConnections shouldBe SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue() infiniteToZero(akkaClient.connectionSettings.maxConnectionLifetime) shouldBe @@ -133,6 +132,14 @@ class AkkaHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { private def infiniteToZero(duration: scala.concurrent.duration.Duration): java.time.Duration = duration match { case _: scala.concurrent.duration.Duration.Infinite => java.time.Duration.ZERO - case duration: FiniteDuration => duration.toJava + case duration: FiniteDuration => toJava(duration) + } + + private def toJava(duration: scala.concurrent.duration.FiniteDuration): java.time.Duration = { + java.time.Duration.ofNanos(duration.toNanos) + } + + private def toScala(duration: java.time.Duration): scala.concurrent.duration.FiniteDuration = { + scala.concurrent.duration.Duration.fromNanos(duration.toNanos) } }