Skip to content

Commit 3f527fb

Browse files
authored
Merge pull request #2675 from murgatroid99/grpc-js_connection_injection
grpc-js: Add Server#createConnectionInjector API
2 parents 9886ee2 + 321b660 commit 3f527fb

File tree

2 files changed

+138
-0
lines changed

2 files changed

+138
-0
lines changed

packages/grpc-js/src/server.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ import { ServerInterceptingCallInterface, ServerInterceptor, getServerIntercepti
7474
import { PartialStatusObject } from './call-interface';
7575
import { CallEventTracker } from './transport';
7676
import { Socket } from 'net';
77+
import { Duplex } from 'stream';
7778

7879
const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31);
7980
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
@@ -225,6 +226,12 @@ export interface ServerOptions extends ChannelOptions {
225226
interceptors?: ServerInterceptor[]
226227
}
227228

229+
export interface ConnectionInjector {
230+
injectConnection(connection: Duplex): void;
231+
drain(graceTimeMs: number): void;
232+
destroy(): void;
233+
}
234+
228235
export class Server {
229236
private boundPorts: Map<string, BoundPort>= new Map();
230237
private http2Servers: Map<AnyHttp2Server, Http2ServerInfo> = new Map();
@@ -808,6 +815,70 @@ export class Server {
808815
}
809816
}
810817

818+
private registerInjectorToChannelz() {
819+
return registerChannelzSocket(
820+
'injector',
821+
() => {
822+
return {
823+
localAddress: null,
824+
remoteAddress: null,
825+
security: null,
826+
remoteName: null,
827+
streamsStarted: 0,
828+
streamsSucceeded: 0,
829+
streamsFailed: 0,
830+
messagesSent: 0,
831+
messagesReceived: 0,
832+
keepAlivesSent: 0,
833+
lastLocalStreamCreatedTimestamp: null,
834+
lastRemoteStreamCreatedTimestamp: null,
835+
lastMessageSentTimestamp: null,
836+
lastMessageReceivedTimestamp: null,
837+
localFlowControlWindow: null,
838+
remoteFlowControlWindow: null,
839+
};
840+
},
841+
this.channelzEnabled
842+
);
843+
}
844+
845+
createConnectionInjector(credentials: ServerCredentials): ConnectionInjector {
846+
if (credentials === null || !(credentials instanceof ServerCredentials)) {
847+
throw new TypeError('creds must be a ServerCredentials object');
848+
}
849+
const server = this.createHttp2Server(credentials);
850+
const channelzRef = this.registerInjectorToChannelz();
851+
if (this.channelzEnabled) {
852+
this.listenerChildrenTracker.refChild(channelzRef);
853+
}
854+
const sessionsSet: Set<http2.ServerHttp2Session> = new Set();
855+
this.http2Servers.set(server, {
856+
channelzRef: channelzRef,
857+
sessions: sessionsSet
858+
});
859+
return {
860+
injectConnection: (connection: Duplex) => {
861+
server.emit('connection', connection);
862+
},
863+
drain: (graceTimeMs: number) => {
864+
for (const session of sessionsSet) {
865+
this.closeSession(session);
866+
}
867+
setTimeout(() => {
868+
for (const session of sessionsSet) {
869+
session.destroy(http2.constants.NGHTTP2_CANCEL as any);
870+
}
871+
}, graceTimeMs).unref?.();
872+
},
873+
destroy: () => {
874+
this.closeServer(server)
875+
for (const session of sessionsSet) {
876+
this.closeSession(session);
877+
}
878+
}
879+
};
880+
}
881+
811882
private closeServer(server: AnyHttp2Server, callback?: () => void) {
812883
this.trace('Closing server with address ' + JSON.stringify(server.address()));
813884
const serverInfo = this.http2Servers.get(server);

packages/grpc-js/test/test-server.ts

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import * as assert from 'assert';
2121
import * as fs from 'fs';
2222
import * as http2 from 'http2';
2323
import * as path from 'path';
24+
import * as net from 'net';
2425
import * as protoLoader from '@grpc/proto-loader';
2526

2627
import * as grpc from '../src';
@@ -905,6 +906,72 @@ describe('Echo service', () => {
905906
});
906907
});
907908

909+
describe('Connection injector', () => {
910+
let tcpServer: net.Server;
911+
let server: Server;
912+
let client: ServiceClient;
913+
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
914+
const echoService = loadProtoFile(protoFile)
915+
.EchoService as ServiceClientConstructor;
916+
917+
const serviceImplementation = {
918+
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
919+
callback(null, call.request);
920+
},
921+
echoBidiStream(call: ServerDuplexStream<any, any>) {
922+
call.on('data', data => {
923+
call.write(data);
924+
});
925+
call.on('end', () => {
926+
call.end();
927+
});
928+
},
929+
};
930+
931+
before(done => {
932+
server = new Server();
933+
const creds = ServerCredentials.createSsl(
934+
null,
935+
[{ private_key: key, cert_chain: cert }]
936+
);
937+
const connectionInjector = server.createConnectionInjector(creds);
938+
tcpServer = net.createServer(socket => {
939+
connectionInjector.injectConnection(socket);
940+
});
941+
server.addService(echoService.service, serviceImplementation);
942+
tcpServer.listen(0, 'localhost', () => {
943+
const port = (tcpServer.address() as net.AddressInfo).port;
944+
client = new echoService(
945+
`localhost:${port}`,
946+
grpc.credentials.createSsl(ca),
947+
{
948+
'grpc.ssl_target_name_override': 'foo.test.google.fr',
949+
'grpc.default_authority': 'foo.test.google.fr'
950+
}
951+
);
952+
done();
953+
});
954+
});
955+
956+
after(done => {
957+
client.close();
958+
tcpServer.close();
959+
server.tryShutdown(done);
960+
});
961+
962+
it('should respond to a request', done => {
963+
client.echo(
964+
{ value: 'test value', value2: 3 },
965+
(error: ServiceError, response: any) => {
966+
assert.ifError(error);
967+
assert.deepStrictEqual(response, { value: 'test value', value2: 3 });
968+
done();
969+
}
970+
);
971+
});
972+
973+
})
974+
908975
describe('Generic client and server', () => {
909976
function toString(val: any) {
910977
return val.toString();

0 commit comments

Comments
 (0)