Skip to content

Commit c4c321d

Browse files
committed
grpc-js: Handle filters in ResolvingCall instead of LoadBalancingCall
1 parent aaa568f commit c4c321d

File tree

3 files changed

+83
-74
lines changed

3 files changed

+83
-74
lines changed

packages/grpc-js/src/internal-channel.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ export class InternalChannel {
402402
method +
403403
'"'
404404
);
405-
return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, this.filterStackFactory, callNumber);
405+
return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, callNumber);
406406
}
407407

408408
createInnerCall(

packages/grpc-js/src/load-balancing-call.ts

Lines changed: 13 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,11 @@ export interface StatusObjectWithProgress extends StatusObject {
4141
export class LoadBalancingCall implements Call {
4242
private child: SubchannelCall | null = null;
4343
private readPending = false;
44-
private writeFilterPending = false;
4544
private pendingMessage: {context: MessageContext, message: Buffer} | null = null;
4645
private pendingHalfClose = false;
47-
private readFilterPending = false;
4846
private pendingChildStatus: StatusObject | null = null;
4947
private ended = false;
5048
private serviceUrl: string;
51-
private filterStack: FilterStack;
5249
private metadata: Metadata | null = null;
5350
private listener: InterceptingListener | null = null;
5451
private onCallEnded: ((statusCode: Status) => void) | null = null;
@@ -59,11 +56,8 @@ export class LoadBalancingCall implements Call {
5956
private readonly host : string,
6057
private readonly credentials: CallCredentials,
6158
private readonly deadline: Deadline,
62-
filterStackFactory: FilterStackFactory,
6359
private readonly callNumber: number
6460
) {
65-
this.filterStack = filterStackFactory.createFilter();
66-
6761
const splitPath: string[] = this.methodName.split('/');
6862
let serviceName = '';
6963
/* The standard path format is "/{serviceName}/{methodName}", so if we split
@@ -90,8 +84,7 @@ export class LoadBalancingCall implements Call {
9084
if (!this.ended) {
9185
this.ended = true;
9286
this.trace('ended with status: code=' + status.code + ' details="' + status.details + '"');
93-
const filteredStatus = this.filterStack.receiveTrailers(status);
94-
const finalStatus = {...filteredStatus, progress};
87+
const finalStatus = {...status, progress};
9588
this.listener?.onReceiveStatus(finalStatus);
9689
this.onCallEnded?.(finalStatus.code);
9790
}
@@ -152,26 +145,13 @@ export class LoadBalancingCall implements Call {
152145
try {
153146
this.child = pickResult.subchannel!.getRealSubchannel().createCall(finalMetadata, this.host, this.methodName, {
154147
onReceiveMetadata: metadata => {
155-
this.listener!.onReceiveMetadata(this.filterStack.receiveMetadata(metadata));
148+
this.listener!.onReceiveMetadata(metadata);
156149
},
157150
onReceiveMessage: message => {
158-
this.readFilterPending = true;
159-
this.filterStack.receiveMessage(message).then(filteredMesssage => {
160-
this.readFilterPending = false;
161-
this.listener!.onReceiveMessage(filteredMesssage);
162-
if (this.pendingChildStatus) {
163-
this.outputStatus(this.pendingChildStatus, 'PROCESSED');
164-
}
165-
}, (status: StatusObject) => {
166-
this.cancelWithStatus(status.code, status.details);
167-
});
151+
this.listener!.onReceiveMessage(message);
168152
},
169153
onReceiveStatus: status => {
170-
if (this.readFilterPending) {
171-
this.pendingChildStatus = status;
172-
} else {
173-
this.outputStatus(status, 'PROCESSED');
174-
}
154+
this.outputStatus(status, 'PROCESSED');
175155
}
176156
});
177157
} catch (error) {
@@ -201,7 +181,7 @@ export class LoadBalancingCall implements Call {
201181
if (this.pendingMessage) {
202182
this.child.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
203183
}
204-
if (this.pendingHalfClose && !this.writeFilterPending) {
184+
if (this.pendingHalfClose) {
205185
this.child.halfClose();
206186
}
207187
}, (error: Error & { code: number }) => {
@@ -249,29 +229,16 @@ export class LoadBalancingCall implements Call {
249229
start(metadata: Metadata, listener: InterceptingListener): void {
250230
this.trace('start called');
251231
this.listener = listener;
252-
this.filterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => {
253-
this.metadata = filteredMetadata;
254-
this.doPick();
255-
}, (status: StatusObject) => {
256-
this.outputStatus(status, 'PROCESSED');
257-
});
232+
this.metadata = metadata;
233+
this.doPick();
258234
}
259235
sendMessageWithContext(context: MessageContext, message: Buffer): void {
260236
this.trace('write() called with message of length ' + message.length);
261-
this.writeFilterPending = true;
262-
this.filterStack.sendMessage(Promise.resolve({message: message, flags: context.flags})).then((filteredMessage) => {
263-
this.writeFilterPending = false;
264-
if (this.child) {
265-
this.child.sendMessageWithContext(context, filteredMessage.message);
266-
if (this.pendingHalfClose) {
267-
this.child.halfClose();
268-
}
269-
} else {
270-
this.pendingMessage = {context, message: filteredMessage.message};
271-
}
272-
}, (status: StatusObject) => {
273-
this.cancelWithStatus(status.code, status.details);
274-
})
237+
if (this.child) {
238+
this.child.sendMessageWithContext(context, message);
239+
} else {
240+
this.pendingMessage = {context, message};
241+
}
275242
}
276243
startRead(): void {
277244
this.trace('startRead called');
@@ -283,7 +250,7 @@ export class LoadBalancingCall implements Call {
283250
}
284251
halfClose(): void {
285252
this.trace('halfClose called');
286-
if (this.child && !this.writeFilterPending) {
253+
if (this.child) {
287254
this.child.halfClose();
288255
} else {
289256
this.pendingHalfClose = true;

packages/grpc-js/src/resolving-call.ts

Lines changed: 69 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import { CallCredentials } from "./call-credentials";
1919
import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from "./call-interface";
2020
import { LogVerbosity, Propagate, Status } from "./constants";
2121
import { Deadline, getDeadlineTimeoutString, getRelativeTimeout, minDeadline } from "./deadline";
22-
import { FilterStackFactory } from "./filter-stack";
22+
import { FilterStack, FilterStackFactory } from "./filter-stack";
2323
import { InternalChannel } from "./internal-channel";
2424
import { Metadata } from "./metadata";
2525
import * as logging from './logging';
@@ -33,12 +33,16 @@ export class ResolvingCall implements Call {
3333
private pendingMessage: {context: MessageContext, message: Buffer} | null = null;
3434
private pendingHalfClose = false;
3535
private ended = false;
36+
private readFilterPending = false;
37+
private writeFilterPending = false;
38+
private pendingChildStatus: StatusObject | null = null;
3639
private metadata: Metadata | null = null;
3740
private listener: InterceptingListener | null = null;
3841
private deadline: Deadline;
3942
private host: string;
4043
private statusWatchers: ((status: StatusObject) => void)[] = [];
4144
private deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0);
45+
private filterStack: FilterStack | null = null;
4246

4347
constructor(
4448
private readonly channel: InternalChannel,
@@ -96,14 +100,35 @@ export class ResolvingCall implements Call {
96100
private outputStatus(status: StatusObject) {
97101
if (!this.ended) {
98102
this.ended = true;
99-
this.trace('ended with status: code=' + status.code + ' details="' + status.details + '"');
100-
this.statusWatchers.forEach(watcher => watcher(status));
103+
if (!this.filterStack) {
104+
this.filterStack = this.filterStackFactory.createFilter();
105+
}
106+
const filteredStatus = this.filterStack.receiveTrailers(status);
107+
this.trace('ended with status: code=' + filteredStatus.code + ' details="' + filteredStatus.details + '"');
108+
this.statusWatchers.forEach(watcher => watcher(filteredStatus));
101109
process.nextTick(() => {
102-
this.listener?.onReceiveStatus(status);
110+
this.listener?.onReceiveStatus(filteredStatus);
103111
});
104112
}
105113
}
106114

115+
private sendMessageOnChild(context: MessageContext, message: Buffer): void {
116+
if (!this.child) {
117+
throw new Error('sendMessageonChild called with child not populated');
118+
}
119+
const child = this.child;
120+
this.writeFilterPending = true;
121+
this.filterStack!.sendMessage(Promise.resolve({message: message, flags: context.flags})).then((filteredMessage) => {
122+
this.writeFilterPending = false;
123+
child.sendMessageWithContext(context, filteredMessage.message);
124+
if (this.pendingHalfClose) {
125+
child.halfClose();
126+
}
127+
}, (status: StatusObject) => {
128+
this.cancelWithStatus(status.code, status.details);
129+
});
130+
}
131+
107132
getConfig(): void {
108133
if (this.ended) {
109134
return;
@@ -148,29 +173,46 @@ export class ResolvingCall implements Call {
148173
}
149174

150175
this.filterStackFactory.push(config.dynamicFilterFactories);
151-
152-
this.child = this.channel.createInnerCall(config, this.method, this.host, this.credentials, this.deadline);
153-
this.child.start(this.metadata, {
154-
onReceiveMetadata: metadata => {
155-
this.listener!.onReceiveMetadata(metadata);
156-
},
157-
onReceiveMessage: message => {
158-
this.listener!.onReceiveMessage(message);
159-
},
160-
onReceiveStatus: status => {
161-
this.outputStatus(status);
176+
this.filterStack = this.filterStackFactory.createFilter();
177+
this.filterStack.sendMetadata(Promise.resolve(this.metadata)).then(filteredMetadata => {
178+
this.child = this.channel.createInnerCall(config, this.method, this.host, this.credentials, this.deadline);
179+
this.child.start(filteredMetadata, {
180+
onReceiveMetadata: metadata => {
181+
this.listener!.onReceiveMetadata(this.filterStack!.receiveMetadata(metadata));
182+
},
183+
onReceiveMessage: message => {
184+
this.readFilterPending = true;
185+
this.filterStack!.receiveMessage(message).then(filteredMesssage => {
186+
this.readFilterPending = false;
187+
this.listener!.onReceiveMessage(filteredMesssage);
188+
if (this.pendingChildStatus) {
189+
this.outputStatus(this.pendingChildStatus);
190+
}
191+
}, (status: StatusObject) => {
192+
this.cancelWithStatus(status.code, status.details);
193+
});
194+
},
195+
onReceiveStatus: status => {
196+
if (this.readFilterPending) {
197+
this.pendingChildStatus = status;
198+
} else {
199+
this.outputStatus(status);
200+
}
201+
}
202+
});
203+
if (this.readPending) {
204+
this.child.startRead();
162205
}
163-
});
164-
if (this.readPending) {
165-
this.child.startRead();
166-
}
167-
if (this.pendingMessage) {
168-
this.child.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
169-
}
170-
if (this.pendingHalfClose) {
171-
this.child.halfClose();
172-
}
206+
if (this.pendingMessage) {
207+
this.sendMessageOnChild(this.pendingMessage.context, this.pendingMessage.message);
208+
} else if (this.pendingHalfClose) {
209+
this.child.halfClose();
210+
}
211+
}, (status: StatusObject) => {
212+
this.outputStatus(status);
213+
})
173214
}
215+
174216
reportResolverError(status: StatusObject) {
175217
if (this.metadata?.getOptions().waitForReady) {
176218
this.channel.queueCallForConfig(this);
@@ -195,7 +237,7 @@ export class ResolvingCall implements Call {
195237
sendMessageWithContext(context: MessageContext, message: Buffer): void {
196238
this.trace('write() called with message of length ' + message.length);
197239
if (this.child) {
198-
this.child.sendMessageWithContext(context, message);
240+
this.sendMessageOnChild(context, message);
199241
} else {
200242
this.pendingMessage = {context, message};
201243
}
@@ -210,7 +252,7 @@ export class ResolvingCall implements Call {
210252
}
211253
halfClose(): void {
212254
this.trace('halfClose called');
213-
if (this.child) {
255+
if (this.child && !this.writeFilterPending) {
214256
this.child.halfClose();
215257
} else {
216258
this.pendingHalfClose = true;

0 commit comments

Comments
 (0)