Skip to content

Commit 76b381d

Browse files
authored
grpc: Add Call Metadata (Headers and Trailers) (#517)
1 parent 1d2a412 commit 76b381d

File tree

13 files changed

+2061
-59
lines changed

13 files changed

+2061
-59
lines changed

cinterop-c/include/kgrpc.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ void kgrpc_server_set_batch_method_allocator(
8989
kgrpc_batch_call_allocator allocator
9090
);
9191

92+
/**
93+
* Append an grpc_metadata entry to the given grpc_metadata_array.
94+
*
95+
* @return false if the array has not enough capacity, true otherwise.
96+
*/
97+
bool kgrpc_metadata_array_append(grpc_metadata_array *array, grpc_slice key, grpc_slice value);
9298

9399
#ifdef __cplusplus
94100
}

cinterop-c/src/kgrpc.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <kgrpc.h>
44
#include "src/core/lib/iomgr/iomgr.h"
55
#include "src/core/server/server.h"
6+
#include "grpc/support/string_util.h"
67

78
extern "C" {
89

@@ -53,6 +54,18 @@ void kgrpc_server_set_batch_method_allocator(
5354
});
5455
}
5556

57+
bool kgrpc_metadata_array_append(grpc_metadata_array *array, grpc_slice key, grpc_slice value) {
58+
if (array->capacity - array->count <= 0) {
59+
return false;
60+
}
61+
grpc_metadata entry = {
62+
.key = key,
63+
.value = value
64+
};
65+
array->metadata[array->count++] = entry;
66+
return true;
67+
}
68+
5669
}
5770

5871

grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/suspendClientCalls.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,17 @@ import kotlinx.coroutines.flow.flowOf
1717
import kotlinx.coroutines.flow.single
1818
import kotlinx.coroutines.launch
1919
import kotlinx.coroutines.withContext
20-
import kotlinx.rpc.grpc.client.ClientCallScope
21-
import kotlinx.rpc.grpc.client.GrpcClient
2220
import kotlinx.rpc.grpc.GrpcMetadata
2321
import kotlinx.rpc.grpc.Status
2422
import kotlinx.rpc.grpc.StatusCode
2523
import kotlinx.rpc.grpc.StatusException
26-
import kotlinx.rpc.grpc.internal.CallbackFuture
24+
import kotlinx.rpc.grpc.client.ClientCallScope
25+
import kotlinx.rpc.grpc.client.GrpcClient
2726
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
2827
import kotlinx.rpc.grpc.descriptor.MethodType
29-
import kotlinx.rpc.grpc.internal.Ready
3028
import kotlinx.rpc.grpc.descriptor.methodType
29+
import kotlinx.rpc.grpc.internal.CallbackFuture
30+
import kotlinx.rpc.grpc.internal.Ready
3131
import kotlinx.rpc.grpc.internal.singleOrStatus
3232
import kotlinx.rpc.grpc.statusCode
3333
import kotlinx.rpc.internal.utils.InternalRpcApi

grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeClientCall.kt

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ import kotlinx.coroutines.CompletableJob
2626
import kotlinx.rpc.grpc.GrpcMetadata
2727
import kotlinx.rpc.grpc.Status
2828
import kotlinx.rpc.grpc.StatusCode
29+
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
2930
import kotlinx.rpc.grpc.internal.BatchResult
3031
import kotlinx.rpc.grpc.internal.CompletionQueue
31-
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
32+
import kotlinx.rpc.grpc.internal.destroyEntries
3233
import kotlinx.rpc.grpc.internal.internalError
3334
import kotlinx.rpc.grpc.internal.toByteArray
3435
import kotlinx.rpc.grpc.internal.toGrpcByteBuffer
@@ -182,7 +183,7 @@ internal class NativeClientCall<Request, Response>(
182183
if (!success) return
183184

184185
// send and receive initial headers to/from the server
185-
sendAndReceiveInitialMetadata()
186+
sendAndReceiveInitialMetadata(headers)
186187
}
187188

188189
/**
@@ -251,13 +252,16 @@ internal class NativeClientCall<Request, Response>(
251252
val statusCode = arena.alloc<grpc_status_code.Var>()
252253
val statusDetails = arena.alloc<grpc_slice>()
253254
val errorStr = arena.alloc<CPointerVar<ByteVar>>()
255+
256+
val trailingMetadata = arena.alloc<grpc_metadata_array>()
257+
grpc_metadata_array_init(trailingMetadata.ptr)
258+
254259
val op = arena.alloc<grpc_op> {
255260
op = GRPC_OP_RECV_STATUS_ON_CLIENT
256261
data.recv_status_on_client.status = statusCode.ptr
257262
data.recv_status_on_client.status_details = statusDetails.ptr
258263
data.recv_status_on_client.error_string = errorStr.ptr
259-
// TODO: trailing metadata
260-
data.recv_status_on_client.trailing_metadata = null
264+
data.recv_status_on_client.trailing_metadata = trailingMetadata.ptr
261265
}
262266

263267
when (val callResult = cq.runBatch(this@NativeClientCall.raw, op.ptr, 1u)) {
@@ -266,11 +270,13 @@ internal class NativeClientCall<Request, Response>(
266270
val details = statusDetails.toByteArray().toKString()
267271
val kStatusCode = statusCode.value.toKotlin()
268272
val status = Status(kStatusCode, details, null)
269-
val trailers = GrpcMetadata()
273+
val trailers = GrpcMetadata(trailingMetadata)
270274

271275
// cleanup
272276
grpc_slice_unref(statusDetails.readValue())
273277
if (errorStr.value != null) gpr_free(errorStr.value)
278+
// the entries are owned by the call object, so we must only destroy the array
279+
grpc_metadata_array_destroy(trailingMetadata.readValue())
274280
arena.clear()
275281

276282
// set close info and try to close the call.
@@ -296,29 +302,37 @@ internal class NativeClientCall<Request, Response>(
296302
}
297303
}
298304

299-
private fun sendAndReceiveInitialMetadata() {
305+
private fun sendAndReceiveInitialMetadata(headers: GrpcMetadata) {
300306
// sending and receiving initial metadata
301307
val arena = Arena()
302308
val opsNum = 2uL
303309
val ops = arena.allocArray<grpc_op>(opsNum.convert())
304310

311+
// turn given headers into a grpc_metadata_array.
312+
val sendInitialMetadata: grpc_metadata_array = with(headers) {
313+
arena.allocRawGrpcMetadata()
314+
}
315+
305316
// send initial meta data to server
306-
// TODO: initial metadata
307317
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA
308-
ops[0].data.send_initial_metadata.count = 0u
318+
ops[0].data.send_initial_metadata.count = sendInitialMetadata.count
319+
ops[0].data.send_initial_metadata.metadata = sendInitialMetadata.metadata
309320

310-
val meta = arena.alloc<grpc_metadata_array>()
311-
// TODO: make metadata array an object (for lifecycle management)
312-
grpc_metadata_array_init(meta.ptr)
321+
val recvInitialMetadata = arena.alloc<grpc_metadata_array>()
322+
grpc_metadata_array_init(recvInitialMetadata.ptr)
313323
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA
314-
ops[1].data.recv_initial_metadata.recv_initial_metadata = meta.ptr
324+
ops[1].data.recv_initial_metadata.recv_initial_metadata = recvInitialMetadata.ptr
315325

316326
runBatch(ops, opsNum, cleanup = {
317-
grpc_metadata_array_destroy(meta.ptr)
327+
// we must not destroy the array itself, as it is cleared when clearing the arena.
328+
sendInitialMetadata.destroyEntries()
329+
// the entries are owned by the call object, so we must only destroy the array
330+
grpc_metadata_array_destroy(recvInitialMetadata.readValue())
318331
arena.clear()
319332
}) {
333+
val headers = GrpcMetadata(recvInitialMetadata)
320334
safeUserCode("Failed to call onHeaders.") {
321-
listener?.onHeaders(GrpcMetadata())
335+
listener?.onHeaders(headers)
322336
}
323337
}
324338
}
@@ -447,4 +461,3 @@ internal class NativeClientCall<Request, Response>(
447461
}
448462
}
449463

450-

0 commit comments

Comments
 (0)