@@ -54,7 +54,7 @@ internal class NativeClientCall<Request, Response>(
54
54
private var closed = atomic(false )
55
55
56
56
// tracks how many operations are in flight (not yet completed by the listener).
57
- // if 0, there are no more operations (except for the RECV_STATUS_ON_CLIENT op) .
57
+ // if 0 and we got a closeInfo (containing the status), there are no more ongoing operations .
58
58
// in this case, we can safely call onClose on the listener.
59
59
// we need this mechanism to ensure that onClose is not called while any other callback is still running
60
60
// on the listener.
@@ -64,7 +64,7 @@ internal class NativeClientCall<Request, Response>(
64
64
// if null, the call is still in progress. otherwise, the call can be closed as soon as inFlight is 0.
65
65
private val closeInfo = atomic<Pair <Status , GrpcTrailers >? > (null )
66
66
67
- // we currently don't buffer messages, so after one `sendMessage` call, ready turns false.
67
+ // we currently don't buffer messages, so after one `sendMessage` call, ready turns false. (KRPC-192)
68
68
private val ready = atomic(true )
69
69
70
70
/* *
@@ -81,11 +81,11 @@ internal class NativeClientCall<Request, Response>(
81
81
* AND the corresponding listener callback returned.
82
82
*
83
83
* If the counter reaches 0, no more listener callbacks are executed, and the call can be closed by
84
- * calling [tryDeliverClose ].
84
+ * calling [tryToCloseCall ].
85
85
*/
86
86
private fun endOp () {
87
87
if (inFlight.decrementAndGet() == 0 ) {
88
- tryDeliverClose ()
88
+ tryToCloseCall ()
89
89
}
90
90
}
91
91
@@ -97,23 +97,23 @@ internal class NativeClientCall<Request, Response>(
97
97
* - If the [inFlight] counter is not 0, this does nothing.
98
98
* - Otherwise, the listener's onClose callback is invoked and the call is closed.
99
99
*/
100
- private fun tryDeliverClose () {
101
- val s = closeInfo.value ? : return
100
+ private fun tryToCloseCall () {
101
+ val info = closeInfo.value ? : return
102
102
if (inFlight.value == 0 && closed.compareAndSet(expect = false , update = true )) {
103
- val lst = checkNotNull(listener) { " Not yet started" }
103
+ val lst = checkNotNull(listener) { internalError( " Not yet started" ) }
104
104
// allows the managed channel to join for the call to finish.
105
105
callJob.complete()
106
- lst.onClose(s .first, s .second)
106
+ lst.onClose(info .first, info .second)
107
107
}
108
108
}
109
109
110
110
/* *
111
- * Sets the [closeInfo] and calls [tryDeliverClose ].
112
- * This is called as soon as the RECV_STATUS_ON_CLIENT batch is finished.
111
+ * Sets the [closeInfo] and calls [tryToCloseCall ].
112
+ * This is called as soon as the RECV_STATUS_ON_CLIENT batch (started with [startRecvStatus]) finished.
113
113
*/
114
114
private fun markClosePending (status : Status , trailers : GrpcTrailers ) {
115
115
if (closeInfo.compareAndSet(null , Pair (status, trailers))) {
116
- tryDeliverClose ()
116
+ tryToCloseCall ()
117
117
}
118
118
}
119
119
@@ -132,8 +132,8 @@ internal class NativeClientCall<Request, Response>(
132
132
responseListener : Listener <Response >,
133
133
headers : GrpcTrailers ,
134
134
) {
135
- check(listener == null ) { " Already started" }
136
- check(! cancelled) { " Already cancelled." }
135
+ check(listener == null ) { internalError( " Already started" ) }
136
+ check(! cancelled) { internalError( " Already cancelled." ) }
137
137
138
138
listener = responseListener
139
139
@@ -164,7 +164,7 @@ internal class NativeClientCall<Request, Response>(
164
164
beginOp()
165
165
166
166
when (val callResult = cq.runBatch(this @NativeClientCall, ops, nOps)) {
167
- is BatchResult .Called -> {
167
+ is BatchResult .Submitted -> {
168
168
callResult.future.onComplete { success ->
169
169
try {
170
170
if (success) {
@@ -184,7 +184,7 @@ internal class NativeClientCall<Request, Response>(
184
184
cancelInternal(grpc_status_code.GRPC_STATUS_UNAVAILABLE , " Channel shutdown" )
185
185
}
186
186
187
- is BatchResult .CallError -> {
187
+ is BatchResult .SubmitError -> {
188
188
cleanup()
189
189
endOp()
190
190
cancelInternal(
@@ -196,7 +196,7 @@ internal class NativeClientCall<Request, Response>(
196
196
}
197
197
198
198
/* *
199
- * Starts a batch operation to receive the status from the completion queue.
199
+ * Starts a batch operation to receive the status from the completion queue (RECV_STATUS_ON_CLIENT) .
200
200
* This operation is bound to the lifetime of the call, so it will finish once all other operations are done.
201
201
* If this operation fails, it will call [markClosePending] with the corresponding error, as the entire call
202
202
* si considered failed.
@@ -206,7 +206,7 @@ internal class NativeClientCall<Request, Response>(
206
206
*/
207
207
@OptIn(ExperimentalStdlibApi ::class )
208
208
private fun startRecvStatus (): Boolean {
209
- checkNotNull(listener) { " Not yet started" }
209
+ checkNotNull(listener) { internalError( " Not yet started" ) }
210
210
val arena = Arena ()
211
211
val statusCode = arena.alloc< grpc_status_code.Var > ()
212
212
val statusDetails = arena.alloc< grpc_slice> ()
@@ -221,7 +221,7 @@ internal class NativeClientCall<Request, Response>(
221
221
}
222
222
223
223
when (val callResult = cq.runBatch(this @NativeClientCall, op.ptr, 1u )) {
224
- is BatchResult .Called -> {
224
+ is BatchResult .Submitted -> {
225
225
callResult.future.onComplete {
226
226
val details = statusDetails.toByteArray().toKString()
227
227
val status = Status (statusCode.value.toKotlin(), details, null )
@@ -244,7 +244,7 @@ internal class NativeClientCall<Request, Response>(
244
244
return false
245
245
}
246
246
247
- is BatchResult .CallError -> {
247
+ is BatchResult .SubmitError -> {
248
248
arena.clear()
249
249
markClosePending(
250
250
Status (StatusCode .INTERNAL , " Failed to start call: ${callResult.error} " ),
@@ -285,9 +285,11 @@ internal class NativeClientCall<Request, Response>(
285
285
* This must only be called again after [numMessages] were received in the [Listener.onMessage] callback.
286
286
*/
287
287
override fun request (numMessages : Int ) {
288
- check(numMessages > 0 ) { " numMessages must be > 0" }
289
- val listener = checkNotNull(listener) { " Not yet started" }
290
- check(! cancelled) { " Already cancelled" }
288
+ check(numMessages > 0 ) { internalError(" numMessages must be > 0" ) }
289
+ // limit numMessages to prevent potential stack overflows
290
+ check(numMessages <= 16 ) { internalError(" numMessages must be <= 16" ) }
291
+ val listener = checkNotNull(listener) { internalError(" Not yet started" ) }
292
+ check(! cancelled) { internalError(" Already cancelled" ) }
291
293
292
294
var remainingMessages = numMessages
293
295
@@ -333,8 +335,8 @@ internal class NativeClientCall<Request, Response>(
333
335
}
334
336
335
337
override fun halfClose () {
336
- check(! halfClosed) { " Already half closed." }
337
- check(! cancelled) { " Already cancelled." }
338
+ check(! halfClosed) { internalError( " Already half closed." ) }
339
+ check(! cancelled) { internalError( " Already cancelled." ) }
338
340
halfClosed = true
339
341
340
342
val arena = Arena ()
@@ -350,10 +352,10 @@ internal class NativeClientCall<Request, Response>(
350
352
override fun isReady (): Boolean = ready.value
351
353
352
354
override fun sendMessage (message : Request ) {
353
- checkNotNull(listener) { " Not yet started" }
354
- check(! halfClosed) { " Already half closed." }
355
- check(! cancelled) { " Already cancelled." }
356
- check(isReady()) { " Not yet ready." }
355
+ checkNotNull(listener) { internalError( " Not yet started" ) }
356
+ check(! halfClosed) { internalError( " Already half closed." ) }
357
+ check(! cancelled) { internalError( " Already cancelled." ) }
358
+ check(isReady()) { internalError( " Not yet ready." ) }
357
359
358
360
// set ready false, as only one message can be sent at a time.
359
361
ready.value = false
@@ -368,14 +370,12 @@ internal class NativeClientCall<Request, Response>(
368
370
}
369
371
370
372
runBatch(op.ptr, 1u , cleanup = {
371
- // no mater what happens, we need to set ready to true again.
372
- turnReady()
373
-
374
373
// actual cleanup
375
374
grpc_byte_buffer_destroy(byteBuffer)
376
375
arena.clear()
377
376
}) {
378
- // Nothing to do here
377
+ // set ready true, as we can now send another message.
378
+ turnReady()
379
379
}
380
380
}
381
381
}
0 commit comments