Skip to content

Commit 565ea07

Browse files
authored
grpc-native: Add server implementation (#469)
* grpc-native: Add server support on native Signed-off-by: Johannes Zottele <[email protected]> * grpc-native: Working native server implementation Signed-off-by: Johannes Zottele <[email protected]> * grpc-native: Fixing memory leaks Signed-off-by: Johannes Zottele <[email protected]> * grpc-native: Test fix Signed-off-by: Johannes Zottele <[email protected]> * grpc-native: Fix halfClose before message received behavior Signed-off-by: Johannes Zottele <[email protected]> * grpc-native: Fix free on error Signed-off-by: Johannes Zottele <[email protected]> * grpc-native: Code cleanup Signed-off-by: Johannes Zottele <[email protected]> * grpc-native: Code cleanup Signed-off-by: Johannes Zottele <[email protected]> * grpc-native: Code cleanup Signed-off-by: Johannes Zottele <[email protected]> * grpc-native: Address PR comments Signed-off-by: Johannes Zottele <[email protected]> --------- Signed-off-by: Johannes Zottele <[email protected]>
1 parent c7f445b commit 565ea07

21 files changed

+1260
-86
lines changed

cinterop-c/MODULE.bazel.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cinterop-c/include/kgrpc.h

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,74 @@ typedef struct {
2424
} kgrpc_cb_tag;
2525

2626

27-
/*
28-
* Call to grpc_iomgr_run_in_background(), which is not exposed as extern "C" and therefore must be wrapped.
27+
// This is a duplicate of the RegisteredCallAllocation, which is defined in
28+
// https://github.com/grpc/grpc/blob/893bdadd56dbb75fb156175afdaa2b0d47e1c15b/src/core/server/server.h#L150-L157.
29+
// This is required, as RegisteredCallAllocation is not part of the exposed C API.
30+
typedef struct {
31+
void *tag;
32+
grpc_call **call;
33+
grpc_metadata_array *initial_metadata;
34+
gpr_timespec *deadline;
35+
grpc_byte_buffer **optional_payload;
36+
grpc_completion_queue *cq;
37+
} kgrpc_registered_call_allocation;
38+
39+
typedef struct {
40+
void* tag;
41+
grpc_call** call;
42+
grpc_metadata_array* initial_metadata;
43+
grpc_call_details* details;
44+
grpc_completion_queue* cq;
45+
} kgrpc_batch_call_allocation;
46+
47+
typedef kgrpc_registered_call_allocation (*kgrpc_registered_call_allocator)(void* ctx);
48+
typedef kgrpc_batch_call_allocation (*kgrpc_batch_call_allocator)(void* ctx);
49+
50+
/*
51+
* Call to grpc_iomgr_run_in_background(), which is not exposed as extern "C" and therefore must be wrapped.
52+
*/
53+
bool kgrpc_iomgr_run_in_background();
54+
55+
/**
56+
* Registers a C-style allocator callback for accepting gRPC calls to a specific method.
57+
*
58+
* Wraps the internal C++ API `Server::SetRegisteredMethodAllocator()` to enable
59+
* callback-driven method dispatch via the Core C API.
60+
* If the C++ API is exposed to the C API, this can be removed (https://github.com/grpc/grpc/issues/40632).
61+
*
62+
* When the gRPC Core needs to accept a new call for the specified method, it invokes:
63+
* kgrpc_registered_call_allocation alloc = allocator();
64+
* to retrieve the accept context, including `tag`, `grpc_call*`, metadata, deadline,
65+
* optional payload, and the completion queue.
66+
*
67+
* @param server The gRPC C `grpc_server*` instance.
68+
* @param cq A callback-style `grpc_completion_queue*` (must be registered earlier).
69+
* @param method_tag Opaque identifier from `grpc_server_register_method()` for the RPC method.
70+
* @param allocator_ctx The context for the callback to pass all necessary objects to the static function.
71+
* @param allocator Function providing new accept contexts (`kgrpc_registered_call_allocation`).
2972
*/
30-
bool kgrpc_iomgr_run_in_background();
73+
void kgrpc_server_set_register_method_allocator(
74+
grpc_server *server,
75+
grpc_completion_queue *cq,
76+
void *method_tag,
77+
void *allocator_ctx,
78+
kgrpc_registered_call_allocator allocator
79+
);
80+
81+
/**
82+
* Like kgrpc_server_set_register_method_allocator but instead of registered methods,
83+
* it sets an allocation callback for unknown method calls.
84+
*/
85+
void kgrpc_server_set_batch_method_allocator(
86+
grpc_server *server,
87+
grpc_completion_queue *cq,
88+
void *allocator_ctx,
89+
kgrpc_batch_call_allocator allocator
90+
);
91+
3192

3293
#ifdef __cplusplus
33-
}
94+
}
3495
#endif
3596

3697
#endif //GRPCPP_C_H

cinterop-c/src/kgrpc.cpp

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,56 @@
22

33
#include <kgrpc.h>
44
#include "src/core/lib/iomgr/iomgr.h"
5+
#include "src/core/server/server.h"
56

67
extern "C" {
78

8-
bool kgrpc_iomgr_run_in_background() {
9-
return grpc_iomgr_run_in_background();
10-
}
9+
bool kgrpc_iomgr_run_in_background() {
10+
return grpc_iomgr_run_in_background();
11+
}
12+
13+
void kgrpc_server_set_register_method_allocator(
14+
grpc_server *server,
15+
grpc_completion_queue *cq,
16+
void *method_tag,
17+
void *allocator_ctx,
18+
kgrpc_registered_call_allocator allocator
19+
) {
20+
grpc_core::Server::FromC(server)->SetRegisteredMethodAllocator(
21+
cq,
22+
method_tag,
23+
[allocator_ctx, allocator] {
24+
auto result = allocator(allocator_ctx);
25+
return grpc_core::Server::RegisteredCallAllocation{
26+
.tag = result.tag,
27+
.call = result.call,
28+
.initial_metadata = result.initial_metadata,
29+
.deadline = result.deadline,
30+
.optional_payload = result.optional_payload,
31+
.cq = result.cq,
32+
};
33+
});
34+
}
35+
36+
void kgrpc_server_set_batch_method_allocator(
37+
grpc_server *server,
38+
grpc_completion_queue *cq,
39+
void *allocator_ctx,
40+
kgrpc_batch_call_allocator allocator
41+
) {
42+
grpc_core::Server::FromC(server)->SetBatchMethodAllocator(
43+
cq,
44+
[allocator_ctx, allocator] {
45+
auto result = allocator(allocator_ctx);
46+
return grpc_core::Server::BatchCallAllocation{
47+
.tag = result.tag,
48+
.call = result.call,
49+
.initial_metadata = result.initial_metadata,
50+
.details = result.details,
51+
.cq = result.cq,
52+
};
53+
});
54+
}
1155

1256
}
1357

cinterop-c/tools/collect_headers.bzl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,8 @@ include_dir = rule(
8484

8585
def _cc_headers_only_impl(ctx):
8686
dep_cc = ctx.attr.dep[CcInfo].compilation_context
87-
88-
# keep only source headers; this skips generated headers and their actions.
89-
all_hdrs = dep_cc.headers.to_list()
90-
src_hdrs = [f for f in all_hdrs if getattr(f, "is_source", False)]
9187
cc_ctx = cc_common.create_compilation_context(
92-
headers = depset(src_hdrs),
88+
headers = dep_cc.headers,
9389
includes = dep_cc.includes,
9490
quote_includes = dep_cc.quote_includes,
9591
system_includes = dep_cc.system_includes,

grpc/grpc-core/build.gradle.kts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,18 @@ kotlin {
100100
extraOpts("-libraryPath", "$cLibOutDir")
101101
}
102102
}
103+
104+
// configures linkReleaseTest task to build and link the test binary in RELEASE mode.
105+
// this can be useful for performance analysis.
106+
targets.withType<org.jetbrains.kotlin.gradle.plugin.mpp.KotlinNativeTarget>().configureEach {
107+
binaries {
108+
test(
109+
buildTypes = listOf(
110+
org.jetbrains.kotlin.gradle.plugin.mpp.NativeBuildType.RELEASE
111+
)
112+
)
113+
}
114+
}
103115
}
104116

105117
configureLocalProtocGenDevelopmentDependency()

grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/ServerServiceDefinition.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ import kotlinx.rpc.internal.utils.InternalRpcApi
1616
public expect class ServerServiceDefinition {
1717
public fun getServiceDescriptor(): ServiceDescriptor
1818
public fun getMethods(): Collection<ServerMethodDefinition<*, *>>
19+
20+
public fun getMethod(methodName: String): ServerMethodDefinition<*, *>?
1921
}
2022

2123
@InternalRpcApi
2224
public expect fun serverServiceDefinition(
2325
serviceDescriptor: ServiceDescriptor,
24-
methods: Collection<ServerMethodDefinition<*, *>>
26+
methods: Collection<ServerMethodDefinition<*, *>>,
2527
): ServerServiceDefinition

grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,25 @@ import kotlinx.coroutines.delay
88
import kotlinx.coroutines.runBlocking
99
import kotlinx.coroutines.test.runTest
1010
import kotlinx.coroutines.withTimeout
11-
import kotlinx.rpc.grpc.*
12-
import kotlinx.rpc.grpc.internal.*
11+
import kotlinx.rpc.grpc.GrpcServer
12+
import kotlinx.rpc.grpc.GrpcTrailers
13+
import kotlinx.rpc.grpc.ManagedChannel
14+
import kotlinx.rpc.grpc.ManagedChannelBuilder
15+
import kotlinx.rpc.grpc.Status
16+
import kotlinx.rpc.grpc.StatusCode
17+
import kotlinx.rpc.grpc.buildChannel
18+
import kotlinx.rpc.grpc.internal.ClientCall
19+
import kotlinx.rpc.grpc.internal.GrpcDefaultCallOptions
20+
import kotlinx.rpc.grpc.internal.MethodDescriptor
21+
import kotlinx.rpc.grpc.internal.MethodType
22+
import kotlinx.rpc.grpc.internal.clientCallListener
23+
import kotlinx.rpc.grpc.internal.methodDescriptor
24+
import kotlinx.rpc.grpc.statusCode
1325
import kotlinx.rpc.registerService
1426
import kotlin.test.Test
1527
import kotlin.test.assertEquals
1628
import kotlin.test.assertFails
1729
import kotlin.test.assertFailsWith
18-
import kotlin.time.Duration
1930

2031
private const val PORT = 50051
2132

@@ -51,8 +62,12 @@ class GrpcCoreClientTest {
5162
this.timeout = timeout
5263
}
5364

54-
private fun shutdownAndWait(channel: ManagedChannel) {
55-
channel.shutdown()
65+
private fun shutdownAndWait(channel: ManagedChannel, now: Boolean = false) {
66+
if (now) {
67+
channel.shutdownNow()
68+
} else {
69+
channel.shutdown()
70+
}
5671
runBlocking { channel.awaitTermination() }
5772
}
5873

@@ -180,10 +195,7 @@ class GrpcCoreClientTest {
180195
fun halfCloseBeforeSendingMessage_errorWithoutCrashing() {
181196
val channel = createChannel()
182197
val call = channel.newHelloCall()
183-
val statusDeferred = CompletableDeferred<Status>()
184-
val listener = createClientCallListener<HelloReply>(
185-
onClose = { status, _ -> statusDeferred.complete(status) }
186-
)
198+
val listener = createClientCallListener<HelloReply>()
187199
assertFailsWith<IllegalStateException> {
188200
try {
189201
call.start(listener, GrpcTrailers())
@@ -259,7 +271,7 @@ class GreeterServiceImpl : GreeterService {
259271
* Run this on JVM before executing tests.
260272
*/
261273
@Test
262-
fun runServer() = runTest(timeout = Duration.INFINITE) {
274+
fun runServer() = runTest {
263275
val server = GrpcServer(
264276
port = PORT,
265277
builder = { registerService<GreeterService> { GreeterServiceImpl() } }
@@ -272,6 +284,7 @@ class GreeterServiceImpl : GreeterService {
272284
} finally {
273285
server.shutdown()
274286
server.awaitTermination()
287+
275288
}
276289
}
277290

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/ManagedChannel.native.kt

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@
66

77
package kotlinx.rpc.grpc
88

9-
import kotlinx.rpc.grpc.internal.*
9+
import kotlinx.rpc.grpc.internal.GrpcChannel
10+
import kotlinx.rpc.grpc.internal.GrpcChannelCredentials
11+
import kotlinx.rpc.grpc.internal.GrpcInsecureChannelCredentials
12+
import kotlinx.rpc.grpc.internal.NativeManagedChannel
13+
import kotlinx.rpc.grpc.internal.internalError
1014

1115
/**
1216
* Same as [ManagedChannel], but is platform-exposed.
@@ -25,10 +29,10 @@ public actual abstract class ManagedChannelBuilder<T : ManagedChannelBuilder<T>>
2529
internal class NativeManagedChannelBuilder(
2630
private val target: String,
2731
) : ManagedChannelBuilder<NativeManagedChannelBuilder>() {
28-
private var credentials: GrpcCredentials? = null
32+
private var credentials: GrpcChannelCredentials? = null
2933

3034
override fun usePlaintext(): NativeManagedChannelBuilder {
31-
credentials = GrpcInsecureCredentials()
35+
credentials = GrpcInsecureChannelCredentials()
3236
return this
3337
}
3438

grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/MutableHandlerRegistry.native.kt

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,66 @@
66

77
package kotlinx.rpc.grpc
88

9+
import kotlinx.rpc.grpc.internal.MethodDescriptor
10+
import kotlinx.rpc.grpc.internal.ServerMethodDefinition
11+
import kotlinx.rpc.internal.utils.map.RpcInternalConcurrentHashMap
12+
913
/**
1014
* Registry of services and their methods used by servers to dispatching incoming calls.
1115
*/
12-
public actual abstract class HandlerRegistry
16+
public actual abstract class HandlerRegistry {
17+
/**
18+
* Returns the [ServerServiceDefinition]s provided by the registry, or an empty list if not
19+
* supported by the implementation.
20+
*/
21+
public abstract fun getServices(): List<ServerServiceDefinition>
22+
23+
/**
24+
* Lookup a [ServerMethodDefinition] by its fully-qualified name.
25+
*
26+
* @param methodName to lookup [ServerMethodDefinition] for.
27+
* @param authority the authority for the desired method (to do virtual hosting). If `null`
28+
* the first matching method will be returned.
29+
* @return the resolved method or `null` if no method for that name exists.
30+
*/
31+
public abstract fun lookupMethod(
32+
methodName: String, authority: String?,
33+
): ServerMethodDefinition<*, *>?
34+
35+
/**
36+
* Lookup a [ServerMethodDefinition] by its fully-qualified name.
37+
*
38+
* @param methodName to lookup [ServerMethodDefinition] for.
39+
* @return the resolved method or `null` if no method for that name exists.
40+
*/
41+
public fun lookupMethod(methodName: String): ServerMethodDefinition<*, *>? {
42+
return lookupMethod(methodName, null)
43+
}
44+
45+
}
1346

1447
internal actual class MutableHandlerRegistry : HandlerRegistry() {
48+
49+
private val services = RpcInternalConcurrentHashMap<String, ServerServiceDefinition>()
50+
1551
actual fun addService(service: ServerServiceDefinition): ServerServiceDefinition? {
16-
error("Native target is not supported in gRPC")
52+
return services.put(service.getServiceDescriptor().getName(), service)
1753
}
1854

1955
actual fun removeService(service: ServerServiceDefinition): Boolean {
20-
error("Native target is not supported in gRPC")
56+
return services.remove(service.getServiceDescriptor().getName()) != null
57+
}
58+
59+
override fun getServices(): List<ServerServiceDefinition> {
60+
return services.values.toList()
61+
}
62+
63+
override fun lookupMethod(
64+
methodName: String,
65+
authority: String?,
66+
): ServerMethodDefinition<*, *>? {
67+
val serviceName = MethodDescriptor.extractFullServiceName(methodName) ?: return null
68+
val service = services[serviceName] ?: return null
69+
return service.getMethod(methodName)
2170
}
2271
}

0 commit comments

Comments
 (0)