@@ -16,11 +16,15 @@ import kotlinx.cinterop.CPointerVar
16
16
import kotlinx.cinterop.ExperimentalForeignApi
17
17
import kotlinx.cinterop.IntVar
18
18
import kotlinx.cinterop.alloc
19
+ import kotlinx.cinterop.allocArray
20
+ import kotlinx.cinterop.convert
21
+ import kotlinx.cinterop.get
19
22
import kotlinx.cinterop.ptr
20
- import kotlinx.cinterop.readValue
21
23
import kotlinx.cinterop.value
22
24
import kotlinx.rpc.grpc.GrpcTrailers
23
25
import kotlinx.rpc.grpc.Status
26
+ import kotlinx.rpc.grpc.StatusCode
27
+ import kotlinx.rpc.grpc.StatusException
24
28
import kotlinx.rpc.protobuf.input.stream.asInputStream
25
29
import kotlinx.rpc.protobuf.input.stream.asSource
26
30
import libkgrpc.GRPC_OP_RECV_CLOSE_ON_SERVER
@@ -33,7 +37,6 @@ import libkgrpc.grpc_byte_buffer_destroy
33
37
import libkgrpc.grpc_call_cancel_with_status
34
38
import libkgrpc.grpc_call_unref
35
39
import libkgrpc.grpc_op
36
- import libkgrpc.grpc_slice
37
40
import libkgrpc.grpc_slice_unref
38
41
import libkgrpc.grpc_status_code
39
42
import kotlin.concurrent.Volatile
@@ -66,7 +69,12 @@ internal class NativeServerCall<Request, Response>(
66
69
private var cancelled = false
67
70
private val finalized = atomic(false )
68
71
69
- // Tracks whether at least one request message has been received on this call.
72
+ // tracks whether the initial metadata has been sent.
73
+ // this is used to determine if we have to send the initial metadata
74
+ // when we try to close the call.
75
+ private var sentInitialMetadata = false
76
+
77
+ // tracks whether at least one request message has been received on this call.
70
78
private var receivedFirstMessage = false
71
79
72
80
// we currently don't buffer messages, so after one `sendMessage` call, ready turns false. (KRPC-192)
@@ -245,6 +253,7 @@ internal class NativeServerCall<Request, Response>(
245
253
data.send_initial_metadata.metadata = null
246
254
}
247
255
256
+ sentInitialMetadata = true
248
257
runBatch(op.ptr, 1u , cleanup = { arena.clear() }) {
249
258
// nothing to do here
250
259
}
@@ -256,42 +265,52 @@ internal class NativeServerCall<Request, Response>(
256
265
val methodDescriptor = checkNotNull(methodDescriptor) { internalError(" Method descriptor not set" ) }
257
266
258
267
val arena = Arena ()
259
- val inputStream = methodDescriptor.getResponseMarshaller().stream(message)
260
- val byteBuffer = inputStream.asSource().toGrpcByteBuffer()
261
- ready.value = false
262
-
263
- val op = arena.alloc< grpc_op> {
264
- op = GRPC_OP_SEND_MESSAGE
265
- data.send_message.send_message = byteBuffer
266
- }
268
+ tryRun {
269
+ val inputStream = methodDescriptor.getResponseMarshaller().stream(message)
270
+ val byteBuffer = inputStream.asSource().toGrpcByteBuffer()
271
+ ready.value = false
272
+
273
+ val op = arena.alloc< grpc_op> {
274
+ op = GRPC_OP_SEND_MESSAGE
275
+ data.send_message.send_message = byteBuffer
276
+ }
267
277
268
- runBatch(op.ptr, 1u , cleanup = {
269
- arena.clear()
270
- grpc_byte_buffer_destroy(byteBuffer)
271
- }) {
272
- turnReady()
278
+ runBatch(op.ptr, 1u , cleanup = {
279
+ arena.clear()
280
+ grpc_byte_buffer_destroy(byteBuffer)
281
+ }) {
282
+ turnReady()
283
+ }
273
284
}
274
285
}
275
286
276
287
override fun close (status : Status , trailers : GrpcTrailers ) {
277
288
check(initialized) { internalError(" Call not initialized" ) }
278
289
val arena = Arena ()
279
290
280
- val details = status.getDescription()?.let {
281
- arena.alloc< grpc_slice> {
282
- it.toGrpcSlice()
283
- }
284
- }
285
- val op = arena.alloc< grpc_op> {
286
- op = GRPC_OP_SEND_STATUS_FROM_SERVER
287
- data.send_status_from_server.status = status.statusCode.toRawCallAllocation()
288
- data.send_status_from_server.status_details = details?.ptr
289
- data.send_status_from_server.trailing_metadata_count = 0u
290
- data.send_status_from_server.trailing_metadata = null
291
+ val details = status.getDescription()?.toGrpcSlice()
292
+ val detailsPtr = details?.getPointer(arena)
293
+
294
+ val nOps = if (sentInitialMetadata) 1uL else 2uL
295
+
296
+ val ops = arena.allocArray< grpc_op> (nOps.convert())
297
+
298
+ ops[0 ].op = GRPC_OP_SEND_STATUS_FROM_SERVER
299
+ ops[0 ].data.send_status_from_server.status = status.statusCode.toRaw()
300
+ ops[0 ].data.send_status_from_server.status_details = detailsPtr
301
+ ops[0 ].data.send_status_from_server.trailing_metadata_count = 0u
302
+ ops[0 ].data.send_status_from_server.trailing_metadata = null
303
+
304
+ if (! sentInitialMetadata) {
305
+ // if we haven't sent GRPC_OP_SEND_INITIAL_METADATA yet,
306
+ // so we must do it together with the close operation.
307
+ ops[1 ].op = GRPC_OP_SEND_INITIAL_METADATA
308
+ ops[1 ].data.send_initial_metadata.count = 0u
309
+ ops[1 ].data.send_initial_metadata.metadata = null
291
310
}
292
311
293
- runBatch(op.ptr, 1u , cleanup = {
294
- if (details != null ) grpc_slice_unref(details.readValue() )
312
+ runBatch(ops, nOps , cleanup = {
313
+ if (details != null ) grpc_slice_unref(details)
295
314
arena.clear()
296
315
}) {
297
316
// nothing to do here
@@ -306,6 +325,25 @@ internal class NativeServerCall<Request, Response>(
306
325
val methodDescriptor = checkNotNull(methodDescriptor) { internalError(" Method descriptor not set" ) }
307
326
return methodDescriptor
308
327
}
328
+
329
+
330
+ private fun <T > tryRun (block : () -> T ): T {
331
+ try {
332
+ return block()
333
+ } catch (e: Throwable ) {
334
+ // TODO: Log internal error as warning
335
+ val status = when (e) {
336
+ is StatusException -> e.getStatus()
337
+ else -> Status (
338
+ StatusCode .INTERNAL ,
339
+ description = " Internal error, so canceling the stream" ,
340
+ cause = e
341
+ )
342
+ }
343
+ cancel(status.statusCode.toRaw(), status.getDescription() ? : " Unknown error" )
344
+ throw StatusException (status, trailers = null )
345
+ }
346
+ }
309
347
}
310
348
311
349
/* *
0 commit comments