@@ -34,6 +34,7 @@ internal class NativeClientCall<Request, Response>(
34
34
}
35
35
36
36
init {
37
+ // cancel the call if the job is canceled.
37
38
callJob.invokeOnCompletion {
38
39
when (it) {
39
40
is CancellationException -> {
@@ -63,6 +64,9 @@ internal class NativeClientCall<Request, Response>(
63
64
// if null, the call is still in progress. otherwise, the call can be closed as soon as inFlight is 0.
64
65
private val closeInfo = atomic<Pair <Status , GrpcTrailers >? > (null )
65
66
67
+ // we currently don't buffer messages, so after one `sendMessage` call, ready turns false.
68
+ private val ready = atomic(true )
69
+
66
70
/* *
67
71
* Increments the [inFlight] counter by one.
68
72
* This should be called before starting a batch.
@@ -113,6 +117,16 @@ internal class NativeClientCall<Request, Response>(
113
117
}
114
118
}
115
119
120
+ /* *
121
+ * Sets the [ready] flag to true and calls the listener's onReady callback.
122
+ * This is called as soon as the RECV_MESSAGE batch is finished (or failed).
123
+ */
124
+ private fun turnReady () {
125
+ if (ready.compareAndSet(expect = false , update = true )) {
126
+ listener?.onReady()
127
+ }
128
+ }
129
+
116
130
117
131
override fun start (
118
132
responseListener : Listener <Response >,
@@ -124,13 +138,19 @@ internal class NativeClientCall<Request, Response>(
124
138
listener = responseListener
125
139
126
140
// start receiving the status from the completion queue,
127
- // which is bound to the lifecycle of the call.
128
- val success = initializeCallOnCQ ()
141
+ // which is bound to the lifetime of the call.
142
+ val success = startRecvStatus ()
129
143
if (! success) return
130
144
145
+ // send and receive initial headers to/from the server
131
146
sendAndReceiveInitialMetadata()
132
147
}
133
148
149
+ /* *
150
+ * Submits a batch operation to the [CompletionQueue] and handle the returned [BatchResult].
151
+ * If the batch was successfully submitted, [onSuccess] is called.
152
+ * In any case, [cleanup] is called.
153
+ */
134
154
private fun runBatch (
135
155
ops : CPointer <grpc_op>,
136
156
nOps : ULong ,
@@ -175,12 +195,19 @@ internal class NativeClientCall<Request, Response>(
175
195
}
176
196
}
177
197
198
+ /* *
199
+ * Starts a batch operation to receive the status from the completion queue.
200
+ * This operation is bound to the lifetime of the call, so it will finish once all other operations are done.
201
+ * If this operation fails, it will call [markClosePending] with the corresponding error, as the entire call
202
+ * si considered failed.
203
+ *
204
+ * @return true if the batch was successfully submitted, false otherwise.
205
+ * In this case, the call is considered failed.
206
+ */
178
207
@OptIn(ExperimentalStdlibApi ::class )
179
- private fun initializeCallOnCQ (): Boolean {
208
+ private fun startRecvStatus (): Boolean {
180
209
checkNotNull(listener) { " Not yet started" }
181
210
val arena = Arena ()
182
- // this must not be canceled as it sets the call status.
183
- // if the client itself got canceled, this will return fast.
184
211
val statusCode = arena.alloc< grpc_status_code.Var > ()
185
212
val statusDetails = arena.alloc< grpc_slice> ()
186
213
val errorStr = arena.alloc<CPointerVar <ByteVar >>()
@@ -253,12 +280,19 @@ internal class NativeClientCall<Request, Response>(
253
280
}
254
281
}
255
282
283
+ /* *
284
+ * Requests [numMessages] messages from the server.
285
+ * This must only be called again after [numMessages] were received in the [Listener.onMessage] callback.
286
+ */
256
287
override fun request (numMessages : Int ) {
257
288
check(numMessages > 0 ) { " numMessages must be > 0" }
258
289
val listener = checkNotNull(listener) { " Not yet started" }
259
290
check(! cancelled) { " Already cancelled" }
260
291
261
292
var remainingMessages = numMessages
293
+
294
+ // we need to request only one message at a time, so we use a recursive function that
295
+ // requests one message and then calls itself again.
262
296
fun post () {
263
297
if (remainingMessages-- <= 0 ) return
264
298
@@ -272,13 +306,16 @@ internal class NativeClientCall<Request, Response>(
272
306
if (recvPtr.value != null ) grpc_byte_buffer_destroy(recvPtr.value)
273
307
arena.clear()
274
308
}) {
275
- val buf = recvPtr.value ? : return @runBatch // EOS
309
+ // if the call was successful, but no message was received, we reached the end-of-stream.
310
+ val buf = recvPtr.value ? : return @runBatch
276
311
val msg = methodDescriptor.getResponseMarshaller()
277
312
.parse(buf.toKotlin().asInputStream())
278
313
listener.onMessage(msg)
279
- post() // post next only now
314
+ post()
280
315
}
281
316
}
317
+
318
+ // start requesting messages
282
319
post()
283
320
}
284
321
@@ -310,19 +347,31 @@ internal class NativeClientCall<Request, Response>(
310
347
}
311
348
}
312
349
350
+ override fun isReady (): Boolean = ready.value
351
+
313
352
override fun sendMessage (message : Request ) {
314
353
checkNotNull(listener) { " Not yet started" }
315
354
check(! halfClosed) { " Already half closed." }
316
355
check(! cancelled) { " Already cancelled." }
356
+ check(isReady()) { " Not yet ready." }
357
+
358
+ // set ready false, as only one message can be sent at a time.
359
+ ready.value = false
317
360
318
361
val arena = Arena ()
319
362
val inputStream = methodDescriptor.getRequestMarshaller().stream(message)
320
363
val byteBuffer = inputStream.asSource().toGrpcByteBuffer()
364
+
321
365
val op = arena.alloc< grpc_op> {
322
366
op = GRPC_OP_SEND_MESSAGE
323
367
data.send_message.send_message = byteBuffer
324
368
}
369
+
325
370
runBatch(op.ptr, 1u , cleanup = {
371
+ // no mater what happens, we need to set ready to true again.
372
+ turnReady()
373
+
374
+ // actual cleanup
326
375
grpc_byte_buffer_destroy(byteBuffer)
327
376
arena.clear()
328
377
}) {
0 commit comments