Skip to content

Commit 5a9edd4

Browse files
mengddBoD
andauthored
fix the batch size not respected issue (#6528)
* fix the batch size not respected issue * Add a unit test * Fix node tests hanging --------- Co-authored-by: BoD <[email protected]>
1 parent cd89b82 commit 5a9edd4

File tree

2 files changed

+70
-11
lines changed

2 files changed

+70
-11
lines changed

libraries/apollo-runtime/src/commonMain/kotlin/com/apollographql/apollo/network/http/BatchingHttpInterceptor.kt

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,22 +103,31 @@ class BatchingHttpInterceptor @JvmOverloads constructor(
103103
val sendNow = mutex.withLock {
104104
// if there was an error, the previous job was already canceled, ignore that error
105105
pendingRequests.add(pendingRequest)
106-
pendingRequests.size >= maxBatchSize
106+
val batchFull = pendingRequests.size >= maxBatchSize
107+
if (batchFull) {
108+
executePendingRequests(needLock = false)
109+
}
110+
batchFull
107111
}
108-
if (sendNow) {
109-
executePendingRequests()
110-
} else {
112+
113+
if (!sendNow) {
111114
scope.launch {
112115
delay(batchIntervalMillis - (startMark.elapsedNow().inWholeMilliseconds % batchIntervalMillis) - 1)
113-
executePendingRequests()
116+
executePendingRequests(needLock = true)
114117
}
115118
}
116119

117120
return pendingRequest.deferred.await()
118121
}
119122

120-
private suspend fun executePendingRequests() {
121-
val pending = mutex.withLock {
123+
private suspend fun executePendingRequests(needLock: Boolean) {
124+
val pending = if (needLock) {
125+
mutex.withLock {
126+
val copy = pendingRequests.toList()
127+
pendingRequests.clear()
128+
copy
129+
}
130+
} else {
122131
val copy = pendingRequests.toList()
123132
pendingRequests.clear()
124133
copy

tests/integration-tests/src/commonTest/kotlin/test/batching/QueryBatchingTest.kt

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,22 @@ import batching.GetLaunch2Query
44
import batching.GetLaunchQuery
55
import com.apollographql.apollo.ApolloClient
66
import com.apollographql.apollo.api.AnyAdapter
7+
import com.apollographql.apollo.api.ApolloResponse
78
import com.apollographql.apollo.api.CustomScalarAdapters
89
import com.apollographql.apollo.api.ExecutionOptions.Companion.CAN_BE_BATCHED
910
import com.apollographql.apollo.api.http.HttpHeader
1011
import com.apollographql.apollo.api.json.jsonReader
12+
import com.apollographql.apollo.testing.internal.runTest
13+
import com.apollographql.mockserver.MockRequest
14+
import com.apollographql.mockserver.MockRequestBase
15+
import com.apollographql.mockserver.MockResponse
1116
import com.apollographql.mockserver.MockServer
17+
import com.apollographql.mockserver.MockServerHandler
1218
import com.apollographql.mockserver.awaitRequest
1319
import com.apollographql.mockserver.enqueueString
14-
import com.apollographql.apollo.testing.internal.runTest
20+
import kotlinx.coroutines.Deferred
1521
import kotlinx.coroutines.async
22+
import kotlinx.coroutines.awaitAll
1623
import kotlinx.coroutines.delay
1724
import okio.Buffer
1825
import kotlin.test.Ignore
@@ -21,7 +28,9 @@ import kotlin.test.assertEquals
2128
import kotlin.test.assertFails
2229
import kotlin.test.assertFalse
2330
import kotlin.test.assertIs
31+
import kotlin.test.assertNotNull
2432
import kotlin.test.assertTrue
33+
import kotlin.time.Duration.Companion.seconds
2534

2635
class QueryBatchingTest {
2736
private lateinit var mockServer: MockServer
@@ -90,7 +99,7 @@ class QueryBatchingTest {
9099

91100
// Only one request must have been sent
92101
assertFails {
93-
mockServer.awaitRequest()
102+
mockServer.awaitRequest(3.seconds)
94103
}
95104
}
96105

@@ -217,7 +226,7 @@ class QueryBatchingTest {
217226

218227
// Only one request must have been sent
219228
assertFails {
220-
mockServer.awaitRequest()
229+
mockServer.awaitRequest(3.seconds)
221230
}
222231
}
223232

@@ -251,7 +260,7 @@ class QueryBatchingTest {
251260
val request = mockServer.awaitRequest()
252261
// Only one request must have been sent
253262
assertFails {
254-
mockServer.awaitRequest()
263+
mockServer.awaitRequest(3.seconds)
255264
}
256265
assertTrue(request.headers["client0"] == "0")
257266
assertTrue(request.headers["client1"] == "1")
@@ -296,4 +305,45 @@ class QueryBatchingTest {
296305
assertFalse(request.headers.keys.contains("query1+query2-different-value"))
297306
assertFalse(request.headers.keys.contains(CAN_BE_BATCHED))
298307
}
308+
309+
@Test
310+
fun batchSizeIsHonoredWithConcurrency() = runTest(before = { setUp() }, after = { tearDown() }) {
311+
mockServer.close()
312+
mockServer = MockServer.Builder().handler(object : MockServerHandler {
313+
override fun handle(request: MockRequestBase): MockResponse {
314+
val jsonReader = Buffer().write((request as MockRequest).body).jsonReader()
315+
jsonReader.beginArray()
316+
var arrayLength = 0
317+
while (jsonReader.hasNext()) {
318+
jsonReader.skipValue()
319+
arrayLength++
320+
}
321+
322+
// Check we never receive more than maxBatchSize queries
323+
assertTrue(arrayLength <= 10)
324+
325+
return MockResponse.Builder()
326+
.body("[" + List(arrayLength) { """{"data":{"launch":{"id":"83"}}}""" }.joinToString() + "]")
327+
.build()
328+
}
329+
}).build()
330+
331+
apolloClient = ApolloClient.Builder()
332+
.serverUrl(mockServer.url())
333+
.httpBatching(maxBatchSize = 10, batchIntervalMillis = 10)
334+
.build()
335+
336+
val deferredResults = mutableListOf<Deferred<ApolloResponse<GetLaunchQuery.Data>>>()
337+
repeat(100) {
338+
deferredResults += async {
339+
delay(1)
340+
apolloClient.query(GetLaunchQuery())
341+
.canBeBatched(true)
342+
.execute()
343+
}
344+
}
345+
for (apolloResponse in deferredResults.awaitAll()) {
346+
assertNotNull(apolloResponse.data)
347+
}
348+
}
299349
}

0 commit comments

Comments
 (0)