Skip to content

Commit 26c8c37

Browse files
committed
grpc-js: Handle filters in ResolvingCall instead of LoadBalancingCall
1 parent 29b58d0 commit 26c8c37

File tree

3 files changed

+83
-74
lines changed

3 files changed

+83
-74
lines changed

packages/grpc-js/src/internal-channel.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ export class InternalChannel {
402402
method +
403403
'"'
404404
);
405-
return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, this.filterStackFactory, callNumber);
405+
return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, callNumber);
406406
}
407407

408408
createInnerCall(

packages/grpc-js/src/load-balancing-call.ts

Lines changed: 13 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,11 @@ export interface StatusObjectWithProgress extends StatusObject {
4141
export class LoadBalancingCall implements Call {
4242
private child: SubchannelCall | null = null;
4343
private readPending = false;
44-
private writeFilterPending = false;
4544
private pendingMessage: {context: MessageContext, message: Buffer} | null = null;
4645
private pendingHalfClose = false;
47-
private readFilterPending = false;
4846
private pendingChildStatus: StatusObject | null = null;
4947
private ended = false;
5048
private serviceUrl: string;
51-
private filterStack: FilterStack;
5249
private metadata: Metadata | null = null;
5350
private listener: InterceptingListener | null = null;
5451
private onCallEnded: ((statusCode: Status) => void) | null = null;
@@ -59,11 +56,8 @@ export class LoadBalancingCall implements Call {
5956
private readonly host : string,
6057
private readonly credentials: CallCredentials,
6158
private readonly deadline: Deadline,
62-
filterStackFactory: FilterStackFactory,
6359
private readonly callNumber: number
6460
) {
65-
this.filterStack = filterStackFactory.createFilter();
66-
6761
const splitPath: string[] = this.methodName.split('/');
6862
let serviceName = '';
6963
/* The standard path format is "/{serviceName}/{methodName}", so if we split
@@ -90,8 +84,7 @@ export class LoadBalancingCall implements Call {
9084
if (!this.ended) {
9185
this.ended = true;
9286
this.trace('ended with status: code=' + status.code + ' details="' + status.details + '"');
93-
const filteredStatus = this.filterStack.receiveTrailers(status);
94-
const finalStatus = {...filteredStatus, progress};
87+
const finalStatus = {...status, progress};
9588
this.listener?.onReceiveStatus(finalStatus);
9689
this.onCallEnded?.(finalStatus.code);
9790
}
@@ -152,26 +145,13 @@ export class LoadBalancingCall implements Call {
152145
try {
153146
this.child = pickResult.subchannel!.getRealSubchannel().createCall(finalMetadata, this.host, this.methodName, {
154147
onReceiveMetadata: metadata => {
155-
this.listener!.onReceiveMetadata(this.filterStack.receiveMetadata(metadata));
148+
this.listener!.onReceiveMetadata(metadata);
156149
},
157150
onReceiveMessage: message => {
158-
this.readFilterPending = true;
159-
this.filterStack.receiveMessage(message).then(filteredMesssage => {
160-
this.readFilterPending = false;
161-
this.listener!.onReceiveMessage(filteredMesssage);
162-
if (this.pendingChildStatus) {
163-
this.outputStatus(this.pendingChildStatus, 'PROCESSED');
164-
}
165-
}, (status: StatusObject) => {
166-
this.cancelWithStatus(status.code, status.details);
167-
});
151+
this.listener!.onReceiveMessage(message);
168152
},
169153
onReceiveStatus: status => {
170-
if (this.readFilterPending) {
171-
this.pendingChildStatus = status;
172-
} else {
173-
this.outputStatus(status, 'PROCESSED');
174-
}
154+
this.outputStatus(status, 'PROCESSED');
175155
}
176156
});
177157
} catch (error) {
@@ -201,7 +181,7 @@ export class LoadBalancingCall implements Call {
201181
if (this.pendingMessage) {
202182
this.child.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
203183
}
204-
if (this.pendingHalfClose && !this.writeFilterPending) {
184+
if (this.pendingHalfClose) {
205185
this.child.halfClose();
206186
}
207187
}, (error: Error & { code: number }) => {
@@ -249,29 +229,16 @@ export class LoadBalancingCall implements Call {
249229
start(metadata: Metadata, listener: InterceptingListener): void {
250230
this.trace('start called');
251231
this.listener = listener;
252-
this.filterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => {
253-
this.metadata = filteredMetadata;
254-
this.doPick();
255-
}, (status: StatusObject) => {
256-
this.outputStatus(status, 'PROCESSED');
257-
});
232+
this.metadata = metadata;
233+
this.doPick();
258234
}
259235
sendMessageWithContext(context: MessageContext, message: Buffer): void {
260236
this.trace('write() called with message of length ' + message.length);
261-
this.writeFilterPending = true;
262-
this.filterStack.sendMessage(Promise.resolve({message: message, flags: context.flags})).then((filteredMessage) => {
263-
this.writeFilterPending = false;
264-
if (this.child) {
265-
this.child.sendMessageWithContext(context, filteredMessage.message);
266-
if (this.pendingHalfClose) {
267-
this.child.halfClose();
268-
}
269-
} else {
270-
this.pendingMessage = {context, message: filteredMessage.message};
271-
}
272-
}, (status: StatusObject) => {
273-
this.cancelWithStatus(status.code, status.details);
274-
})
237+
if (this.child) {
238+
this.child.sendMessageWithContext(context, message);
239+
} else {
240+
this.pendingMessage = {context, message};
241+
}
275242
}
276243
startRead(): void {
277244
this.trace('startRead called');
@@ -283,7 +250,7 @@ export class LoadBalancingCall implements Call {
283250
}
284251
halfClose(): void {
285252
this.trace('halfClose called');
286-
if (this.child && !this.writeFilterPending) {
253+
if (this.child) {
287254
this.child.halfClose();
288255
} else {
289256
this.pendingHalfClose = true;

packages/grpc-js/src/resolving-call.ts

Lines changed: 69 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import { CallCredentials } from "./call-credentials";
1919
import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from "./call-interface";
2020
import { LogVerbosity, Propagate, Status } from "./constants";
2121
import { Deadline, getDeadlineTimeoutString, getRelativeTimeout, minDeadline } from "./deadline";
22-
import { FilterStackFactory } from "./filter-stack";
22+
import { FilterStack, FilterStackFactory } from "./filter-stack";
2323
import { InternalChannel } from "./internal-channel";
2424
import { Metadata } from "./metadata";
2525
import * as logging from './logging';
@@ -33,12 +33,16 @@ export class ResolvingCall implements Call {
3333
private pendingMessage: {context: MessageContext, message: Buffer} | null = null;
3434
private pendingHalfClose = false;
3535
private ended = false;
36+
private readFilterPending = false;
37+
private writeFilterPending = false;
38+
private pendingChildStatus: StatusObject | null = null;
3639
private metadata: Metadata | null = null;
3740
private listener: InterceptingListener | null = null;
3841
private deadline: Deadline;
3942
private host: string;
4043
private statusWatchers: ((status: StatusObject) => void)[] = [];
4144
private deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0);
45+
private filterStack: FilterStack | null = null;
4246

4347
constructor(
4448
private readonly channel: InternalChannel,
@@ -96,14 +100,35 @@ export class ResolvingCall implements Call {
96100
private outputStatus(status: StatusObject) {
97101
if (!this.ended) {
98102
this.ended = true;
99-
this.trace('ended with status: code=' + status.code + ' details="' + status.details + '"');
100-
this.statusWatchers.forEach(watcher => watcher(status));
103+
if (!this.filterStack) {
104+
this.filterStack = this.filterStackFactory.createFilter();
105+
}
106+
const filteredStatus = this.filterStack.receiveTrailers(status);
107+
this.trace('ended with status: code=' + filteredStatus.code + ' details="' + filteredStatus.details + '"');
108+
this.statusWatchers.forEach(watcher => watcher(filteredStatus));
101109
process.nextTick(() => {
102-
this.listener?.onReceiveStatus(status);
110+
this.listener?.onReceiveStatus(filteredStatus);
103111
});
104112
}
105113
}
106114

115+
private sendMessageOnChild(context: MessageContext, message: Buffer): void {
116+
if (!this.child) {
117+
throw new Error('sendMessageonChild called with child not populated');
118+
}
119+
const child = this.child;
120+
this.writeFilterPending = true;
121+
this.filterStack!.sendMessage(Promise.resolve({message: message, flags: context.flags})).then((filteredMessage) => {
122+
this.writeFilterPending = false;
123+
child.sendMessageWithContext(context, filteredMessage.message);
124+
if (this.pendingHalfClose) {
125+
child.halfClose();
126+
}
127+
}, (status: StatusObject) => {
128+
this.cancelWithStatus(status.code, status.details);
129+
});
130+
}
131+
107132
getConfig(): void {
108133
if (this.ended) {
109134
return;
@@ -149,29 +174,46 @@ export class ResolvingCall implements Call {
149174
}
150175

151176
this.filterStackFactory.push(config.dynamicFilterFactories);
152-
153-
this.child = this.channel.createInnerCall(config, this.method, this.host, this.credentials, this.deadline);
154-
this.child.start(this.metadata, {
155-
onReceiveMetadata: metadata => {
156-
this.listener!.onReceiveMetadata(metadata);
157-
},
158-
onReceiveMessage: message => {
159-
this.listener!.onReceiveMessage(message);
160-
},
161-
onReceiveStatus: status => {
162-
this.outputStatus(status);
177+
this.filterStack = this.filterStackFactory.createFilter();
178+
this.filterStack.sendMetadata(Promise.resolve(this.metadata)).then(filteredMetadata => {
179+
this.child = this.channel.createInnerCall(config, this.method, this.host, this.credentials, this.deadline);
180+
this.child.start(filteredMetadata, {
181+
onReceiveMetadata: metadata => {
182+
this.listener!.onReceiveMetadata(this.filterStack!.receiveMetadata(metadata));
183+
},
184+
onReceiveMessage: message => {
185+
this.readFilterPending = true;
186+
this.filterStack!.receiveMessage(message).then(filteredMesssage => {
187+
this.readFilterPending = false;
188+
this.listener!.onReceiveMessage(filteredMesssage);
189+
if (this.pendingChildStatus) {
190+
this.outputStatus(this.pendingChildStatus);
191+
}
192+
}, (status: StatusObject) => {
193+
this.cancelWithStatus(status.code, status.details);
194+
});
195+
},
196+
onReceiveStatus: status => {
197+
if (this.readFilterPending) {
198+
this.pendingChildStatus = status;
199+
} else {
200+
this.outputStatus(status);
201+
}
202+
}
203+
});
204+
if (this.readPending) {
205+
this.child.startRead();
163206
}
164-
});
165-
if (this.readPending) {
166-
this.child.startRead();
167-
}
168-
if (this.pendingMessage) {
169-
this.child.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
170-
}
171-
if (this.pendingHalfClose) {
172-
this.child.halfClose();
173-
}
207+
if (this.pendingMessage) {
208+
this.sendMessageOnChild(this.pendingMessage.context, this.pendingMessage.message);
209+
} else if (this.pendingHalfClose) {
210+
this.child.halfClose();
211+
}
212+
}, (status: StatusObject) => {
213+
this.outputStatus(status);
214+
})
174215
}
216+
175217
reportResolverError(status: StatusObject) {
176218
if (this.metadata?.getOptions().waitForReady) {
177219
this.channel.queueCallForConfig(this);
@@ -196,7 +238,7 @@ export class ResolvingCall implements Call {
196238
sendMessageWithContext(context: MessageContext, message: Buffer): void {
197239
this.trace('write() called with message of length ' + message.length);
198240
if (this.child) {
199-
this.child.sendMessageWithContext(context, message);
241+
this.sendMessageOnChild(context, message);
200242
} else {
201243
this.pendingMessage = {context, message};
202244
}
@@ -211,7 +253,7 @@ export class ResolvingCall implements Call {
211253
}
212254
halfClose(): void {
213255
this.trace('halfClose called');
214-
if (this.child) {
256+
if (this.child && !this.writeFilterPending) {
215257
this.child.halfClose();
216258
} else {
217259
this.pendingHalfClose = true;

0 commit comments

Comments
 (0)