55package aws.sdk.kotlin.crt.http
66
77import aws.sdk.kotlin.crt.*
8- import aws.sdk.kotlin.crt.Allocator
9- import aws.sdk.kotlin.crt.NativeHandle
10- import aws.sdk.kotlin.crt.awsAssertOpSuccess
118import aws.sdk.kotlin.crt.io.Buffer
129import aws.sdk.kotlin.crt.io.ByteCursorBuffer
1310import aws.sdk.kotlin.crt.util.asAwsByteCursor
1411import aws.sdk.kotlin.crt.util.initFromCursor
1512import aws.sdk.kotlin.crt.util.toKString
1613import aws.sdk.kotlin.crt.util.withAwsByteCursor
14+ import kotlinx.atomicfu.atomic
1715import kotlinx.cinterop.*
1816import libcrt.*
1917import platform.posix.size_t
2018
2119internal class HttpClientConnectionNative (
2220 private val manager : HttpClientConnectionManager ,
2321 override val ptr : CPointer <cnames.structs.aws_http_connection>,
24- ) : Closeable,
22+ ) : WithCrt(),
23+ Closeable ,
2524 HttpClientConnection ,
2625 NativeHandle < cnames.structs.aws_http_connection> {
2726
27+ private val closed = atomic(false )
28+
2829 override val id: String = ptr.rawValue.toString()
2930 override fun makeRequest (httpReq : HttpRequest , handler : HttpStreamResponseHandler ): HttpStream {
3031 val nativeReq = httpReq.toNativeRequest()
31- val cbData = HttpStreamContext (handler, nativeReq)
32+ val cbData = HttpStreamContext (null , handler, nativeReq)
3233 val stableRef = StableRef .create(cbData)
3334 val reqOptions = cValue< aws_http_make_request_options> {
3435 self_size = sizeOf< aws_http_make_request_options> ().convert()
@@ -50,22 +51,30 @@ internal class HttpClientConnectionNative(
5051 throw CrtRuntimeException (" aws_http_connection_make_request()" )
5152 }
5253
53- return HttpStreamNative (stream)
54+ return HttpStreamNative (stream). also { cbData.stream = it }
5455 }
5556
5657 override fun shutdown () {
5758 aws_http_connection_close(ptr)
5859 }
5960
6061 override fun close () {
61- manager.releaseConnection(this )
62+ if (closed.compareAndSet(false , true )) {
63+ manager.releaseConnection(this )
64+ }
6265 }
6366}
6467
6568/* *
6669 * Userdata passed through the native callbacks for HTTP responses
6770 */
6871private class HttpStreamContext (
72+ /* *
73+ * The Kotlin stream object. This starts as null because the context is created before the stream itself. We need
74+ * the stream in callbacks so we set it lazily.
75+ */
76+ var stream : HttpStreamNative ? = null ,
77+
6978 /* *
7079 * The actual Kotlin handler for each callback
7180 */
@@ -85,7 +94,7 @@ private fun onResponseHeaders(
8594 userdata : COpaquePointer ? ,
8695): Int {
8796 val ctx = userdata?.asStableRef<HttpStreamContext >()?.get() ? : return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE .toInt())
88- val stream = nativeStream?. let { HttpStreamNative (it) } ? : return aws_raise_error( AWS_ERROR_HTTP_CALLBACK_FAILURE .toInt())
97+ val stream = ctx.stream ? : return AWS_OP_ERR
8998
9099 val hdrCnt = numHeaders.toInt()
91100 val headers: List <HttpHeader >? = if (hdrCnt > 0 && headerArray != null ) {
@@ -106,6 +115,7 @@ private fun onResponseHeaders(
106115 log(LogLevel .Error , " onResponseHeaders: $ex " )
107116 return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE .toInt())
108117 }
118+
109119 return AWS_OP_SUCCESS
110120}
111121
@@ -115,7 +125,8 @@ private fun onResponseHeaderBlockDone(
115125 userdata : COpaquePointer ? ,
116126): Int {
117127 val ctx = userdata?.asStableRef<HttpStreamContext >()?.get() ? : return AWS_OP_ERR
118- val stream = nativeStream?.let { HttpStreamNative (it) } ? : return AWS_OP_ERR
128+ val stream = ctx.stream ? : return AWS_OP_ERR
129+
119130 try {
120131 ctx.handler.onResponseHeadersDone(stream, blockType.value.toInt())
121132 } catch (ex: Exception ) {
@@ -132,7 +143,7 @@ private fun onIncomingBody(
132143 userdata : COpaquePointer ? ,
133144): Int {
134145 val ctx = userdata?.asStableRef<HttpStreamContext >()?.get() ? : return AWS_OP_ERR
135- val stream = nativeStream?. let { HttpStreamNative (it) } ? : return AWS_OP_ERR
146+ val stream = ctx.stream ? : return AWS_OP_ERR
136147
137148 try {
138149 val body = if (data != null ) ByteCursorBuffer (data) else Buffer .Empty
@@ -159,7 +170,8 @@ private fun onStreamComplete(
159170) {
160171 val stableRef = userdata?.asStableRef<HttpStreamContext >() ? : return
161172 val ctx = stableRef.get()
162- val stream = nativeStream?.let { HttpStreamNative (it) } ? : return
173+ val stream = ctx.stream ? : return
174+
163175 try {
164176 ctx.handler.onResponseComplete(stream, errorCode)
165177 } catch (ex: Exception ) {
0 commit comments