Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import akka.stream.{ActorMaterializer, Materializer, SystemMaterializer}
import akka.util.ByteString
import org.slf4j.LoggerFactory
import software.amazon.awssdk.http.async._
import software.amazon.awssdk.http.SdkHttpRequest
import software.amazon.awssdk.http.{Protocol, SdkHttpConfigurationOption, SdkHttpRequest}
import software.amazon.awssdk.utils.AttributeMap

import scala.collection.immutable
Expand All @@ -41,13 +41,13 @@ import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext}

class AkkaHttpClient(shutdownHandle: () => Unit, connectionSettings: ConnectionPoolSettings)(implicit actorSystem: ActorSystem, ec: ExecutionContext, mat: Materializer) extends SdkAsyncHttpClient {
class AkkaHttpClient(shutdownHandle: () => Unit, connectionSettings: ConnectionPoolSettings, protocol: HttpProtocol)(implicit actorSystem: ActorSystem, ec: ExecutionContext, mat: Materializer) extends SdkAsyncHttpClient {
import AkkaHttpClient._

lazy val runner = new RequestRunner()

override def execute(request: AsyncExecuteRequest): CompletableFuture[Void] = {
val akkaHttpRequest = toAkkaRequest(request.request(), request.requestContentPublisher())
val akkaHttpRequest = toAkkaRequest(protocol, request.request(), request.requestContentPublisher())
runner.run(
() => Http().singleRequest(akkaHttpRequest, settings = connectionSettings),
request.responseHandler()
Expand All @@ -65,15 +65,15 @@ object AkkaHttpClient {

val logger = LoggerFactory.getLogger(this.getClass)

private[akkahttpspi] def toAkkaRequest(request: SdkHttpRequest, contentPublisher: SdkHttpContentPublisher): HttpRequest = {
private[akkahttpspi] def toAkkaRequest(protocol: HttpProtocol, request: SdkHttpRequest, contentPublisher: SdkHttpContentPublisher): HttpRequest = {
val (contentTypeHeader, reqheaders) = convertHeaders(request.headers())
val method = convertMethod(request.method().name())
HttpRequest(
method = method,
uri = Uri(request.getUri.toString),
headers = reqheaders,
entity = entityForMethodAndContentType(method, contentTypeHeaderToContentType(contentTypeHeader), contentPublisher),
protocol = HttpProtocols.`HTTP/1.1`
protocol = protocol
)
}

Expand Down Expand Up @@ -145,21 +145,31 @@ object AkkaHttpClient {
implicit val ec = executionContext.getOrElse(as.dispatcher)
val mat: Materializer = SystemMaterializer(as).materializer

val mergedAttributeMap = attributeMap.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except from Protocol nothing else is used from the GLOBAL_HTTP_DEFAULTS. Should we create a attributeMap with just the Protocol in it, with a default to Http/1.1?

Otherwise it may be confusing, what other settings are taken from there and right now I would prefer to configure the Akka-Http client via its normal methods of configuration.

WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except from Protocol nothing else is used from the GLOBAL_HTTP_DEFAULTS

The use of GLOBAL_HTTP_DEFAULTS, was inspired in the NettyNioAsyncHttpClient, and I was planning to submit another PR that reads other attributes from there like CONNECTION_TIMEOUT, MAX_CONNECTIONS, etc...

I would prefer to configure the Akka-Http client via its normal methods of configuration.

I understand right now that configuration is done via the normal methods of configuration i.e typesafe config library, but what are your thoughts about changing that to use the attributeMap? Asking because I see some specific clients configuring specific options like KinesisHttpConfigurationOptions and you can find other examples here
Maybe we should honor those...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the use GLOBAL_HTTP_DEFAULTS and SdkHttpConfigurationOption and added a simple protocol field to the builder

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created the issue #232 to track your idea with the config changes.


val cps = connectionPoolSettings.getOrElse(ConnectionPoolSettings(as))
val protocol = convertProtocol(mergedAttributeMap.get(SdkHttpConfigurationOption.PROTOCOL))
val shutdownhandleF = () => {
if (actorSystem.isEmpty) {
Await.result(Http().shutdownAllConnectionPools().flatMap(_ => as.terminate()), Duration.apply(10, TimeUnit.SECONDS))
}
()
}
new AkkaHttpClient(shutdownhandleF, cps)(as, ec, mat)
new AkkaHttpClient(shutdownhandleF, cps, protocol)(as, ec, mat)
}
def withActorSystem(actorSystem: ActorSystem): AkkaHttpClientBuilder = copy(actorSystem = Some(actorSystem))
def withActorSystem(actorSystem: ClassicActorSystemProvider): AkkaHttpClientBuilder = copy(actorSystem = Some(actorSystem.classicSystem))
def withExecutionContext(executionContext: ExecutionContext): AkkaHttpClientBuilder = copy(executionContext = Some(executionContext))
def withConnectionPoolSettings(connectionPoolSettings: ConnectionPoolSettings): AkkaHttpClientBuilder = copy(connectionPoolSettings = Some(connectionPoolSettings))
}

private def convertProtocol(protocol: Protocol) = {
protocol match {
case Protocol.HTTP1_1 => HttpProtocols.`HTTP/1.1`
case Protocol.HTTP2 => HttpProtocols.`HTTP/2.0`
}
}

lazy val xAmzJson = ContentType(MediaType.customBinary("application", "x-amz-json-1.0", Compressible))
lazy val xAmzJson11 = ContentType(MediaType.customBinary("application", "x-amz-json-1.1", Compressible))
lazy val xAmzCbor11 = ContentType(MediaType.customBinary("application", "x-amz-cbor-1.1", Compressible))
Expand Down