@@ -58,6 +58,9 @@ internal class NativeServerCall<Request, Response>(
58
58
private var cancelled = false
59
59
private var finalized = atomic(false )
60
60
61
+ // Tracks whether at least one request message has been received on this call.
62
+ private var receivedFirstMessage = false
63
+
61
64
// we currently don't buffer messages, so after one `sendMessage` call, ready turns false. (KRPC-192)
62
65
private val ready = atomic(true )
63
66
@@ -184,8 +187,16 @@ internal class NativeServerCall<Request, Response>(
184
187
// and thus the client half-closed.
185
188
val buf = recvPtr.value
186
189
if (buf == null ) {
187
- callbackMutex.withLock {
188
- listener.onHalfClose()
190
+ // end-of-stream observed. for UNARY, absence of any request is a protocol violation.
191
+ if (methodDescriptor.type == MethodType .UNARY && ! receivedFirstMessage) {
192
+ cancelCall(
193
+ grpc_status_code.GRPC_STATUS_INTERNAL ,
194
+ " Unary call half-closed before receiving a request message"
195
+ )
196
+ } else {
197
+ callbackMutex.withLock {
198
+ listener.onHalfClose()
199
+ }
189
200
}
190
201
} else {
191
202
val msg = methodDescriptor.getRequestMarshaller()
@@ -194,6 +205,9 @@ internal class NativeServerCall<Request, Response>(
194
205
// destroy the buffer, we don't need it anymore
195
206
grpc_byte_buffer_destroy(buf)
196
207
208
+ // Mark that we have received at least one request message
209
+ receivedFirstMessage = true
210
+
197
211
callbackMutex.withLock {
198
212
listener.onMessage(msg)
199
213
}
0 commit comments