Skip to content
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 54 additions & 27 deletions packages/grpc-js/src/server-interceptors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ function trace(text: string) {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}

export interface GrpcFrame {
infoBytes: Buffer;
payload: Buffer;
}

export interface ServerMetadataListener {
(metadata: Metadata, next: (metadata: Metadata) => void): void;
}
Expand Down Expand Up @@ -613,16 +618,15 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
* @param value
* @returns
*/
private serializeMessage(value: any) {
const messageBuffer = this.handler.serialize(value);
const byteLength = messageBuffer.byteLength;
const output = Buffer.allocUnsafe(byteLength + 5);
private serializeMessage(value: any): GrpcFrame {
const payload = this.handler.serialize(value);

const infoBytes = Buffer.allocUnsafe(5);
/* Note: response compression is currently not supported, so this
* compressed bit is always 0. */
output.writeUInt8(0, 0);
output.writeUInt32BE(byteLength, 1);
messageBuffer.copy(output, 5);
return output;
infoBytes.writeUInt8(0, 0);
infoBytes.writeUInt32BE(payload.byteLength, 1);
return { infoBytes, payload };
}

private decompressMessage(
Expand Down Expand Up @@ -740,47 +744,70 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
};
this.stream.respond(headers, defaultResponseOptions);
}
private createFrameWriteErrorCallback(
callback: () => void
): (err: unknown) => void {
let errored = false;
let written = 0;
return (error: unknown) => {
if (errored) {
return;
}
if (error) {
this.sendStatus({
code: Status.INTERNAL,
details: `Error writing message: ${getErrorMessage(error)}`,
metadata: null,
});
errored = true;
return;
}
written++;
if (written !== 2) {
return;
}
this.callEventTracker?.addMessageSent();
callback();
};
}
sendMessage(message: any, callback: () => void): void {
if (this.checkCancelled()) {
return;
}
let response: Buffer;
let frame: GrpcFrame;
try {
response = this.serializeMessage(message);
frame = this.serializeMessage(message);
} catch (e) {
this.sendStatus({
code: Status.INTERNAL,
details: `Error serializing response: ${getErrorMessage(e)}`,
metadata: null
metadata: null,
});
return;
}

if (
this.maxSendMessageSize !== -1 &&
response.length - 5 > this.maxSendMessageSize
frame.payload.length > this.maxSendMessageSize
) {
this.sendStatus({
code: Status.RESOURCE_EXHAUSTED,
details: `Sent message larger than max (${response.length} vs. ${this.maxSendMessageSize})`,
metadata: null
details: `Sent message larger than max (${frame.payload.length} vs. ${this.maxSendMessageSize})`,
metadata: null,
});
return;
}
this.maybeSendMetadata();
trace('Request to ' + this.handler.path + ' sent data frame of size ' + response.length);
this.stream.write(response, error => {
if (error) {
this.sendStatus({
code: Status.INTERNAL,
details: `Error writing message: ${getErrorMessage(error)}`,
metadata: null
});
return;
}
this.callEventTracker?.addMessageSent();
callback();
});
trace(
'Request to ' +
this.handler.path +
' sent data frame of size ' +
frame.payload.length
);

const cb = this.createFrameWriteErrorCallback(callback);
this.stream.write(frame.infoBytes, cb);
this.stream.write(frame.payload, cb);
}
sendStatus(status: PartialStatusObject): void {
if (this.checkCancelled()) {
Expand Down