Skip to content

Commit f04db5b

Browse files
authored
Merge pull request #2616 from murgatroid99/grpc-js_server_drain
grpc-js: Implement `Server#drain`
2 parents 58b13ac + 3a16187 commit f04db5b

File tree

2 files changed

+102
-0
lines changed

2 files changed

+102
-0
lines changed

packages/grpc-js/src/server.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -853,6 +853,50 @@ export class Server {
853853
}
854854
}
855855

856+
/**
857+
* Gracefully close all connections associated with a previously bound port.
858+
* After the grace time, forcefully close all remaining open connections.
859+
*
860+
* If port 0 was bound, only the actual bound port can be
861+
* drained. For example, if bindAsync was called with "localhost:0" and the
862+
* bound port result was 54321, it can be drained as "localhost:54321".
863+
* @param port
864+
* @param graceTimeMs
865+
* @returns
866+
*/
867+
drain(port: string, graceTimeMs: number): void {
868+
this.trace('drain port=' + port + ' graceTimeMs=' + graceTimeMs);
869+
const portUri = this.normalizePort(port);
870+
const splitPort = splitHostPort(portUri.path);
871+
if (splitPort?.port === 0) {
872+
throw new Error('Cannot drain port 0');
873+
}
874+
const boundPortObject = this.boundPorts.get(uriToString(portUri));
875+
if (!boundPortObject) {
876+
return;
877+
}
878+
const allSessions: Set<http2.Http2Session> = new Set();
879+
for (const http2Server of boundPortObject.listeningServers) {
880+
const serverEntry = this.http2Servers.get(http2Server);
881+
if (!serverEntry) {
882+
continue;
883+
}
884+
for (const session of serverEntry.sessions) {
885+
allSessions.add(session);
886+
this.closeSession(session, () => {
887+
allSessions.delete(session);
888+
});
889+
}
890+
}
891+
/* After the grace time ends, send another goaway to all remaining sessions
892+
* with the CANCEL code. */
893+
setTimeout(() => {
894+
for (const session of allSessions) {
895+
session.destroy(http2.constants.NGHTTP2_CANCEL as any);
896+
}
897+
}, graceTimeMs).unref?.();
898+
}
899+
856900
forceShutdown(): void {
857901
for (const boundPortObject of this.boundPorts.values()) {
858902
boundPortObject.cancelled = true;

packages/grpc-js/test/test-server.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,64 @@ describe('Server', () => {
228228
});
229229
});
230230

231+
describe.only('drain', () => {
232+
let client: ServiceClient;
233+
let portNumber: number;
234+
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
235+
const echoService = loadProtoFile(protoFile)
236+
.EchoService as ServiceClientConstructor;
237+
238+
const serviceImplementation = {
239+
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
240+
callback(null, call.request);
241+
},
242+
echoBidiStream(call: ServerDuplexStream<any, any>) {
243+
call.on('data', data => {
244+
call.write(data);
245+
});
246+
call.on('end', () => {
247+
call.end();
248+
});
249+
},
250+
};
251+
252+
beforeEach(done => {
253+
server.addService(echoService.service, serviceImplementation);
254+
255+
server.bindAsync(
256+
'localhost:0',
257+
ServerCredentials.createInsecure(),
258+
(err, port) => {
259+
assert.ifError(err);
260+
portNumber = port;
261+
client = new echoService(
262+
`localhost:${port}`,
263+
grpc.credentials.createInsecure()
264+
);
265+
server.start();
266+
done();
267+
}
268+
);
269+
});
270+
271+
afterEach(done => {
272+
client.close();
273+
server.tryShutdown(done);
274+
});
275+
276+
it('Should cancel open calls after the grace period ends', done => {
277+
const call = client.echoBidiStream();
278+
call.on('error', (error: ServiceError) => {
279+
assert.strictEqual(error.code, grpc.status.CANCELLED);
280+
done();
281+
});
282+
call.on('data', () => {
283+
server.drain(`localhost:${portNumber!}`, 100);
284+
});
285+
call.write({value: 'abc'});
286+
});
287+
});
288+
231289
describe('start', () => {
232290
let server: Server;
233291

0 commit comments

Comments
 (0)