|
4 | 4 | */ |
5 | 5 | package aws.smithy.kotlin.runtime.http.engine.ktor |
6 | 6 |
|
7 | | -import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine |
8 | | -import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineBase |
9 | | -import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineConfig |
| 7 | +import aws.smithy.kotlin.runtime.http.Headers |
| 8 | +import aws.smithy.kotlin.runtime.http.HttpStatusCode |
| 9 | +import aws.smithy.kotlin.runtime.http.engine.* |
| 10 | +import aws.smithy.kotlin.runtime.http.request.HttpRequest |
| 11 | +import aws.smithy.kotlin.runtime.http.response.HttpCall |
| 12 | +import aws.smithy.kotlin.runtime.http.response.HttpResponse |
| 13 | +import aws.smithy.kotlin.runtime.logging.Logger |
| 14 | +import aws.smithy.kotlin.runtime.logging.trace |
| 15 | +import aws.smithy.kotlin.runtime.time.Instant |
| 16 | +import io.ktor.client.* |
| 17 | +import io.ktor.client.request.* |
| 18 | +import io.ktor.client.statement.* |
| 19 | +import io.ktor.http.* |
| 20 | +import io.ktor.util.* |
| 21 | +import kotlinx.coroutines.CoroutineDispatcher |
| 22 | +import kotlinx.coroutines.channels.Channel |
| 23 | +import kotlinx.coroutines.channels.SendChannel |
| 24 | +import kotlinx.coroutines.job |
| 25 | +import kotlinx.coroutines.launch |
| 26 | +import kotlinx.coroutines.sync.Mutex |
| 27 | +import kotlin.coroutines.CoroutineContext |
| 28 | +import io.ktor.client.engine.HttpClientEngine as KtorHttpClientEngine |
10 | 29 |
|
11 | 30 | /** |
12 | | - * Specifies the ktor http client of which platform specific [HttpClientEngine]'s actualize |
13 | | - * |
14 | | - * @param config Provides configuration for the DefaultHttpClientEngine |
| 31 | + * Utility class that wraps the given Ktor engine as an [HttpClientEngine]. |
| 32 | + * This class can be used to wrap any Ktor compliant engine (though not all engines |
| 33 | + * may support HTTP features required by any given SDK). |
15 | 34 | */ |
16 | | -expect class KtorEngine(config: HttpClientEngineConfig = HttpClientEngineConfig.Default) : HttpClientEngineBase { |
17 | | - val config: HttpClientEngineConfig |
| 35 | +class KtorEngine( |
| 36 | + private val engine: KtorHttpClientEngine |
| 37 | +) : HttpClientEngineBase("ktor") { |
| 38 | + |
| 39 | + @Suppress("UNUSED_PARAMETER") |
| 40 | + @Deprecated( |
| 41 | + message = "KtorEngine was previously synonymous with the OkHttp engine. It has been modified to wrap any " + |
| 42 | + "Ktor compliant engine. The default engine has been changed from CRT to Ktor/OkHttp. To fix either " + |
| 43 | + "remove setting `httpClientEngine` explicitly or instantiate a Ktor compliant engine of your own and " + |
| 44 | + "use KtorEngine to wrap it. This constructor will be removed in a future release before GA.", |
| 45 | + level = DeprecationLevel.ERROR |
| 46 | + ) |
| 47 | + constructor(config: HttpClientEngineConfig = HttpClientEngineConfig.Default) : this(DeprecationEngine) |
| 48 | + |
| 49 | + val client: HttpClient = HttpClient(engine) { |
| 50 | + // do not throw exceptions if status code < 300, error handling is expected by generated clients |
| 51 | + expectSuccess = false |
| 52 | + |
| 53 | + // do not attempt to follow redirects for status codes like 301 because they should be handled higher up |
| 54 | + followRedirects = false |
| 55 | + } |
| 56 | + |
| 57 | + private val logger = Logger.getLogger<KtorEngine>() |
| 58 | + |
| 59 | + override suspend fun roundTrip(request: HttpRequest): HttpCall { |
| 60 | + val callContext = callContext() |
| 61 | + |
| 62 | + val respChannel = Channel<HttpCall>(Channel.RENDEZVOUS) |
| 63 | + |
| 64 | + // run the request in another coroutine to allow streaming body to be handled |
| 65 | + launch(callContext + ioDispatcher()) { |
| 66 | + try { |
| 67 | + execute(callContext, request, respChannel) |
| 68 | + } catch (ex: Exception) { |
| 69 | + // signal the HTTP response isn't coming |
| 70 | + respChannel.close(ex) |
| 71 | + } |
| 72 | + } |
| 73 | + |
| 74 | + // wait for the response to be available, the content will be read as a stream |
| 75 | + logger.trace("waiting on response to be available") |
| 76 | + |
| 77 | + try { |
| 78 | + val resp = respChannel.receive() |
| 79 | + logger.trace("response is available continuing") |
| 80 | + return resp |
| 81 | + } catch (ex: Exception) { |
| 82 | + throw logger.throwing(ex) |
| 83 | + } |
| 84 | + } |
| 85 | + |
| 86 | + private suspend fun execute( |
| 87 | + callContext: CoroutineContext, |
| 88 | + sdkRequest: HttpRequest, |
| 89 | + channel: SendChannel<HttpCall> |
| 90 | + ) { |
| 91 | + val builder = KtorRequestAdapter(sdkRequest, callContext).toBuilder() |
| 92 | + val waiter = Waiter() |
| 93 | + val reqTime = Instant.now() |
| 94 | + client.request<HttpStatement>(builder).execute { httpResp -> |
| 95 | + val respTime = Instant.now() |
| 96 | + // we have a lifetime problem here...the stream (and HttpResponse instance) are only valid |
| 97 | + // until the end of this block. We don't know if the consumer wants to read the content fully or |
| 98 | + // stream it. We need to wait until the entire content has been read before leaving the block and |
| 99 | + // releasing the underlying network resources. We do this by blocking until the request job |
| 100 | + // completes, at which point we signal it's safe to exit the block and release the underlying resources. |
| 101 | + callContext.job.invokeOnCompletion { waiter.signal() } |
| 102 | + |
| 103 | + val body = KtorHttpBody(httpResp.contentLength(), httpResp.content) |
| 104 | + |
| 105 | + // copy the headers so that we no longer depend on the underlying ktor HttpResponse object |
| 106 | + // outside of the body content (which will signal once read that it is safe to exit the block) |
| 107 | + val headers = Headers { appendAll(KtorHeaders(httpResp.headers)) } |
| 108 | + |
| 109 | + val resp = HttpResponse( |
| 110 | + HttpStatusCode.fromValue(httpResp.status.value), |
| 111 | + headers, |
| 112 | + body, |
| 113 | + ) |
| 114 | + |
| 115 | + logger.trace("signalling response") |
| 116 | + val call = HttpCall(sdkRequest, resp, reqTime, respTime, callContext) |
| 117 | + channel.send(call) |
| 118 | + |
| 119 | + logger.trace("waiting on body to be consumed") |
| 120 | + // wait for the receiving end to finish with the HTTP body |
| 121 | + waiter.wait() |
| 122 | + logger.trace("request done") |
| 123 | + } |
| 124 | + } |
| 125 | + |
| 126 | + override fun close() { |
| 127 | + client.close() |
| 128 | + engine.close() |
| 129 | + } |
| 130 | +} |
| 131 | + |
| 132 | +/** |
| 133 | + * Simple notify mechanism that waits for a signal |
| 134 | + */ |
| 135 | +internal class Waiter { |
| 136 | + private val mutex = Mutex(locked = true) |
| 137 | + |
| 138 | + // wait for the signal |
| 139 | + suspend fun wait() { mutex.lock() } |
| 140 | + |
| 141 | + // give the signal to continue |
| 142 | + fun signal() { mutex.unlock() } |
| 143 | +} |
| 144 | + |
| 145 | +// FIXME - dummy engine for deprecated constructor, remove before GA |
| 146 | +private object DeprecationEngine : KtorHttpClientEngine { |
| 147 | + override val config: io.ktor.client.engine.HttpClientEngineConfig |
| 148 | + get() = error("not a real engine") |
| 149 | + override val coroutineContext: CoroutineContext |
| 150 | + get() = error("not a real engine") |
| 151 | + override val dispatcher: CoroutineDispatcher |
| 152 | + get() = error("not a real engine") |
| 153 | + |
| 154 | + override fun close() {} |
| 155 | + |
| 156 | + @InternalAPI |
| 157 | + override suspend fun execute(data: HttpRequestData): HttpResponseData { |
| 158 | + error("not a real engine") |
| 159 | + } |
18 | 160 | } |
0 commit comments