1
1
package com.apollographql.apollo.network.websocket
2
2
3
- import com.apollographql.apollo.annotations.ApolloDeprecatedSince
4
3
import com.apollographql.apollo.annotations.ApolloExperimental
5
4
import com.apollographql.apollo.api.ApolloRequest
6
5
import com.apollographql.apollo.api.ApolloResponse
7
6
import com.apollographql.apollo.api.CustomScalarAdapters
8
7
import com.apollographql.apollo.api.Operation
9
- import com.apollographql.apollo.api.http.HttpHeader
10
8
import com.apollographql.apollo.api.json.ApolloJsonElement
11
9
import com.apollographql.apollo.api.json.jsonReader
12
10
import com.apollographql.apollo.api.toApolloResponse
@@ -16,14 +14,16 @@ import com.apollographql.apollo.exception.SubscriptionOperationException
16
14
import com.apollographql.apollo.internal.DeferredJsonMerger
17
15
import com.apollographql.apollo.network.NetworkTransport
18
16
import com.apollographql.apollo.network.websocket.internal.OperationListener
19
- import com.apollographql.apollo.network.websocket.internal.WebSocketHolder
17
+ import com.apollographql.apollo.network.websocket.internal.WebSocketPool
20
18
import com.benasher44.uuid.uuid4
21
19
import kotlinx.coroutines.channels.Channel
22
20
import kotlinx.coroutines.channels.ProducerScope
23
21
import kotlinx.coroutines.channels.awaitClose
24
22
import kotlinx.coroutines.flow.Flow
25
23
import kotlinx.coroutines.flow.buffer
26
24
import kotlinx.coroutines.flow.callbackFlow
25
+ import kotlin.time.Duration
26
+ import kotlin.time.Duration.Companion.seconds
27
27
28
28
/* *
29
29
* A [NetworkTransport] that uses WebSockets to execute GraphQL operations. Most of the time, it is used
@@ -36,22 +36,20 @@ import kotlinx.coroutines.flow.callbackFlow
36
36
class WebSocketNetworkTransport private constructor(
37
37
private val webSocketEngine : WebSocketEngine ,
38
38
private val serverUrl : String ,
39
- private val httpHeaders : List <HttpHeader >,
40
39
private val wsProtocol : WsProtocol ,
41
- private val connectionAcknowledgeTimeoutMillis : Long ,
42
- private val pingIntervalMillis : Long ,
43
- private val idleTimeoutMillis : Long ,
40
+ private val connectionAcknowledgeTimeout : Duration ,
41
+ private val pingInterval : Duration ? ,
42
+ private val idleTimeout : Duration ,
44
43
private val parserFactory : SubscriptionParserFactory
45
44
) : NetworkTransport {
46
45
47
- private val holder = WebSocketHolder (
46
+ private val pool = WebSocketPool (
48
47
webSocketEngine = webSocketEngine,
49
48
serverUrl = serverUrl,
50
- httpHeaders = httpHeaders,
51
49
wsProtocol = wsProtocol,
52
- connectionAcknowledgeTimeoutMillis = connectionAcknowledgeTimeoutMillis ,
53
- pingIntervalMillis = pingIntervalMillis ,
54
- idleTimeoutMillis = idleTimeoutMillis
50
+ connectionAcknowledgeTimeout = connectionAcknowledgeTimeout ,
51
+ pingInterval = pingInterval ,
52
+ idleTimeout = idleTimeout
55
53
)
56
54
57
55
/* *
@@ -78,7 +76,8 @@ class WebSocketNetworkTransport private constructor(
78
76
79
77
val operationListener = DefaultOperationListener (newRequest, this , parserFactory.createParser(request))
80
78
81
- val webSocket = holder.acquire()
79
+ val webSocket = pool.acquire(newRequest.httpHeaders.orEmpty())
80
+
82
81
webSocket.startOperation(newRequest, operationListener)
83
82
84
83
awaitClose {
@@ -91,7 +90,7 @@ class WebSocketNetworkTransport private constructor(
91
90
}
92
91
93
92
override fun dispose () {
94
- holder .close()
93
+ pool .close()
95
94
}
96
95
97
96
/* *
@@ -102,18 +101,17 @@ class WebSocketNetworkTransport private constructor(
102
101
* The given [reason] will be propagated to active subscriptions.
103
102
*/
104
103
fun closeConnection (reason : ApolloException ) {
105
- holder.closeCurrentConnection (reason)
104
+ pool.closeAllConnections (reason)
106
105
}
107
106
108
107
@ApolloExperimental
109
108
class Builder {
110
109
private var serverUrl: String? = null
111
- private var httpHeaders: List <HttpHeader >? = null
112
110
private var webSocketEngine: WebSocketEngine ? = null
113
111
private var wsProtocol: WsProtocol ? = null
114
- private var connectionAcknowledgeTimeoutMillis : Long ? = null
115
- private var pingIntervalMillis : Long ? = null
116
- private var idleTimeoutMillis : Long ? = null
112
+ private var connectionAcknowledgeTimeout : Duration ? = null
113
+ private var pingInterval : Duration ? = null
114
+ private var idleTimeout : Duration ? = null
117
115
private var parserFactory: SubscriptionParserFactory ? = null
118
116
119
117
/* *
@@ -129,29 +127,6 @@ class WebSocketNetworkTransport private constructor(
129
127
this .serverUrl = serverUrl
130
128
}
131
129
132
- /* *
133
- * Headers to add to the HTTP handshake query.
134
- */
135
- fun httpHeaders (headers : List <HttpHeader >) = apply {
136
- this .httpHeaders = headers
137
- }
138
-
139
- /* *
140
- * Add a [HttpHeader] to the handshake query.
141
- */
142
- fun addHttpHeader (name : String , value : String ) = apply {
143
- this .httpHeaders = this .httpHeaders.orEmpty() + HttpHeader (name, value)
144
- }
145
-
146
- /* *
147
- * Add a [HttpHeader] to the handshake query.
148
- */
149
- @Deprecated(" use addHttpHeader instead" , ReplaceWith (" addHttpHeader(name, value)" ))
150
- @ApolloDeprecatedSince(ApolloDeprecatedSince .Version .v4_0_0)
151
- fun addHeader (name : String , value : String ) = apply {
152
- this .httpHeaders = this .httpHeaders.orEmpty() + HttpHeader (name, value)
153
- }
154
-
155
130
/* *
156
131
* Set the [WebSocketEngine] to use.
157
132
*/
@@ -160,12 +135,11 @@ class WebSocketNetworkTransport private constructor(
160
135
}
161
136
162
137
/* *
163
- * The number of milliseconds before a WebSocket with no active operations disconnects.
164
- *
165
- * Default: `60_000`
138
+ * @param idleTimeout the duration before a WebSocket with no active operations disconnects or
139
+ * null to use the default of 1 minute.
166
140
*/
167
- fun idleTimeoutMillis ( idleTimeoutMillis : Long ? ) = apply {
168
- this .idleTimeoutMillis = idleTimeoutMillis
141
+ fun idleTimeout ( idleTimeout : Duration ? ) = apply {
142
+ this .idleTimeout = idleTimeout
169
143
}
170
144
171
145
/* *
@@ -182,22 +156,19 @@ class WebSocketNetworkTransport private constructor(
182
156
}
183
157
184
158
/* *
185
- * The interval in milliseconds between two client pings or -1 to disable client pings.
159
+ * @param pingInterval the interval between two client pings or null to disable client pings.
186
160
* The [WsProtocol] used must also support client pings.
187
- *
188
- * Default: -1
189
161
*/
190
- fun pingIntervalMillis ( pingIntervalMillis : Long ? ) = apply {
191
- this .pingIntervalMillis = pingIntervalMillis
162
+ fun pingInterval ( pingInterval : Duration ? ) = apply {
163
+ this .pingInterval = pingInterval
192
164
}
193
165
194
166
/* *
195
- * The maximum number of milliseconds between a "connection_init" message and its acknowledgement
196
- *
197
- * Default: 10_000
167
+ * @param connectionAcknowledgeTimeout the maximum duration between a "connection_init" message and its acknowledgement
168
+ * or null to use the default of 10 seconds
198
169
*/
199
- fun connectionAcknowledgeTimeoutMillis ( connectionAcknowledgeTimeoutMillis : Long ? ) = apply {
200
- this .connectionAcknowledgeTimeoutMillis = connectionAcknowledgeTimeoutMillis
170
+ fun connectionAcknowledgeTimeout ( connectionAcknowledgeTimeout : Duration ? ) = apply {
171
+ this .connectionAcknowledgeTimeout = connectionAcknowledgeTimeout
201
172
}
202
173
203
174
@ApolloExperimental
@@ -213,11 +184,10 @@ class WebSocketNetworkTransport private constructor(
213
184
return WebSocketNetworkTransport (
214
185
webSocketEngine = webSocketEngine ? : WebSocketEngine (),
215
186
serverUrl = serverUrl ? : error(" Apollo: 'serverUrl' is required" ),
216
- httpHeaders = httpHeaders.orEmpty(),
217
- idleTimeoutMillis = idleTimeoutMillis ? : 60_000 ,
187
+ idleTimeout = idleTimeout ? : 60 .seconds,
218
188
wsProtocol = wsProtocol ? : GraphQLWsProtocol { null },
219
- pingIntervalMillis = pingIntervalMillis ? : - 1L ,
220
- connectionAcknowledgeTimeoutMillis = connectionAcknowledgeTimeoutMillis ? : 10_000L ,
189
+ pingInterval = pingInterval ,
190
+ connectionAcknowledgeTimeout = connectionAcknowledgeTimeout ? : 10 .seconds ,
221
191
parserFactory = parserFactory ? : DefaultSubscriptionParserFactory
222
192
)
223
193
}
@@ -260,10 +230,10 @@ private class DefaultSubscriptionParser<D : Operation.Data>(private val request:
260
230
deferredJsonMerger.reset()
261
231
}
262
232
263
- if (deferredJsonMerger.isEmptyPayload) {
264
- return null
233
+ return if (deferredJsonMerger.isEmptyPayload) {
234
+ null
265
235
} else {
266
- return apolloResponse
236
+ apolloResponse
267
237
}
268
238
}
269
239
}
0 commit comments