@@ -46,115 +46,118 @@ class FoundationStream : NSObject, WebSocketStream, StreamDelegate, SOCKSProxyab
46
46
private let workQueue = DispatchQueue ( label: " com.apollographql.websocket " , attributes: [ ] )
47
47
private var inputStream : InputStream ?
48
48
private var outputStream : OutputStream ?
49
+ private let serialQueue = DispatchQueue ( label: " com.apollographql.WebSocketStream.serial " , qos: . background)
49
50
weak var delegate : ( any WebSocketStreamDelegate ) ?
50
51
let BUFFER_MAX = 4096
51
52
52
53
var enableSOCKSProxy = false
53
54
54
55
func connect( url: URL , port: Int , timeout: TimeInterval , ssl: SSLSettings , completion: @escaping ( ( ( any Error ) ? ) -> Void ) ) {
55
- var readStream : Unmanaged < CFReadStream > ?
56
- var writeStream : Unmanaged < CFWriteStream > ?
57
- let h = url. host! as NSString
58
- CFStreamCreatePairWithSocketToHost ( nil , h, UInt32 ( port) , & readStream, & writeStream)
59
- inputStream = readStream!. takeRetainedValue ( )
60
- outputStream = writeStream!. takeRetainedValue ( )
61
-
62
- #if os(watchOS) //watchOS us unfortunately is missing the kCFStream properties to make this work
63
- #else
64
- if enableSOCKSProxy {
65
- let proxyDict = CFNetworkCopySystemProxySettings ( )
66
- let socksConfig = CFDictionaryCreateMutableCopy ( nil , 0 , proxyDict!. takeRetainedValue ( ) )
67
- let propertyKey = CFStreamPropertyKey ( rawValue: kCFStreamPropertySOCKSProxy)
68
- CFWriteStreamSetProperty ( outputStream, propertyKey, socksConfig)
69
- CFReadStreamSetProperty ( inputStream, propertyKey, socksConfig)
70
- }
71
- #endif
72
-
73
- guard let inStream = inputStream, let outStream = outputStream else { return }
74
- inStream. delegate = self
75
- outStream. delegate = self
76
- if ssl. useSSL {
77
- inStream. setProperty ( StreamSocketSecurityLevel . negotiatedSSL as AnyObject , forKey: Stream . PropertyKey. socketSecurityLevelKey)
78
- outStream. setProperty ( StreamSocketSecurityLevel . negotiatedSSL as AnyObject , forKey: Stream . PropertyKey. socketSecurityLevelKey)
56
+ serialQueue. sync {
57
+ var readStream : Unmanaged < CFReadStream > ?
58
+ var writeStream : Unmanaged < CFWriteStream > ?
59
+ let h = url. host! as NSString
60
+ CFStreamCreatePairWithSocketToHost ( nil , h, UInt32 ( port) , & readStream, & writeStream)
61
+ inputStream = readStream!. takeRetainedValue ( )
62
+ outputStream = writeStream!. takeRetainedValue ( )
63
+
79
64
#if os(watchOS) //watchOS us unfortunately is missing the kCFStream properties to make this work
80
65
#else
81
- var settings = [ NSObject: NSObject] ( )
82
- if ssl. disableCertValidation {
83
- settings [ kCFStreamSSLValidatesCertificateChain] = NSNumber ( value: false )
84
- }
85
- if ssl. overrideTrustHostname {
86
- if let hostname = ssl. desiredTrustHostname {
87
- settings [ kCFStreamSSLPeerName] = hostname as NSString
88
- } else {
89
- settings [ kCFStreamSSLPeerName] = kCFNull
90
- }
91
- }
92
- if let sslClientCertificate = ssl. sslClientCertificate {
93
- settings [ kCFStreamSSLCertificates] = sslClientCertificate. streamSSLCertificates
66
+ if enableSOCKSProxy {
67
+ let proxyDict = CFNetworkCopySystemProxySettings ( )
68
+ let socksConfig = CFDictionaryCreateMutableCopy ( nil , 0 , proxyDict!. takeRetainedValue ( ) )
69
+ let propertyKey = CFStreamPropertyKey ( rawValue: kCFStreamPropertySOCKSProxy)
70
+ CFWriteStreamSetProperty ( outputStream, propertyKey, socksConfig)
71
+ CFReadStreamSetProperty ( inputStream, propertyKey, socksConfig)
94
72
}
95
-
96
- inStream. setProperty ( settings, forKey: kCFStreamPropertySSLSettings as Stream . PropertyKey )
97
- outStream. setProperty ( settings, forKey: kCFStreamPropertySSLSettings as Stream . PropertyKey )
98
73
#endif
99
74
100
- #if os(Linux)
101
- #else
102
- if let cipherSuites = ssl. cipherSuites {
75
+ guard let inStream = inputStream, let outStream = outputStream else { return }
76
+ inStream. delegate = self
77
+ outStream. delegate = self
78
+ if ssl. useSSL {
79
+ inStream. setProperty ( StreamSocketSecurityLevel . negotiatedSSL as AnyObject , forKey: Stream . PropertyKey. socketSecurityLevelKey)
80
+ outStream. setProperty ( StreamSocketSecurityLevel . negotiatedSSL as AnyObject , forKey: Stream . PropertyKey. socketSecurityLevelKey)
103
81
#if os(watchOS) //watchOS us unfortunately is missing the kCFStream properties to make this work
104
82
#else
105
- if let sslContextIn = CFReadStreamCopyProperty ( inputStream , CFStreamPropertyKey ( rawValue : kCFStreamPropertySSLContext ) ) as! SSLContext ? ,
106
- let sslContextOut = CFWriteStreamCopyProperty ( outputStream , CFStreamPropertyKey ( rawValue : kCFStreamPropertySSLContext ) ) as! SSLContext ? {
107
- let resIn = SSLSetEnabledCiphers ( sslContextIn , cipherSuites , cipherSuites . count )
108
- let resOut = SSLSetEnabledCiphers ( sslContextOut , cipherSuites , cipherSuites . count )
109
- if resIn != errSecSuccess {
110
- completion ( WebSocket . WSError (
111
- type : . invalidSSLError ,
112
- message : " Error setting ingoing cypher suites " ,
113
- code : Int ( resIn ) ) )
83
+ var settings = [ NSObject : NSObject ] ( )
84
+ if ssl . disableCertValidation {
85
+ settings [ kCFStreamSSLValidatesCertificateChain ] = NSNumber ( value : false )
86
+ }
87
+ if ssl . overrideTrustHostname {
88
+ if let hostname = ssl . desiredTrustHostname {
89
+ settings [ kCFStreamSSLPeerName ] = hostname as NSString
90
+ } else {
91
+ settings [ kCFStreamSSLPeerName ] = kCFNull
114
92
}
115
- if resOut != errSecSuccess {
116
- completion ( WebSocket . WSError (
117
- type: . invalidSSLError,
118
- message: " Error setting outgoing cypher suites " ,
119
- code: Int ( resOut) ) )
93
+ }
94
+ if let sslClientCertificate = ssl. sslClientCertificate {
95
+ settings [ kCFStreamSSLCertificates] = sslClientCertificate. streamSSLCertificates
96
+ }
97
+
98
+ inStream. setProperty ( settings, forKey: kCFStreamPropertySSLSettings as Stream . PropertyKey )
99
+ outStream. setProperty ( settings, forKey: kCFStreamPropertySSLSettings as Stream . PropertyKey )
100
+ #endif
101
+
102
+ #if os(Linux)
103
+ #else
104
+ if let cipherSuites = ssl. cipherSuites {
105
+ #if os(watchOS) //watchOS us unfortunately is missing the kCFStream properties to make this work
106
+ #else
107
+ if let sslContextIn = CFReadStreamCopyProperty ( inputStream, CFStreamPropertyKey ( rawValue: kCFStreamPropertySSLContext) ) as! SSLContext ? ,
108
+ let sslContextOut = CFWriteStreamCopyProperty ( outputStream, CFStreamPropertyKey ( rawValue: kCFStreamPropertySSLContext) ) as! SSLContext ? {
109
+ let resIn = SSLSetEnabledCiphers ( sslContextIn, cipherSuites, cipherSuites. count)
110
+ let resOut = SSLSetEnabledCiphers ( sslContextOut, cipherSuites, cipherSuites. count)
111
+ if resIn != errSecSuccess {
112
+ completion ( WebSocket . WSError (
113
+ type: . invalidSSLError,
114
+ message: " Error setting ingoing cypher suites " ,
115
+ code: Int ( resIn) ) )
116
+ }
117
+ if resOut != errSecSuccess {
118
+ completion ( WebSocket . WSError (
119
+ type: . invalidSSLError,
120
+ message: " Error setting outgoing cypher suites " ,
121
+ code: Int ( resOut) ) )
122
+ }
120
123
}
124
+ #endif
121
125
}
122
126
#endif
123
127
}
124
- #endif
125
- }
126
128
127
- CFReadStreamSetDispatchQueue ( inStream, workQueue)
128
- CFWriteStreamSetDispatchQueue ( outStream, workQueue)
129
- inStream. open ( )
130
- outStream. open ( )
131
-
132
- var out = timeout// wait X seconds before giving up
133
- workQueue. async { [ weak self] in
134
- while !outStream. hasSpaceAvailable {
135
- usleep ( 100 ) // wait until the socket is ready
136
- out -= 100
137
- if out < 0 {
138
- completion (
139
- WebSocket . WSError (
140
- type: . writeTimeoutError,
141
- message: " Timed out waiting for the socket to be ready for a write " ,
142
- code: 0 ) )
143
- return
144
-
145
- } else if let error = outStream. streamError {
146
- completion ( error)
147
- return // disconnectStream will be called.
148
-
149
- } else if self == nil {
150
- completion ( WebSocket . WSError (
151
- type: . closeError,
152
- message: " socket object has been dereferenced " ,
153
- code: 0 ) )
154
- return
129
+ CFReadStreamSetDispatchQueue ( inStream, workQueue)
130
+ CFWriteStreamSetDispatchQueue ( outStream, workQueue)
131
+ inStream. open ( )
132
+ outStream. open ( )
133
+
134
+ var out = timeout// wait X seconds before giving up
135
+ workQueue. async { [ weak self] in
136
+ while !outStream. hasSpaceAvailable {
137
+ usleep ( 100 ) // wait until the socket is ready
138
+ out -= 100
139
+ if out < 0 {
140
+ completion (
141
+ WebSocket . WSError (
142
+ type: . writeTimeoutError,
143
+ message: " Timed out waiting for the socket to be ready for a write " ,
144
+ code: 0 ) )
145
+ return
146
+
147
+ } else if let error = outStream. streamError {
148
+ completion ( error)
149
+ return // disconnectStream will be called.
150
+
151
+ } else if self == nil {
152
+ completion ( WebSocket . WSError (
153
+ type: . closeError,
154
+ message: " socket object has been dereferenced " ,
155
+ code: 0 ) )
156
+ return
157
+ }
155
158
}
159
+ completion ( nil ) //success!
156
160
}
157
- completion ( nil ) //success!
158
161
}
159
162
}
160
163
@@ -176,18 +179,20 @@ class FoundationStream : NSObject, WebSocketStream, StreamDelegate, SOCKSProxyab
176
179
}
177
180
178
181
func cleanup( ) {
179
- if let stream = inputStream {
180
- stream. delegate = nil
181
- CFReadStreamSetDispatchQueue ( stream, nil )
182
- stream. close ( )
183
- }
184
- if let stream = outputStream {
185
- stream. delegate = nil
186
- CFWriteStreamSetDispatchQueue ( stream, nil )
187
- stream. close ( )
182
+ serialQueue. sync {
183
+ if let stream = inputStream {
184
+ stream. delegate = nil
185
+ CFReadStreamSetDispatchQueue ( stream, nil )
186
+ stream. close ( )
187
+ }
188
+ if let stream = outputStream {
189
+ stream. delegate = nil
190
+ CFWriteStreamSetDispatchQueue ( stream, nil )
191
+ stream. close ( )
192
+ }
193
+ outputStream = nil
194
+ inputStream = nil
188
195
}
189
- outputStream = nil
190
- inputStream = nil
191
196
}
192
197
193
198
#if os(Linux) || os(watchOS)
0 commit comments