@@ -91,121 +91,95 @@ private fun onResponseHeaders(
9191 headerArray : CPointer <aws_http_header>? ,
9292 numHeaders : size_t,
9393 userdata : COpaquePointer ? ,
94- ): Int {
95- val stableRef = dereferenceUserdata(userdata) ? : return callbackError()
96- val ctx = stableRef.safeGet() ? : return callbackError()
97- val stream = ctx.stream ? : return callbackError()
98-
99- val hdrCnt = numHeaders.toInt()
100- val headers: List <HttpHeader >? = if (hdrCnt > 0 && headerArray != null ) {
101- val kheaders = mutableListOf<HttpHeader >()
102- for (i in 0 until hdrCnt) {
103- val nativeHdr = headerArray[i]
104- val hdr = HttpHeader (nativeHdr.name.toKString(), nativeHdr.value.toKString())
105- kheaders.add(hdr)
106- }
107- kheaders
108- } else {
109- null
110- }
111-
112- try {
113- ctx.handler.onResponseHeaders(stream, stream.responseStatusCode, blockType.value.toInt(), headers)
114- } catch (ex: Exception ) {
115- log(LogLevel .Error , " onResponseHeaders: $ex " )
116- return callbackError()
117- }
94+ ): Int =
95+ userdata?.withDereferenced<HttpStreamContext , _ > { ctx ->
96+ ctx.stream?.let { stream ->
97+ val hdrCnt = numHeaders.toInt()
98+ val headers: List <HttpHeader >? = if (hdrCnt > 0 && headerArray != null ) {
99+ val kheaders = mutableListOf<HttpHeader >()
100+ for (i in 0 until hdrCnt) {
101+ val nativeHdr = headerArray[i]
102+ val hdr = HttpHeader (nativeHdr.name.toKString(), nativeHdr.value.toKString())
103+ kheaders.add(hdr)
104+ }
105+ kheaders
106+ } else {
107+ null
108+ }
118109
119- return AWS_OP_SUCCESS
120- }
110+ try {
111+ ctx.handler.onResponseHeaders(stream, stream.responseStatusCode, blockType.value.toInt(), headers)
112+ AWS_OP_SUCCESS
113+ } catch (ex: Exception ) {
114+ log(LogLevel .Error , " onResponseHeaders: $ex " )
115+ null
116+ }
117+ }
118+ } ? : callbackError()
121119
122120private fun onResponseHeaderBlockDone (
123121 nativeStream : CPointer <cnames.structs.aws_http_stream>? ,
124122 blockType : aws_http_header_block,
125123 userdata : COpaquePointer ? ,
126- ): Int {
127- val stableRef = dereferenceUserdata(userdata) ? : return callbackError()
128- val ctx = stableRef.safeGet() ? : return callbackError()
129- val stream = ctx.stream ? : return callbackError()
130-
131- try {
132- ctx.handler.onResponseHeadersDone(stream, blockType.value.toInt())
133- } catch (ex: Exception ) {
134- log(LogLevel .Error , " onResponseHeaderBlockDone: $ex " )
135- return callbackError()
136- }
137-
138- return AWS_OP_SUCCESS
139- }
124+ ): Int =
125+ userdata?.withDereferenced<HttpStreamContext , _ > { ctx ->
126+ ctx.stream?.let { stream ->
127+ try {
128+ ctx.handler.onResponseHeadersDone(stream, blockType.value.toInt())
129+ AWS_OP_SUCCESS
130+ } catch (ex: Exception ) {
131+ log(LogLevel .Error , " onResponseHeaderBlockDone: $ex " )
132+ null
133+ }
134+ }
135+ } ? : callbackError()
140136
141137private fun onIncomingBody (
142138 nativeStream : CPointer <cnames.structs.aws_http_stream>? ,
143139 data : CPointer <aws_byte_cursor>? ,
144140 userdata : COpaquePointer ? ,
145- ): Int {
146- val stableRef = dereferenceUserdata(userdata) ? : return callbackError()
147- val ctx = stableRef.safeGet() ? : return callbackError()
148- val stream = ctx.stream ? : return callbackError()
149-
150- try {
151- val body = if (data != null ) ByteCursorBuffer (data) else Buffer .Empty
152- val windowIncrement = ctx.handler.onResponseBody(stream, body)
153- if (windowIncrement < 0 ) {
154- return callbackError()
155- }
156-
157- if (windowIncrement > 0 ) {
158- aws_http_stream_update_window(nativeStream, windowIncrement.convert())
141+ ): Int =
142+ userdata?.withDereferenced<HttpStreamContext , _ > { ctx ->
143+ ctx.stream?.let { stream ->
144+ try {
145+ val body = if (data != null ) ByteCursorBuffer (data) else Buffer .Empty
146+ val windowIncrement = ctx.handler.onResponseBody(stream, body)
147+
148+ if (windowIncrement < 0 ) {
149+ null
150+ } else {
151+ if (windowIncrement > 0 ) {
152+ aws_http_stream_update_window(nativeStream, windowIncrement.convert())
153+ }
154+ AWS_OP_SUCCESS
155+ }
156+ } catch (ex: Exception ) {
157+ log(LogLevel .Error , " onIncomingBody: $ex " )
158+ null
159+ }
159160 }
160- } catch (ex: Exception ) {
161- log(LogLevel .Error , " onIncomingBody: $ex " )
162- return callbackError()
163- }
164-
165- return AWS_OP_SUCCESS
166- }
161+ } ? : callbackError()
167162
168163private fun onStreamComplete (
169164 nativeStream : CPointer <cnames.structs.aws_http_stream>? ,
170165 errorCode : Int ,
171166 userdata : COpaquePointer ? ,
172167) {
173- val stableRef = dereferenceUserdata(userdata) ? : return
174- try {
175- val ctx = stableRef.safeGet() ? : return
168+ userdata?.withDereferenced<HttpStreamContext >(dispose = true ) { ctx ->
176169 try {
177170 val stream = ctx.stream ? : return
178171 ctx.handler.onResponseComplete(stream, errorCode)
172+ } catch (ex: Exception ) {
173+ log(LogLevel .Error , " onStreamComplete: $ex " )
174+ // close connection if callback throws an exception
175+ aws_http_connection_close(aws_http_stream_get_connection(nativeStream))
179176 } finally {
180177 // cleanup request object
181178 aws_http_message_release(ctx.nativeReq)
182179 }
183- } catch (ex: Exception ) {
184- log(LogLevel .Error , " onStreamComplete: $ex " )
185- // close connection if callback throws an exception
186- aws_http_connection_close(aws_http_stream_get_connection(nativeStream))
187- } finally {
188- // cleanup userdata
189- stableRef.dispose()
190180 }
191181}
192182
193- private fun dereferenceUserdata (userdata : COpaquePointer ? ): StableRef <HttpStreamContext >? =
194- try {
195- userdata?.asStableRef<HttpStreamContext >()
196- } catch (_: NullPointerException ) {
197- // `asStableRef()` can throw `NullPointerException` when target type can't be coerced to HttpStreamContext
198- null
199- }
200-
201- private fun <T : Any > StableRef<T>.safeGet (): T ? =
202- try {
203- get()
204- } catch (_: NullPointerException ) {
205- // `get()` can throw `NullPointerException` when stream has been canceled and CRT is cleaning up resources
206- null
207- }
208-
209183internal fun HttpRequest.toNativeRequest (): CPointer <cnames.structs.aws_http_message> {
210184 val nativeReq = checkNotNull(
211185 aws_http_message_new_request(Allocator .Default ),
0 commit comments