Skip to content

Commit 14b18a4

Browse files
author
Cedric Kassen
committed
promisify receiveUnaryMessage server-call
1 parent 1cc36e8 commit 14b18a4

File tree

2 files changed

+108
-119
lines changed

2 files changed

+108
-119
lines changed

packages/grpc-js/src/server-call.ts

Lines changed: 72 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -553,102 +553,100 @@ export class Http2ServerCallStream<
553553
return metadata;
554554
}
555555

556-
receiveUnaryMessage(
557-
encoding: string,
558-
next: (
559-
err: Partial<ServerStatusResponse> | null,
560-
request?: RequestType
561-
) => void
562-
): void {
563-
const { stream } = this;
556+
receiveUnaryMessage(encoding: string): Promise<RequestType | void> {
557+
return new Promise((resolve, reject) => {
558+
const { stream } = this;
559+
560+
let receivedLength = 0;
561+
562+
// eslint-disable-next-line @typescript-eslint/no-this-alias
563+
const call = this;
564+
const body: Buffer[] = [];
565+
const limit = this.maxReceiveMessageSize;
564566

565-
let receivedLength = 0;
567+
this.stream.on('data', onData);
568+
this.stream.on('end', onEnd);
569+
this.stream.on('error', onEnd);
566570

567-
// eslint-disable-next-line @typescript-eslint/no-this-alias
568-
const call = this;
569-
const body: Buffer[] = [];
570-
const limit = this.maxReceiveMessageSize;
571+
async function onData(chunk: Buffer) {
572+
receivedLength += chunk.byteLength;
571573

572-
stream.on('data', onData);
573-
stream.on('end', onEnd);
574-
stream.on('error', onEnd);
574+
if (limit !== -1 && receivedLength > limit) {
575+
stream.removeListener('data', onData);
576+
stream.removeListener('end', onEnd);
577+
stream.removeListener('error', onEnd);
575578

576-
function onData(chunk: Buffer) {
577-
receivedLength += chunk.byteLength;
579+
reject({
580+
code: Status.RESOURCE_EXHAUSTED,
581+
details: `Received message larger than max (${receivedLength} vs. ${limit})`,
582+
});
583+
return;
584+
}
585+
586+
body.push(chunk);
587+
}
578588

579-
if (limit !== -1 && receivedLength > limit) {
589+
async function onEnd(err?: Error) {
580590
stream.removeListener('data', onData);
581591
stream.removeListener('end', onEnd);
582592
stream.removeListener('error', onEnd);
583-
next({
584-
code: Status.RESOURCE_EXHAUSTED,
585-
details: `Received message larger than max (${receivedLength} vs. ${limit})`,
586-
});
587-
return;
588-
}
589-
590-
body.push(chunk);
591-
}
592593

593-
function onEnd(err?: Error) {
594-
stream.removeListener('data', onData);
595-
stream.removeListener('end', onEnd);
596-
stream.removeListener('error', onEnd);
594+
if (err !== undefined) {
595+
reject({ code: Status.INTERNAL, details: err.message });
596+
return;
597+
}
597598

598-
if (err !== undefined) {
599-
next({ code: Status.INTERNAL, details: err.message });
600-
return;
601-
}
599+
if (receivedLength === 0) {
600+
reject({
601+
code: Status.INTERNAL,
602+
details: 'received empty unary message',
603+
});
604+
return;
605+
}
602606

603-
if (receivedLength === 0) {
604-
next({
605-
code: Status.INTERNAL,
606-
details: 'received empty unary message',
607-
});
608-
return;
609-
}
607+
call.emit('receiveMessage');
610608

611-
call.emit('receiveMessage');
609+
const requestBytes = Buffer.concat(body, receivedLength);
610+
const compressed = requestBytes.readUInt8(0) === 1;
611+
const compressedMessageEncoding = compressed ? encoding : 'identity';
612+
const decompressedMessage = call.getDecompressedMessage(
613+
requestBytes,
614+
compressedMessageEncoding
615+
);
612616

613-
const requestBytes = Buffer.concat(body, receivedLength);
614-
const compressed = requestBytes.readUInt8(0) === 1;
615-
const compressedMessageEncoding = compressed ? encoding : 'identity';
616-
const decompressedMessage = call.getDecompressedMessage(
617-
requestBytes,
618-
compressedMessageEncoding
619-
);
617+
if (Buffer.isBuffer(decompressedMessage)) {
618+
call.safeDeserializeMessage(decompressedMessage, resolve, reject);
619+
return;
620+
}
620621

621-
if (Buffer.isBuffer(decompressedMessage)) {
622-
call.safeDeserializeMessage(decompressedMessage, next);
623-
return;
622+
decompressedMessage.then(
623+
decompressed =>
624+
call.safeDeserializeMessage(decompressed, resolve, reject),
625+
(err: any) =>
626+
reject(
627+
err.code
628+
? err
629+
: {
630+
code: Status.INTERNAL,
631+
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
632+
}
633+
)
634+
);
624635
}
625-
626-
decompressedMessage.then(
627-
decompressed => call.safeDeserializeMessage(decompressed, next),
628-
(err: any) =>
629-
next(
630-
err.code
631-
? err
632-
: {
633-
code: Status.INTERNAL,
634-
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
635-
}
636-
)
637-
);
638-
}
636+
});
639637
}
640638

641639
private safeDeserializeMessage(
642640
buffer: Buffer,
643-
next: (
644-
err: Partial<ServerStatusResponse> | null,
645-
request?: RequestType
646-
) => void
641+
resolve: (
642+
value: void | RequestType | PromiseLike<void | RequestType>
643+
) => void,
644+
reject: (reason: any) => void
647645
) {
648646
try {
649-
next(null, this.deserializeMessage(buffer));
647+
resolve(this.deserializeMessage(buffer));
650648
} catch (err) {
651-
next({
649+
reject({
652650
details: getErrorMessage(err),
653651
code: Status.INTERNAL,
654652
});

packages/grpc-js/src/server.ts

Lines changed: 36 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ function getUnimplementedStatusResponse(
9696
return {
9797
code: Status.UNIMPLEMENTED,
9898
details: `The server does not implement the method ${methodName}`,
99+
metadata: new Metadata(),
99100
};
100101
}
101102

@@ -1176,40 +1177,35 @@ export class Server {
11761177
}
11771178
}
11781179

1179-
function handleUnary<RequestType, ResponseType>(
1180+
async function handleUnary<RequestType, ResponseType>(
11801181
call: Http2ServerCallStream<RequestType, ResponseType>,
11811182
handler: UnaryHandler<RequestType, ResponseType>,
11821183
metadata: Metadata,
11831184
encoding: string
1184-
): void {
1185-
call.receiveUnaryMessage(encoding, (err, request) => {
1186-
if (err) {
1187-
call.sendError(err);
1188-
return;
1189-
}
1185+
): Promise<void> {
1186+
const request = await call.receiveUnaryMessage(encoding);
11901187

1191-
if (request === undefined || call.cancelled) {
1192-
return;
1193-
}
1188+
if (request === undefined || call.cancelled) {
1189+
return;
1190+
}
11941191

1195-
const emitter = new ServerUnaryCallImpl<RequestType, ResponseType>(
1196-
call,
1197-
metadata,
1198-
request
1199-
);
1192+
const emitter = new ServerUnaryCallImpl<RequestType, ResponseType>(
1193+
call,
1194+
metadata,
1195+
request
1196+
);
12001197

1201-
handler.func(
1202-
emitter,
1203-
(
1204-
err: ServerErrorResponse | ServerStatusResponse | null,
1205-
value?: ResponseType | null,
1206-
trailer?: Metadata,
1207-
flags?: number
1208-
) => {
1209-
call.sendUnaryMessage(err, value, trailer, flags);
1210-
}
1211-
);
1212-
});
1198+
handler.func(
1199+
emitter,
1200+
(
1201+
err: ServerErrorResponse | ServerStatusResponse | null,
1202+
value?: ResponseType | null,
1203+
trailer?: Metadata,
1204+
flags?: number
1205+
) => {
1206+
call.sendUnaryMessage(err, value, trailer, flags);
1207+
}
1208+
);
12131209
}
12141210

12151211
function handleClientStreaming<RequestType, ResponseType>(
@@ -1243,31 +1239,26 @@ function handleClientStreaming<RequestType, ResponseType>(
12431239
handler.func(stream, respond);
12441240
}
12451241

1246-
function handleServerStreaming<RequestType, ResponseType>(
1242+
async function handleServerStreaming<RequestType, ResponseType>(
12471243
call: Http2ServerCallStream<RequestType, ResponseType>,
12481244
handler: ServerStreamingHandler<RequestType, ResponseType>,
12491245
metadata: Metadata,
12501246
encoding: string
1251-
): void {
1252-
call.receiveUnaryMessage(encoding, (err, request) => {
1253-
if (err) {
1254-
call.sendError(err);
1255-
return;
1256-
}
1247+
): Promise<void> {
1248+
const request = await call.receiveUnaryMessage(encoding);
12571249

1258-
if (request === undefined || call.cancelled) {
1259-
return;
1260-
}
1250+
if (request === undefined || call.cancelled) {
1251+
return;
1252+
}
12611253

1262-
const stream = new ServerWritableStreamImpl<RequestType, ResponseType>(
1263-
call,
1264-
metadata,
1265-
handler.serialize,
1266-
request
1267-
);
1254+
const stream = new ServerWritableStreamImpl<RequestType, ResponseType>(
1255+
call,
1256+
metadata,
1257+
handler.serialize,
1258+
request
1259+
);
12681260

1269-
handler.func(stream);
1270-
});
1261+
handler.func(stream);
12711262
}
12721263

12731264
function handleBidiStreaming<RequestType, ResponseType>(

0 commit comments

Comments
 (0)