@@ -16,56 +16,50 @@ public class CRTClientEngine: HttpClientEngine {
1616
1717 private let windowSize : Int
1818 private let maxConnectionsPerEndpoint : Int
19- private var connectionPools : [ Endpoint : HttpClientConnectionManager ] = [ : ]
19+ private var connectionPools : [ Endpoint : HTTPClientConnectionManager ] = [ : ]
20+ private let sharedDefaultIO = SDKDefaultIO . shared
2021
2122 init ( config: CRTClientEngineConfig ) {
2223 self . windowSize = config. windowSize
2324 self . maxConnectionsPerEndpoint = config. maxConnectionsPerEndpoint
2425 self . logger = SwiftLogger ( label: " SerialExecutor " )
2526 }
2627
27- func getOrCreateConnectionPool( endpoint: Endpoint ) -> HttpClientConnectionManager {
28+ func getOrCreateConnectionPool( endpoint: Endpoint ) throws -> HTTPClientConnectionManager {
2829 guard let connectionPool = connectionPools [ endpoint] else {
29- let newConnectionPool = createConnectionPool ( endpoint: endpoint)
30+ let newConnectionPool = try createConnectionPool ( endpoint: endpoint)
3031 connectionPools [ endpoint] = newConnectionPool // save in dictionary
3132 return newConnectionPool
3233 }
3334
3435 return connectionPool
3536 }
3637
37- func closeAllPendingConnections( ) {
38- for (endpoint, value) in connectionPools {
39- logger. debug ( " Connection to endpoint: \( String ( describing: endpoint. url? . absoluteString) ) is closing " )
40- value. closePendingConnections ( )
41- }
42- }
38+ private func createConnectionPool( endpoint: Endpoint ) throws -> HTTPClientConnectionManager {
39+ let tlsConnectionOptions = TLSConnectionOptions (
40+ context: sharedDefaultIO. tlsContext,
41+ serverName: endpoint. host
42+ )
4343
44- private func createConnectionPool( endpoint: Endpoint ) -> HttpClientConnectionManager {
45- let tlsConnectionOptions = SDKDefaultIO . shared. tlsContext. newConnectionOptions ( )
46- do {
47- try tlsConnectionOptions. setServerName ( endpoint. host)
48- } catch let err {
49- logger. error ( " Server name was not able to be set in TLS Connection Options. TLS Negotiation will fail. " )
50- logger. error ( " Error: \( err. localizedDescription) " )
51- }
5244 let socketOptions = SocketOptions ( socketType: . stream)
5345 #if os(iOS) || os(watchOS)
5446 socketOptions. connectTimeoutMs = 30_000
5547 #endif
56- let options = HttpClientConnectionOptions ( clientBootstrap: SDKDefaultIO . shared. clientBootstrap,
57- hostName: endpoint. host,
58- initialWindowSize: windowSize,
59- port: UInt16 ( endpoint. port) ,
60- proxyOptions: nil ,
61- socketOptions: socketOptions,
62- tlsOptions: tlsConnectionOptions,
63- monitoringOptions: nil ,
64- maxConnections: maxConnectionsPerEndpoint,
65- enableManualWindowManagement: false ) // not using backpressure yet
48+ let options = HTTPClientConnectionOptions (
49+ clientBootstrap: sharedDefaultIO. clientBootstrap,
50+ hostName: endpoint. host,
51+ initialWindowSize: windowSize,
52+ port: UInt16 ( endpoint. port) ,
53+ proxyOptions: nil ,
54+ socketOptions: socketOptions,
55+ tlsOptions: tlsConnectionOptions,
56+ monitoringOptions: nil ,
57+ maxConnections: maxConnectionsPerEndpoint,
58+ enableManualWindowManagement: false
59+ ) // not using backpressure yet
6660 logger. debug ( " Creating connection pool for \( String ( describing: endpoint. url? . absoluteString) ) " +
6761 " with max connections: \( maxConnectionsPerEndpoint) " )
68- return HttpClientConnectionManager ( options: options)
62+ return try HTTPClientConnectionManager ( options: options)
6963 }
7064 }
7165
@@ -78,7 +72,6 @@ public class CRTClientEngine: HttpClientEngine {
7872
7973 private let windowSize : Int
8074 private let maxConnectionsPerEndpoint : Int
81- private let sharedDefaultIO : SDKDefaultIO = SDKDefaultIO . shared
8275
8376 init ( config: CRTClientEngineConfig = CRTClientEngineConfig ( ) ) {
8477 self . maxConnectionsPerEndpoint = config. maxConnectionsPerEndpoint
@@ -88,55 +81,58 @@ public class CRTClientEngine: HttpClientEngine {
8881 }
8982
9083 public func execute( request: SdkHttpRequest ) async throws -> HttpResponse {
91- let connectionMgr = await serialExecutor. getOrCreateConnectionPool ( endpoint: request. endpoint)
84+ let connectionMgr = try await serialExecutor. getOrCreateConnectionPool ( endpoint: request. endpoint)
9285 let connection = try await connectionMgr. acquireConnection ( )
9386 self . logger. debug ( " Connection was acquired to: \( String ( describing: request. endpoint. url? . absoluteString) ) " )
9487 return try await withCheckedThrowingContinuation ( { ( continuation: StreamContinuation ) in
95- let requestOptions = makeHttpRequestStreamOptions ( request, continuation)
96- let stream = connection. makeRequest ( requestOptions: requestOptions)
97- stream. activate ( )
88+ do {
89+ let requestOptions = try makeHttpRequestStreamOptions ( request, continuation)
90+ let stream = try connection. makeRequest ( requestOptions: requestOptions)
91+ try stream. activate ( )
92+ } catch {
93+ continuation. resume ( throwing: error)
94+ }
9895 } )
9996 }
10097
101- public func close( ) async {
102- await serialExecutor. closeAllPendingConnections ( )
103- }
104-
10598 public func makeHttpRequestStreamOptions(
10699 _ request: SdkHttpRequest ,
107100 _ continuation: StreamContinuation
108- ) -> HttpRequestOptions {
101+ ) throws -> HTTPRequestOptions {
109102 let response = HttpResponse ( )
110- let crtRequest = request. toHttpRequest ( bufferSize : windowSize )
103+ let crtRequest = try request. toHttpRequest ( )
111104 let streamReader : StreamReader = DataStreamReader ( )
105+
106+ let makeStatusCode : ( HTTPStream ) -> HttpStatusCode = { stream in
107+ guard
108+ let statusCodeInt = try ? stream. statusCode ( ) ,
109+ let statusCode = HttpStatusCode ( rawValue: statusCodeInt)
110+ else { return . notFound }
111+ return statusCode
112+ }
112113
113- let requestOptions = HttpRequestOptions ( request: crtRequest) { [ self ] ( stream, _, httpHeaders) in
114+ let requestOptions = HTTPRequestOptions ( request: crtRequest) { [ self ] ( stream, _, httpHeaders) in
114115 logger. debug ( " headers were received " )
115- response. statusCode = HttpStatusCode ( rawValue : Int ( stream. statusCode ) ) ?? HttpStatusCode . notFound
116+ response. statusCode = makeStatusCode ( stream)
116117 response. headers. addAll ( httpHeaders: httpHeaders)
117118 } onIncomingHeadersBlockDone: { [ self ] ( stream, _) in
118119 logger. debug ( " header block is done " )
119- response. statusCode = HttpStatusCode ( rawValue : Int ( stream. statusCode ) ) ?? HttpStatusCode . notFound
120+ response. statusCode = makeStatusCode ( stream)
120121 } onIncomingBody: { [ self ] ( stream, data) in
121122 logger. debug ( " incoming data " )
122- response. statusCode = HttpStatusCode ( rawValue : Int ( stream. statusCode ) ) ?? HttpStatusCode . notFound
123+ response. statusCode = makeStatusCode ( stream)
123124 let byteBuffer = ByteBuffer ( data: data)
124125 streamReader. write ( buffer: byteBuffer)
125126 } onStreamComplete: { [ self ] ( stream, error) in
126127 logger. debug ( " stream completed " )
127- streamReader. hasFinishedWriting = true
128- if case let CRTError . crtError( unwrappedError) = error {
129- if unwrappedError. errorCode != 0 {
130- logger. error ( " Response encountered an error: \( error) " )
131- streamReader. onError ( error: ClientError . crtError ( error) )
132- continuation. resume ( throwing: error)
133- return
134- }
128+ if let error = error, error. code != 0 {
129+ logger. error ( " Response encountered an error: \( error) " )
130+ continuation. resume ( throwing: CommonRunTimeError . crtError ( error) )
131+ return
135132 }
136-
133+
137134 response. body = . stream( . reader( streamReader) )
138-
139- response. statusCode = HttpStatusCode ( rawValue: Int ( stream. statusCode) ) ?? HttpStatusCode . notFound
135+ response. statusCode = makeStatusCode ( stream)
140136
141137 continuation. resume ( returning: response)
142138 }
0 commit comments