From da1ca343946f378e1e6a2722c1d6da6ef8c5e862 Mon Sep 17 00:00:00 2001 From: Sergey Saltykov Date: Sat, 3 Feb 2024 02:12:38 +0700 Subject: [PATCH 1/4] set additional allocated buffer len to 5 --- packages/grpc-js/src/server-interceptors.ts | 243 ++++++++++++++------ 1 file changed, 167 insertions(+), 76 deletions(-) diff --git a/packages/grpc-js/src/server-interceptors.ts b/packages/grpc-js/src/server-interceptors.ts index c03f3028c..75550e56b 100644 --- a/packages/grpc-js/src/server-interceptors.ts +++ b/packages/grpc-js/src/server-interceptors.ts @@ -15,19 +15,24 @@ * */ -import { PartialStatusObject} from "./call-interface"; -import { ServerMethodDefinition } from "./make-client"; -import { Metadata } from "./metadata"; -import { ChannelOptions } from "./channel-options"; -import { Handler, ServerErrorResponse } from "./server-call"; -import { Deadline } from "./deadline"; -import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, DEFAULT_MAX_SEND_MESSAGE_LENGTH, LogVerbosity, Status } from "./constants"; +import { PartialStatusObject } from './call-interface'; +import { ServerMethodDefinition } from './make-client'; +import { Metadata } from './metadata'; +import { ChannelOptions } from './channel-options'; +import { Handler, ServerErrorResponse } from './server-call'; +import { Deadline } from './deadline'; +import { + DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, + DEFAULT_MAX_SEND_MESSAGE_LENGTH, + LogVerbosity, + Status, +} from './constants'; import * as http2 from 'http2'; -import { getErrorMessage } from "./error"; +import { getErrorMessage } from './error'; import * as zlib from 'zlib'; -import { promisify } from "util"; -import { StreamDecoder } from "./stream-decoder"; -import { CallEventTracker } from "./transport"; +import { promisify } from 'util'; +import { StreamDecoder } from './stream-decoder'; +import { CallEventTracker } from './transport'; import * as logging from './logging'; const unzip = promisify(zlib.unzip); @@ -39,6 +44,11 @@ function trace(text: string) { logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); } +export interface GrpcFrame { + infoBytes: Buffer; + payload: Buffer; +} + export interface ServerMetadataListener { (metadata: Metadata, next: (metadata: Metadata) => void): void; } @@ -96,7 +106,7 @@ export class ServerListenerBuilder { onReceiveMetadata: this.metadata, onReceiveMessage: this.message, onReceiveHalfClose: this.halfClose, - onCancel: this.cancel + onCancel: this.cancel, }; } } @@ -109,8 +119,13 @@ export interface InterceptingServerListener { onCancel(): void; } -export function isInterceptingServerListener(listener: ServerListener | InterceptingServerListener): listener is InterceptingServerListener { - return listener.onReceiveMetadata !== undefined && listener.onReceiveMetadata.length === 1; +export function isInterceptingServerListener( + listener: ServerListener | InterceptingServerListener +): listener is InterceptingServerListener { + return ( + listener.onReceiveMetadata !== undefined && + listener.onReceiveMetadata.length === 1 + ); } class InterceptingServerListenerImpl implements InterceptingServerListener { @@ -124,7 +139,10 @@ class InterceptingServerListenerImpl implements InterceptingServerListener { private processingMessage: boolean = false; private hasPendingHalfClose: boolean = false; - constructor(private listener: FullServerListener, private nextListener: InterceptingServerListener) {} + constructor( + private listener: FullServerListener, + private nextListener: InterceptingServerListener + ) {} private processPendingMessage() { if (this.hasPendingMessage) { @@ -195,7 +213,6 @@ class InterceptingServerListenerImpl implements InterceptingServerListener { this.listener.onCancel(); this.nextListener.onCancel(); } - } export interface StartResponder { @@ -212,7 +229,10 @@ export interface MessageResponder { } export interface StatusResponder { - (status: PartialStatusObject, next: (status: PartialStatusObject) => void): void; + ( + status: PartialStatusObject, + next: (status: PartialStatusObject) => void + ): void; } export interface FullResponder { @@ -255,7 +275,7 @@ export class ResponderBuilder { start: this.start, sendMetadata: this.metadata, sendMessage: this.message, - sendStatus: this.status + sendStatus: this.status, }; } } @@ -270,11 +290,11 @@ const defaultServerListener: FullServerListener = { onReceiveHalfClose: next => { next(); }, - onCancel: () => {} + onCancel: () => {}, }; const defaultResponder: FullResponder = { - start: (next) => { + start: next => { next(); }, sendMetadata: (metadata, next) => { @@ -285,7 +305,7 @@ const defaultResponder: FullResponder = { }, sendStatus: (status, next) => { next(status); - } + }, }; export interface ServerInterceptingCallInterface { @@ -326,13 +346,19 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface { private pendingMessage: any = null; private pendingMessageCallback: (() => void) | null = null; private pendingStatus: PartialStatusObject | null = null; - constructor(private nextCall: ServerInterceptingCallInterface, responder?: Responder) { - this.responder = {...defaultResponder, ...responder}; + constructor( + private nextCall: ServerInterceptingCallInterface, + responder?: Responder + ) { + this.responder = { ...defaultResponder, ...responder }; } private processPendingMessage() { if (this.pendingMessageCallback) { - this.nextCall.sendMessage(this.pendingMessage, this.pendingMessageCallback); + this.nextCall.sendMessage( + this.pendingMessage, + this.pendingMessageCallback + ); this.pendingMessage = null; this.pendingMessageCallback = null; } @@ -347,8 +373,14 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface { start(listener: InterceptingServerListener): void { this.responder.start(interceptedListener => { - const fullInterceptedListener: FullServerListener = {...defaultServerListener, ...interceptedListener}; - const finalInterceptingListener = new InterceptingServerListenerImpl(fullInterceptedListener, listener); + const fullInterceptedListener: FullServerListener = { + ...defaultServerListener, + ...interceptedListener, + }; + const finalInterceptingListener = new InterceptingServerListenerImpl( + fullInterceptedListener, + listener + ); this.nextCall.start(finalInterceptingListener); }); } @@ -394,7 +426,10 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface { } export interface ServerInterceptor { - (methodDescriptor: ServerMethodDefinition, call: ServerInterceptingCallInterface): ServerInterceptingCall; + ( + methodDescriptor: ServerMethodDefinition, + call: ServerInterceptingCallInterface + ): ServerInterceptingCall; } interface DeadlineUnitIndexSignature { @@ -438,7 +473,9 @@ interface ReadQueueEntry { parsedMessage: any; } -export class BaseServerInterceptingCall implements ServerInterceptingCallInterface { +export class BaseServerInterceptingCall + implements ServerInterceptingCallInterface +{ private listener: InterceptingServerListener | null = null; private metadata: Metadata; private deadlineTimer: NodeJS.Timeout | null = null; @@ -485,7 +522,7 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa this.callEventTracker.onCallEnd({ code: Status.CANCELLED, details: 'Stream closed before sending status', - metadata: null + metadata: null, }); } @@ -548,7 +585,7 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa const status: PartialStatusObject = { code: Status.INTERNAL, details: `Invalid ${GRPC_TIMEOUT_HEADER} value "${timeoutHeader}"`, - metadata: null + metadata: null, }; // Wait for the constructor to complete before sending the error. process.nextTick(() => { @@ -565,11 +602,10 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa const status: PartialStatusObject = { code: Status.DEADLINE_EXCEEDED, details: 'Deadline exceeded', - metadata: null + metadata: null, }; this.sendStatus(status); }, timeout); - } private checkCancelled(): boolean { @@ -613,16 +649,15 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa * @param value * @returns */ - private serializeMessage(value: any) { - const messageBuffer = this.handler.serialize(value); - const byteLength = messageBuffer.byteLength; - const output = Buffer.allocUnsafe(byteLength + 5); + private serializeMessage(value: any): GrpcFrame { + const payload = this.handler.serialize(value); + + const infoBytes = Buffer.allocUnsafe(5); /* Note: response compression is currently not supported, so this * compressed bit is always 0. */ - output.writeUInt8(0, 0); - output.writeUInt32BE(byteLength, 1); - messageBuffer.copy(output, 5); - return output; + infoBytes.writeUInt8(0, 0); + infoBytes.writeUInt32BE(payload.byteLength, 1); + return { infoBytes, payload }; } private decompressMessage( @@ -650,14 +685,19 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa } const compressed = queueEntry.compressedMessage!.readUInt8(0) === 1; - const compressedMessageEncoding = compressed ? this.incomingEncoding : 'identity'; - const decompressedMessage = await this.decompressMessage(queueEntry.compressedMessage!, compressedMessageEncoding); + const compressedMessageEncoding = compressed + ? this.incomingEncoding + : 'identity'; + const decompressedMessage = await this.decompressMessage( + queueEntry.compressedMessage!, + compressedMessageEncoding + ); try { queueEntry.parsedMessage = this.handler.deserialize(decompressedMessage); } catch (err) { this.sendStatus({ code: Status.INTERNAL, - details: `Error deserializing request: ${(err as Error).message}` + details: `Error deserializing request: ${(err as Error).message}`, }); return; } @@ -666,7 +706,12 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa } private maybePushNextMessage() { - if (this.listener && this.isReadPending && this.readQueue.length > 0 && this.readQueue[0].type !== 'COMPRESSED') { + if ( + this.listener && + this.isReadPending && + this.readQueue.length > 0 && + this.readQueue[0].type !== 'COMPRESSED' + ) { this.isReadPending = false; const nextQueueEntry = this.readQueue.shift()!; if (nextQueueEntry.type === 'READABLE') { @@ -682,23 +727,33 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa if (this.checkCancelled()) { return; } - trace('Request to ' + this.handler.path + ' received data frame of size ' + data.length); + trace( + 'Request to ' + + this.handler.path + + ' received data frame of size ' + + data.length + ); const rawMessages = this.decoder.write(data); for (const messageBytes of rawMessages) { this.stream.pause(); - if (this.maxReceiveMessageSize !== -1 && messageBytes.length - 5 > this.maxReceiveMessageSize) { + if ( + this.maxReceiveMessageSize !== -1 && + messageBytes.length - 5 > this.maxReceiveMessageSize + ) { this.sendStatus({ code: Status.RESOURCE_EXHAUSTED, - details: `Received message larger than max (${messageBytes.length - 5} vs. ${this.maxReceiveMessageSize})`, - metadata: null + details: `Received message larger than max (${ + messageBytes.length - 5 + } vs. ${this.maxReceiveMessageSize})`, + metadata: null, }); return; } const queueEntry: ReadQueueEntry = { type: 'COMPRESSED', compressedMessage: messageBytes, - parsedMessage: null + parsedMessage: null, }; this.readQueue.push(queueEntry); this.decompressAndMaybePush(queueEntry); @@ -709,7 +764,7 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa this.readQueue.push({ type: 'HALF_CLOSE', compressedMessage: null, - parsedMessage: null + parsedMessage: null, }); this.receivedHalfClose = true; this.maybePushNextMessage(); @@ -740,47 +795,75 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa }; this.stream.respond(headers, defaultResponseOptions); } + private createFrameWriteErrorCallback( + callback: () => void + ): (err: unknown) => void { + let errored = false; + let written = 0; + return (error: unknown) => { + if (errored) { + return; + } + if (error) { + this.sendStatus({ + code: Status.INTERNAL, + details: `Error writing message: ${getErrorMessage(error)}`, + metadata: null, + }); + errored = true; + return; + } + written++; + if (written !== 2) { + return; + } + this.callEventTracker?.addMessageSent(); + callback(); + }; + } sendMessage(message: any, callback: () => void): void { if (this.checkCancelled()) { return; } - let response: Buffer; + let frame: GrpcFrame; try { - response = this.serializeMessage(message); + frame = this.serializeMessage(message); } catch (e) { this.sendStatus({ code: Status.INTERNAL, details: `Error serializing response: ${getErrorMessage(e)}`, - metadata: null + metadata: null, }); return; } if ( this.maxSendMessageSize !== -1 && - response.length - 5 > this.maxSendMessageSize + frame.payload.length > this.maxSendMessageSize ) { this.sendStatus({ code: Status.RESOURCE_EXHAUSTED, - details: `Sent message larger than max (${response.length} vs. ${this.maxSendMessageSize})`, - metadata: null + details: `Sent message larger than max (${frame.payload.length} vs. ${this.maxSendMessageSize})`, + metadata: null, }); return; } this.maybeSendMetadata(); - trace('Request to ' + this.handler.path + ' sent data frame of size ' + response.length); - this.stream.write(response, error => { - if (error) { - this.sendStatus({ - code: Status.INTERNAL, - details: `Error writing message: ${getErrorMessage(error)}`, - metadata: null - }); - return; - } - this.callEventTracker?.addMessageSent(); - callback(); - }); + trace( + 'Request to ' + + this.handler.path + + ' sent data frame of size ' + + frame.payload.length + ); + + this.stream.write( + frame.infoBytes, + this.createFrameWriteErrorCallback(callback) + ); + this.stream.write( + frame.payload, + this.createFrameWriteErrorCallback(callback) + ); } sendStatus(status: PartialStatusObject): void { if (this.checkCancelled()) { @@ -871,16 +954,24 @@ export function getServerInterceptingCall( handler: Handler, options: ChannelOptions ) { - const methodDefinition: ServerMethodDefinition = { path: handler.path, requestStream: handler.type === 'clientStream' || handler.type === 'bidi', responseStream: handler.type === 'serverStream' || handler.type === 'bidi', requestDeserialize: handler.deserialize, - responseSerialize: handler.serialize - } - const baseCall = new BaseServerInterceptingCall(stream, headers, callEventTracker, handler, options); - return interceptors.reduce((call: ServerInterceptingCallInterface, interceptor: ServerInterceptor) => { - return interceptor(methodDefinition, call); - }, baseCall); + responseSerialize: handler.serialize, + }; + const baseCall = new BaseServerInterceptingCall( + stream, + headers, + callEventTracker, + handler, + options + ); + return interceptors.reduce( + (call: ServerInterceptingCallInterface, interceptor: ServerInterceptor) => { + return interceptor(methodDefinition, call); + }, + baseCall + ); } From e40021533adf9c2d054d9184ac599601b3e08033 Mon Sep 17 00:00:00 2001 From: Sergey Saltykov Date: Sat, 3 Feb 2024 02:48:46 +0700 Subject: [PATCH 2/4] set additional allocated buffer len to 5 --- packages/grpc-js/src/server-interceptors.ts | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/packages/grpc-js/src/server-interceptors.ts b/packages/grpc-js/src/server-interceptors.ts index 75550e56b..a3bc213c1 100644 --- a/packages/grpc-js/src/server-interceptors.ts +++ b/packages/grpc-js/src/server-interceptors.ts @@ -856,14 +856,9 @@ export class BaseServerInterceptingCall frame.payload.length ); - this.stream.write( - frame.infoBytes, - this.createFrameWriteErrorCallback(callback) - ); - this.stream.write( - frame.payload, - this.createFrameWriteErrorCallback(callback) - ); + const cb = this.createFrameWriteErrorCallback(callback); + this.stream.write(frame.infoBytes, cb); + this.stream.write(frame.payload, cb); } sendStatus(status: PartialStatusObject): void { if (this.checkCancelled()) { From 5a191f36f9a8ad384b62a675691b4bef48241b57 Mon Sep 17 00:00:00 2001 From: Sergey Saltykov Date: Sat, 3 Feb 2024 03:42:09 +0700 Subject: [PATCH 3/4] revert formatting --- packages/grpc-js/src/server-interceptors.ts | 157 ++++++-------------- 1 file changed, 49 insertions(+), 108 deletions(-) diff --git a/packages/grpc-js/src/server-interceptors.ts b/packages/grpc-js/src/server-interceptors.ts index a3bc213c1..f6184a275 100644 --- a/packages/grpc-js/src/server-interceptors.ts +++ b/packages/grpc-js/src/server-interceptors.ts @@ -15,24 +15,19 @@ * */ -import { PartialStatusObject } from './call-interface'; -import { ServerMethodDefinition } from './make-client'; -import { Metadata } from './metadata'; -import { ChannelOptions } from './channel-options'; -import { Handler, ServerErrorResponse } from './server-call'; -import { Deadline } from './deadline'; -import { - DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, - DEFAULT_MAX_SEND_MESSAGE_LENGTH, - LogVerbosity, - Status, -} from './constants'; +import { PartialStatusObject} from "./call-interface"; +import { ServerMethodDefinition } from "./make-client"; +import { Metadata } from "./metadata"; +import { ChannelOptions } from "./channel-options"; +import { Handler, ServerErrorResponse } from "./server-call"; +import { Deadline } from "./deadline"; +import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, DEFAULT_MAX_SEND_MESSAGE_LENGTH, LogVerbosity, Status } from "./constants"; import * as http2 from 'http2'; -import { getErrorMessage } from './error'; +import { getErrorMessage } from "./error"; import * as zlib from 'zlib'; -import { promisify } from 'util'; -import { StreamDecoder } from './stream-decoder'; -import { CallEventTracker } from './transport'; +import { promisify } from "util"; +import { StreamDecoder } from "./stream-decoder"; +import { CallEventTracker } from "./transport"; import * as logging from './logging'; const unzip = promisify(zlib.unzip); @@ -106,7 +101,7 @@ export class ServerListenerBuilder { onReceiveMetadata: this.metadata, onReceiveMessage: this.message, onReceiveHalfClose: this.halfClose, - onCancel: this.cancel, + onCancel: this.cancel }; } } @@ -119,13 +114,8 @@ export interface InterceptingServerListener { onCancel(): void; } -export function isInterceptingServerListener( - listener: ServerListener | InterceptingServerListener -): listener is InterceptingServerListener { - return ( - listener.onReceiveMetadata !== undefined && - listener.onReceiveMetadata.length === 1 - ); +export function isInterceptingServerListener(listener: ServerListener | InterceptingServerListener): listener is InterceptingServerListener { + return listener.onReceiveMetadata !== undefined && listener.onReceiveMetadata.length === 1; } class InterceptingServerListenerImpl implements InterceptingServerListener { @@ -139,10 +129,7 @@ class InterceptingServerListenerImpl implements InterceptingServerListener { private processingMessage: boolean = false; private hasPendingHalfClose: boolean = false; - constructor( - private listener: FullServerListener, - private nextListener: InterceptingServerListener - ) {} + constructor(private listener: FullServerListener, private nextListener: InterceptingServerListener) {} private processPendingMessage() { if (this.hasPendingMessage) { @@ -213,6 +200,7 @@ class InterceptingServerListenerImpl implements InterceptingServerListener { this.listener.onCancel(); this.nextListener.onCancel(); } + } export interface StartResponder { @@ -229,10 +217,7 @@ export interface MessageResponder { } export interface StatusResponder { - ( - status: PartialStatusObject, - next: (status: PartialStatusObject) => void - ): void; + (status: PartialStatusObject, next: (status: PartialStatusObject) => void): void; } export interface FullResponder { @@ -275,7 +260,7 @@ export class ResponderBuilder { start: this.start, sendMetadata: this.metadata, sendMessage: this.message, - sendStatus: this.status, + sendStatus: this.status }; } } @@ -290,11 +275,11 @@ const defaultServerListener: FullServerListener = { onReceiveHalfClose: next => { next(); }, - onCancel: () => {}, + onCancel: () => {} }; const defaultResponder: FullResponder = { - start: next => { + start: (next) => { next(); }, sendMetadata: (metadata, next) => { @@ -305,7 +290,7 @@ const defaultResponder: FullResponder = { }, sendStatus: (status, next) => { next(status); - }, + } }; export interface ServerInterceptingCallInterface { @@ -346,19 +331,13 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface { private pendingMessage: any = null; private pendingMessageCallback: (() => void) | null = null; private pendingStatus: PartialStatusObject | null = null; - constructor( - private nextCall: ServerInterceptingCallInterface, - responder?: Responder - ) { - this.responder = { ...defaultResponder, ...responder }; + constructor(private nextCall: ServerInterceptingCallInterface, responder?: Responder) { + this.responder = {...defaultResponder, ...responder}; } private processPendingMessage() { if (this.pendingMessageCallback) { - this.nextCall.sendMessage( - this.pendingMessage, - this.pendingMessageCallback - ); + this.nextCall.sendMessage(this.pendingMessage, this.pendingMessageCallback); this.pendingMessage = null; this.pendingMessageCallback = null; } @@ -373,14 +352,8 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface { start(listener: InterceptingServerListener): void { this.responder.start(interceptedListener => { - const fullInterceptedListener: FullServerListener = { - ...defaultServerListener, - ...interceptedListener, - }; - const finalInterceptingListener = new InterceptingServerListenerImpl( - fullInterceptedListener, - listener - ); + const fullInterceptedListener: FullServerListener = {...defaultServerListener, ...interceptedListener}; + const finalInterceptingListener = new InterceptingServerListenerImpl(fullInterceptedListener, listener); this.nextCall.start(finalInterceptingListener); }); } @@ -426,10 +399,7 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface { } export interface ServerInterceptor { - ( - methodDescriptor: ServerMethodDefinition, - call: ServerInterceptingCallInterface - ): ServerInterceptingCall; + (methodDescriptor: ServerMethodDefinition, call: ServerInterceptingCallInterface): ServerInterceptingCall; } interface DeadlineUnitIndexSignature { @@ -473,9 +443,7 @@ interface ReadQueueEntry { parsedMessage: any; } -export class BaseServerInterceptingCall - implements ServerInterceptingCallInterface -{ +export class BaseServerInterceptingCall implements ServerInterceptingCallInterface { private listener: InterceptingServerListener | null = null; private metadata: Metadata; private deadlineTimer: NodeJS.Timeout | null = null; @@ -522,7 +490,7 @@ export class BaseServerInterceptingCall this.callEventTracker.onCallEnd({ code: Status.CANCELLED, details: 'Stream closed before sending status', - metadata: null, + metadata: null }); } @@ -585,7 +553,7 @@ export class BaseServerInterceptingCall const status: PartialStatusObject = { code: Status.INTERNAL, details: `Invalid ${GRPC_TIMEOUT_HEADER} value "${timeoutHeader}"`, - metadata: null, + metadata: null }; // Wait for the constructor to complete before sending the error. process.nextTick(() => { @@ -602,10 +570,11 @@ export class BaseServerInterceptingCall const status: PartialStatusObject = { code: Status.DEADLINE_EXCEEDED, details: 'Deadline exceeded', - metadata: null, + metadata: null }; this.sendStatus(status); }, timeout); + } private checkCancelled(): boolean { @@ -685,19 +654,14 @@ export class BaseServerInterceptingCall } const compressed = queueEntry.compressedMessage!.readUInt8(0) === 1; - const compressedMessageEncoding = compressed - ? this.incomingEncoding - : 'identity'; - const decompressedMessage = await this.decompressMessage( - queueEntry.compressedMessage!, - compressedMessageEncoding - ); + const compressedMessageEncoding = compressed ? this.incomingEncoding : 'identity'; + const decompressedMessage = await this.decompressMessage(queueEntry.compressedMessage!, compressedMessageEncoding); try { queueEntry.parsedMessage = this.handler.deserialize(decompressedMessage); } catch (err) { this.sendStatus({ code: Status.INTERNAL, - details: `Error deserializing request: ${(err as Error).message}`, + details: `Error deserializing request: ${(err as Error).message}` }); return; } @@ -706,12 +670,7 @@ export class BaseServerInterceptingCall } private maybePushNextMessage() { - if ( - this.listener && - this.isReadPending && - this.readQueue.length > 0 && - this.readQueue[0].type !== 'COMPRESSED' - ) { + if (this.listener && this.isReadPending && this.readQueue.length > 0 && this.readQueue[0].type !== 'COMPRESSED') { this.isReadPending = false; const nextQueueEntry = this.readQueue.shift()!; if (nextQueueEntry.type === 'READABLE') { @@ -727,33 +686,23 @@ export class BaseServerInterceptingCall if (this.checkCancelled()) { return; } - trace( - 'Request to ' + - this.handler.path + - ' received data frame of size ' + - data.length - ); + trace('Request to ' + this.handler.path + ' received data frame of size ' + data.length); const rawMessages = this.decoder.write(data); for (const messageBytes of rawMessages) { this.stream.pause(); - if ( - this.maxReceiveMessageSize !== -1 && - messageBytes.length - 5 > this.maxReceiveMessageSize - ) { + if (this.maxReceiveMessageSize !== -1 && messageBytes.length - 5 > this.maxReceiveMessageSize) { this.sendStatus({ code: Status.RESOURCE_EXHAUSTED, - details: `Received message larger than max (${ - messageBytes.length - 5 - } vs. ${this.maxReceiveMessageSize})`, - metadata: null, + details: `Received message larger than max (${messageBytes.length - 5} vs. ${this.maxReceiveMessageSize})`, + metadata: null }); return; } const queueEntry: ReadQueueEntry = { type: 'COMPRESSED', compressedMessage: messageBytes, - parsedMessage: null, + parsedMessage: null }; this.readQueue.push(queueEntry); this.decompressAndMaybePush(queueEntry); @@ -764,7 +713,7 @@ export class BaseServerInterceptingCall this.readQueue.push({ type: 'HALF_CLOSE', compressedMessage: null, - parsedMessage: null, + parsedMessage: null }); this.receivedHalfClose = true; this.maybePushNextMessage(); @@ -949,24 +898,16 @@ export function getServerInterceptingCall( handler: Handler, options: ChannelOptions ) { + const methodDefinition: ServerMethodDefinition = { path: handler.path, requestStream: handler.type === 'clientStream' || handler.type === 'bidi', responseStream: handler.type === 'serverStream' || handler.type === 'bidi', requestDeserialize: handler.deserialize, - responseSerialize: handler.serialize, - }; - const baseCall = new BaseServerInterceptingCall( - stream, - headers, - callEventTracker, - handler, - options - ); - return interceptors.reduce( - (call: ServerInterceptingCallInterface, interceptor: ServerInterceptor) => { - return interceptor(methodDefinition, call); - }, - baseCall - ); + responseSerialize: handler.serialize + } + const baseCall = new BaseServerInterceptingCall(stream, headers, callEventTracker, handler, options); + return interceptors.reduce((call: ServerInterceptingCallInterface, interceptor: ServerInterceptor) => { + return interceptor(methodDefinition, call); + }, baseCall); } From 4a8c4324859b400e45cd7665e443478bad2ca125 Mon Sep 17 00:00:00 2001 From: Sergey Saltykov Date: Sat, 3 Feb 2024 08:27:19 +0700 Subject: [PATCH 4/4] remove useless interface export --- packages/grpc-js/src/server-interceptors.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js/src/server-interceptors.ts b/packages/grpc-js/src/server-interceptors.ts index f6184a275..405f526bf 100644 --- a/packages/grpc-js/src/server-interceptors.ts +++ b/packages/grpc-js/src/server-interceptors.ts @@ -39,7 +39,7 @@ function trace(text: string) { logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); } -export interface GrpcFrame { +interface GrpcFrame { infoBytes: Buffer; payload: Buffer; }