1
1
/*
2
2
* Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3
3
*/
4
- @file:OptIn( ExperimentalForeignApi :: class , ExperimentalStdlibApi :: class , ExperimentalNativeApi :: class )
4
+ package kotlinx.rpc.grpc.test
5
5
6
- package kotlinx.rpc.grpc.internal
7
-
8
- import HelloReply
9
- import HelloReplyInternal
10
- import HelloRequest
11
- import HelloRequestInternal
12
- import invoke
13
- import kotlinx.cinterop.ExperimentalForeignApi
14
6
import kotlinx.coroutines.CompletableDeferred
15
7
import kotlinx.coroutines.delay
16
8
import kotlinx.coroutines.runBlocking
9
+ import kotlinx.coroutines.test.runTest
17
10
import kotlinx.coroutines.withTimeout
18
11
import kotlinx.rpc.grpc.*
19
- import kotlin.experimental.ExperimentalNativeApi
12
+ import kotlinx.rpc.grpc.internal.*
13
+ import kotlinx.rpc.registerService
20
14
import kotlin.test.Test
21
15
import kotlin.test.assertEquals
16
+ import kotlin.test.assertFails
22
17
import kotlin.test.assertFailsWith
23
- import kotlin.test.assertTrue
18
+ import kotlin.time.Duration
24
19
20
+ private const val PORT = 50051
25
21
22
+ /* *
23
+ * Client tests that use lower level API directly to test that it behaves correctly.
24
+ * Before executing the tests run [GreeterServiceImpl.runServer] on JVM.
25
+ */
26
26
// TODO: Start external service server automatically (KRPC-208)
27
- class GrpcCoreTest {
27
+ class GrpcCoreClientTest {
28
28
29
- private fun descriptorFor (fullName : String = "helloworld. Greeter /SayHello "): MethodDescriptor <HelloRequest , HelloReply > =
29
+ private fun descriptorFor (fullName : String = "kotlinx.rpc.grpc.test. GreeterService /SayHello "): MethodDescriptor <HelloRequest , HelloReply > =
30
30
methodDescriptor(
31
31
fullMethodName = fullName,
32
32
requestCodec = HelloRequestInternal .CODEC ,
@@ -38,10 +38,10 @@ class GrpcCoreTest {
38
38
sampledToLocalTracing = true ,
39
39
)
40
40
41
- private fun ManagedChannel.newHelloCall (fullName : String = "helloworld. Greeter /SayHello "): ClientCall <HelloRequest , HelloReply > =
42
- platformApi.newCall(descriptorFor(fullName), GrpcCallOptions () )
41
+ private fun ManagedChannel.newHelloCall (fullName : String = "kotlinx.rpc.grpc.test. GreeterService /SayHello "): ClientCall <HelloRequest , HelloReply > =
42
+ platformApi.newCall(descriptorFor(fullName), GrpcDefaultCallOptions )
43
43
44
- private fun createChannel (): ManagedChannel = ManagedChannelBuilder (" localhost:50051 " )
44
+ private fun createChannel (): ManagedChannel = ManagedChannelBuilder (" localhost:$PORT " )
45
45
.usePlaintext()
46
46
.buildChannel()
47
47
@@ -64,15 +64,10 @@ class GrpcCoreTest {
64
64
65
65
val statusDeferred = CompletableDeferred <Status >()
66
66
val replyDeferred = CompletableDeferred <HelloReply >()
67
- val listener = object : ClientCall .Listener <HelloReply >() {
68
- override fun onMessage (message : HelloReply ) {
69
- replyDeferred.complete(message)
70
- }
71
-
72
- override fun onClose (status : Status , trailers : GrpcTrailers ) {
73
- statusDeferred.complete(status)
74
- }
75
- }
67
+ val listener = createClientCallListener<HelloReply >(
68
+ onMessage = { replyDeferred.complete(it) },
69
+ onClose = { status, _ -> statusDeferred.complete(status) }
70
+ )
76
71
77
72
call.start(listener, GrpcTrailers ())
78
73
call.sendMessage(req)
@@ -82,41 +77,22 @@ class GrpcCoreTest {
82
77
runBlocking {
83
78
withTimeout(10000 ) {
84
79
val status = statusDeferred.await()
85
- val reply = replyDeferred.await()
86
80
assertEquals(StatusCode .OK , status.statusCode)
81
+ val reply = replyDeferred.await()
87
82
assertEquals(" Hello world" , reply.message)
88
83
}
89
84
}
90
85
shutdownAndWait(channel)
91
86
}
92
87
93
- @Test
94
- fun sendMessage_beforeStart_throws () {
95
- val channel = createChannel()
96
- val call = channel.newHelloCall()
97
- val req = helloReq()
98
- assertFailsWith<IllegalStateException > { call.sendMessage(req) }
99
- shutdownAndWait(channel)
100
- }
101
-
102
- @Test
103
- fun request_beforeStart_throws () {
104
- val channel = createChannel()
105
- val call = channel.newHelloCall()
106
- assertFailsWith<IllegalStateException > { call.request(1 ) }
107
- shutdownAndWait(channel)
108
- }
109
-
110
88
@Test
111
89
fun start_twice_throws () {
112
90
val channel = createChannel()
113
91
val call = channel.newHelloCall()
114
92
val statusDeferred = CompletableDeferred <Status >()
115
- val listener = object : ClientCall .Listener <HelloReply >() {
116
- override fun onClose (status : Status , trailers : GrpcTrailers ) {
117
- statusDeferred.complete(status)
118
- }
119
- }
93
+ val listener = createClientCallListener<HelloReply >(
94
+ onClose = { status, _ -> statusDeferred.complete(status) }
95
+ )
120
96
call.start(listener, GrpcTrailers ())
121
97
assertFailsWith<IllegalStateException > { call.start(listener, GrpcTrailers ()) }
122
98
// cancel to finish the call quickly
@@ -131,11 +107,9 @@ class GrpcCoreTest {
131
107
val call = channel.newHelloCall()
132
108
val req = helloReq()
133
109
val statusDeferred = CompletableDeferred <Status >()
134
- val listener = object : ClientCall .Listener <HelloReply >() {
135
- override fun onClose (status : Status , trailers : GrpcTrailers ) {
136
- statusDeferred.complete(status)
137
- }
138
- }
110
+ val listener = createClientCallListener<HelloReply >(
111
+ onClose = { status, _ -> statusDeferred.complete(status) }
112
+ )
139
113
call.start(listener, GrpcTrailers ())
140
114
call.halfClose()
141
115
assertFailsWith<IllegalStateException > { call.sendMessage(req) }
@@ -146,17 +120,15 @@ class GrpcCoreTest {
146
120
}
147
121
148
122
@Test
149
- fun request_zero_throws () {
123
+ fun request_negative_throws () {
150
124
val channel = createChannel()
151
125
val call = channel.newHelloCall()
152
126
val statusDeferred = CompletableDeferred <Status >()
153
- val listener = object : ClientCall .Listener <HelloReply >() {
154
- override fun onClose (status : Status , trailers : GrpcTrailers ) {
155
- statusDeferred.complete(status)
156
- }
157
- }
127
+ val listener = createClientCallListener<HelloReply >(
128
+ onClose = { status, _ -> statusDeferred.complete(status) }
129
+ )
158
130
call.start(listener, GrpcTrailers ())
159
- assertFailsWith< IllegalStateException > { call.request(0 ) }
131
+ assertFails { call.request(- 1 ) }
160
132
call.cancel(" cleanup" , null )
161
133
runBlocking { withTimeout(5000 ) { statusDeferred.await() } }
162
134
shutdownAndWait(channel)
@@ -167,11 +139,9 @@ class GrpcCoreTest {
167
139
val channel = createChannel()
168
140
val call = channel.newHelloCall()
169
141
val statusDeferred = CompletableDeferred <Status >()
170
- val listener = object : ClientCall .Listener <HelloReply >() {
171
- override fun onClose (status : Status , trailers : GrpcTrailers ) {
172
- statusDeferred.complete(status)
173
- }
174
- }
142
+ val listener = createClientCallListener<HelloReply >(
143
+ onClose = { status, _ -> statusDeferred.complete(status) }
144
+ )
175
145
call.start(listener, GrpcTrailers ())
176
146
call.cancel(" user cancel" , null )
177
147
runBlocking {
@@ -186,13 +156,11 @@ class GrpcCoreTest {
186
156
@Test
187
157
fun invalid_method_returnsNonOkStatus () {
188
158
val channel = createChannel()
189
- val call = channel.newHelloCall(" /helloworld .Greeter/NoSuchMethod" )
159
+ val call = channel.newHelloCall(" kotlinx.rpc.grpc.test .Greeter/NoSuchMethod" )
190
160
val statusDeferred = CompletableDeferred <Status >()
191
- val listener = object : ClientCall .Listener <HelloReply >() {
192
- override fun onClose (status : Status , trailers : GrpcTrailers ) {
193
- statusDeferred.complete(status)
194
- }
195
- }
161
+ val listener = createClientCallListener<HelloReply >(
162
+ onClose = { status, _ -> statusDeferred.complete(status) }
163
+ )
196
164
197
165
call.start(listener, GrpcTrailers ())
198
166
call.sendMessage(helloReq())
@@ -201,7 +169,7 @@ class GrpcCoreTest {
201
169
runBlocking {
202
170
withTimeout(10000 ) {
203
171
val status = statusDeferred.await()
204
- assertTrue(status.statusCode != StatusCode . OK )
172
+ assertEquals( StatusCode . UNIMPLEMENTED , status.statusCode )
205
173
}
206
174
}
207
175
shutdownAndWait(channel)
@@ -213,11 +181,9 @@ class GrpcCoreTest {
213
181
val channel = createChannel()
214
182
val call = channel.newHelloCall()
215
183
val statusDeferred = CompletableDeferred <Status >()
216
- val listener = object : ClientCall .Listener <HelloReply >() {
217
- override fun onClose (status : Status , trailers : GrpcTrailers ) {
218
- statusDeferred.complete(status)
219
- }
220
- }
184
+ val listener = createClientCallListener<HelloReply >(
185
+ onClose = { status, _ -> statusDeferred.complete(status) }
186
+ )
221
187
assertFailsWith<IllegalStateException > {
222
188
try {
223
189
call.start(listener, GrpcTrailers ())
@@ -234,13 +200,12 @@ class GrpcCoreTest {
234
200
val channel = createChannel()
235
201
val call = channel.newHelloCall()
236
202
val statusDeferred = CompletableDeferred <Status >()
237
- val listener = object : ClientCall .Listener <HelloReply >() {
238
- override fun onClose (status : Status , trailers : GrpcTrailers ) {
239
- statusDeferred.complete(status)
240
- }
241
- }
203
+ val listener = createClientCallListener<HelloReply >(
204
+ onClose = { status, _ -> statusDeferred.complete(status) }
205
+ )
242
206
243
207
channel.shutdown()
208
+ runBlocking { delay(100 ) }
244
209
call.start(listener, GrpcTrailers ())
245
210
call.sendMessage(helloReq())
246
211
call.halfClose()
@@ -259,11 +224,9 @@ class GrpcCoreTest {
259
224
val channel = createChannel()
260
225
val call = channel.newHelloCall()
261
226
val statusDeferred = CompletableDeferred <Status >()
262
- val listener = object : ClientCall .Listener <HelloReply >() {
263
- override fun onClose (status : Status , trailers : GrpcTrailers ) {
264
- statusDeferred.complete(status)
265
- }
266
- }
227
+ val listener = createClientCallListener<HelloReply >(
228
+ onClose = { status, _ -> statusDeferred.complete(status) }
229
+ )
267
230
268
231
call.start(listener, GrpcTrailers ())
269
232
// set timeout on the server to 1000 ms, to simulate a long-running call
@@ -276,19 +239,53 @@ class GrpcCoreTest {
276
239
channel.shutdownNow()
277
240
withTimeout(10000 ) {
278
241
val status = statusDeferred.await()
279
- assertEquals(StatusCode .CANCELLED , status.statusCode)
242
+ assertEquals(StatusCode .UNAVAILABLE , status.statusCode)
280
243
}
281
244
}
282
245
}
246
+ }
247
+
248
+ class GreeterServiceImpl : GreeterService {
249
+
250
+ override suspend fun SayHello (message : HelloRequest ): HelloReply {
251
+ delay(message.timeout?.toLong() ? : 0 )
252
+ return HelloReply {
253
+ this .message = " Hello ${message.name} "
254
+ }
255
+ }
283
256
257
+
258
+ /* *
259
+ * Run this on JVM before executing tests.
260
+ */
284
261
@Test
285
- fun unaryCallTest () = runBlocking {
286
- val ch = createChannel()
287
- val desc = descriptorFor()
288
- val req = helloReq()
289
- repeat(1000 ) {
290
- val res = unaryRpc(ch.platformApi, desc, req)
291
- assertEquals(" Hello world" , res.message)
262
+ fun runServer () = runTest(timeout = Duration .INFINITE ) {
263
+ val server = GrpcServer (
264
+ port = PORT ,
265
+ builder = { registerService<GreeterService > { GreeterServiceImpl () } }
266
+ )
267
+
268
+ try {
269
+ server.start()
270
+ println (" Server started" )
271
+ server.awaitTermination()
272
+ } finally {
273
+ server.shutdown()
274
+ server.awaitTermination()
292
275
}
293
276
}
294
- }
277
+
278
+ }
279
+
280
+
281
+ private fun <T > createClientCallListener (
282
+ onHeaders : (headers: GrpcTrailers ) -> Unit = {},
283
+ onMessage : (message: T ) -> Unit = {},
284
+ onClose : (status: Status , trailers: GrpcTrailers ) -> Unit = { _, _ -> },
285
+ onReady : () -> Unit = {},
286
+ ) = clientCallListener(
287
+ onHeaders = onHeaders,
288
+ onMessage = onMessage,
289
+ onClose = onClose,
290
+ onReady = onReady,
291
+ )
0 commit comments