Skip to content

Commit d60c4ea

Browse files
committed
Add channelz tests and fix some bugs
1 parent acd9913 commit d60c4ea

File tree

9 files changed

+421
-43
lines changed

9 files changed

+421
-43
lines changed

packages/grpc-js/src/call-stream.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import { FilterStackFactory, FilterStack } from './filter-stack';
2525
import { Metadata } from './metadata';
2626
import { StreamDecoder } from './stream-decoder';
2727
import { ChannelImplementation } from './channel';
28-
import { Subchannel } from './subchannel';
28+
import { SubchannelCallStatsTracker, Subchannel } from './subchannel';
2929
import * as logging from './logging';
3030
import { LogVerbosity } from './constants';
3131
import { ServerSurfaceCall } from './server-call';
@@ -252,6 +252,8 @@ export class Http2CallStream implements Call {
252252
private statusWatchers: ((status: StatusObject) => void)[] = [];
253253
private streamEndWatchers: ((success: boolean) => void)[] = [];
254254

255+
private callStatsTracker: SubchannelCallStatsTracker | null = null;
256+
255257
constructor(
256258
private readonly methodName: string,
257259
private readonly channel: ChannelImplementation,
@@ -468,10 +470,16 @@ export class Http2CallStream implements Call {
468470
this.endCall(status);
469471
}
470472

473+
private writeMessageToStream(message: Buffer, callback: WriteCallback) {
474+
this.callStatsTracker?.addMessageSent();
475+
this.http2Stream!.write(message, callback);
476+
}
477+
471478
attachHttp2Stream(
472479
stream: http2.ClientHttp2Stream,
473480
subchannel: Subchannel,
474-
extraFilters: FilterFactory<Filter>[]
481+
extraFilters: FilterFactory<Filter>[],
482+
callStatsTracker: SubchannelCallStatsTracker
475483
): void {
476484
this.filterStack.push(
477485
extraFilters.map((filterFactory) => filterFactory.createFilter(this))
@@ -484,6 +492,7 @@ export class Http2CallStream implements Call {
484492
);
485493
this.http2Stream = stream;
486494
this.subchannel = subchannel;
495+
this.callStatsTracker = callStatsTracker;
487496
subchannel.addDisconnectListener(this.disconnectListener);
488497
subchannel.callRef();
489498
stream.on('response', (headers, flags) => {
@@ -549,6 +558,7 @@ export class Http2CallStream implements Call {
549558

550559
for (const message of messages) {
551560
this.trace('parsed message of length ' + message.length);
561+
this.callStatsTracker!.addMessageReceived();
552562
this.tryPush(message);
553563
}
554564
});
@@ -666,7 +676,7 @@ export class Http2CallStream implements Call {
666676
this.pendingWrite.length +
667677
' (deferred)'
668678
);
669-
stream.write(this.pendingWrite, this.pendingWriteCallback);
679+
this.writeMessageToStream(this.pendingWrite, this.pendingWriteCallback);
670680
}
671681
this.maybeCloseWrites();
672682
}
@@ -802,7 +812,7 @@ export class Http2CallStream implements Call {
802812
this.pendingWriteCallback = cb;
803813
} else {
804814
this.trace('sending data chunk of length ' + message.message.length);
805-
this.http2Stream.write(message.message, cb);
815+
this.writeMessageToStream(message.message, cb);
806816
this.maybeCloseWrites();
807817
}
808818
}, this.handleFilterError.bind(this));

packages/grpc-js/src/channel.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@ export interface Channel {
103103
deadline: Date | number,
104104
callback: (error?: Error) => void
105105
): void;
106+
/**
107+
* Get the channelz reference object for this channel. A request to the
108+
* channelz service for the id in this object will provide information
109+
* about this channel.
110+
*/
111+
getChannelzRef(): ChannelRef;
106112
/**
107113
* Create a call object. Call is an opaque type that is used by the Client
108114
* class. This function is called by the gRPC library when starting a
@@ -243,7 +249,7 @@ export class ChannelImplementation implements Channel {
243249
Object.assign({}, this.options, subchannelArgs),
244250
this.credentials
245251
);
246-
this.channelzTrace.addTrace('CT_INFO', 'Created or got existing subchannel', subchannel.getChannelzRef());
252+
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
247253
return subchannel;
248254
},
249255
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
@@ -677,6 +683,10 @@ export class ChannelImplementation implements Channel {
677683
this.connectivityStateWatchers.push(watcherObject);
678684
}
679685

686+
getChannelzRef() {
687+
return this.channelzRef;
688+
}
689+
680690
createCall(
681691
method: string,
682692
deadline: Deadline,

packages/grpc-js/src/channelz.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import { ChannelzDefinition, ChannelzHandlers } from "./generated/grpc/channelz/
5151
import { ProtoGrpcType as ChannelzProtoGrpcType } from "./generated/channelz";
5252
import type { loadSync } from '@grpc/proto-loader';
5353
import { registerAdminService } from "./admin";
54+
import { loadPackageDefinition } from "./make-client";
5455

5556
export type TraceSeverity = 'CT_UNKNOWN' | 'CT_INFO' | 'CT_WARNING' | 'CT_ERROR';
5657

@@ -455,9 +456,10 @@ function dateToProtoTimestamp(date?: Date | null): Timestamp | null {
455456
if (!date) {
456457
return null;
457458
}
459+
const millisSinceEpoch = date.getTime();
458460
return {
459-
seconds: date.getSeconds(),
460-
nanos: date.getMilliseconds() * 1_000_000
461+
seconds: (millisSinceEpoch / 1000) | 0,
462+
nanos: (millisSinceEpoch % 1000) * 1_000_000
461463
}
462464
}
463465

@@ -664,7 +666,10 @@ function GetServerSockets(call: ServerUnaryCall<GetServerSocketsRequest__Output,
664666
const startId = Number.parseInt(call.request.start_socket_id);
665667
const maxResults = Number.parseInt(call.request.max_results);
666668
const resolvedInfo = serverEntry.getInfo();
667-
const allSockets = resolvedInfo.listenerChildren.sockets.concat(resolvedInfo.sessionChildren.sockets).sort((ref1, ref2) => ref1.id - ref2.id);
669+
// If we wanted to include listener sockets in the result, this line would
670+
// instead say
671+
// const allSockets = resolvedInfo.listenerChildren.sockets.concat(resolvedInfo.sessionChildren.sockets).sort((ref1, ref2) => ref1.id - ref2.id);
672+
const allSockets = resolvedInfo.sessionChildren.sockets.sort((ref1, ref2) => ref1.id - ref2.id);
668673
const resultList: SocketRefMessage[] = [];
669674
let i = 0;
670675
for (; i < allSockets.length; i++) {
@@ -709,10 +714,11 @@ export function getChannelzServiceDefinition(): ChannelzDefinition {
709714
defaults: true,
710715
oneofs: true,
711716
includeDirs: [
712-
'../../proto'
717+
`${__dirname}/../../proto`
713718
]
714-
}) as unknown as ChannelzProtoGrpcType;
715-
loadedChannelzDefinition = loadedProto.grpc.channelz.v1.Channelz.service;
719+
});
720+
const channelzGrpcObject = loadPackageDefinition(loadedProto) as unknown as ChannelzProtoGrpcType;
721+
loadedChannelzDefinition = channelzGrpcObject.grpc.channelz.v1.Channelz.service;
716722
return loadedChannelzDefinition;
717723
}
718724

packages/grpc-js/src/server-call.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,6 @@ export class Http2ServerCallStream<
570570
const response = this.serializeMessage(value!);
571571

572572
this.write(response);
573-
this.emit('sendMessage');
574573
this.sendStatus({ code: Status.OK, details: 'OK', metadata });
575574
} catch (err) {
576575
err.code = Status.INTERNAL;

packages/grpc-js/src/server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ export class Server {
188188
const peerCertificate = tlsSocket.getPeerCertificate();
189189
tlsInfo = {
190190
cipherSuiteStandardName: cipherInfo.standardName ?? null,
191-
cipherSuiteOtherName: cipherInfo.standardName ? cipherInfo.name: null,
191+
cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
192192
localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null,
193193
remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null
194194
};
@@ -424,6 +424,7 @@ export class Server {
424424
remoteFlowControlWindow: null
425425
};
426426
});
427+
this.listenerChildrenTracker.refChild(channelzRef);
427428
this.http2ServerList.push({server: http2Server, channelzRef: channelzRef});
428429
trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress));
429430
resolve('port' in boundSubchannelAddress ? boundSubchannelAddress.port : portNum);
@@ -491,6 +492,7 @@ export class Server {
491492
remoteFlowControlWindow: null
492493
};
493494
});
495+
this.listenerChildrenTracker.refChild(channelzRef);
494496
this.http2ServerList.push({server: http2Server, channelzRef: channelzRef});
495497
trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress));
496498
resolve(
@@ -676,6 +678,10 @@ export class Server {
676678
throw new Error('Not yet implemented');
677679
}
678680

681+
getChannelzRef() {
682+
return this.channelzRef;
683+
}
684+
679685
private _setupHandlers(
680686
http2Server: http2.Http2Server | http2.Http2SecureServer
681687
): void {

packages/grpc-js/src/subchannel.ts

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ export type ConnectivityStateListener = (
6868
newState: ConnectivityState
6969
) => void;
7070

71+
export interface SubchannelCallStatsTracker {
72+
addMessageSent(): void;
73+
addMessageReceived(): void;
74+
}
75+
7176
const {
7277
HTTP2_HEADER_AUTHORITY,
7378
HTTP2_HEADER_CONTENT_TYPE,
@@ -178,33 +183,6 @@ export class Subchannel {
178183
private messagesReceived = 0;
179184
private lastMessageSentTimestamp: Date | null = null;
180185
private lastMessageReceivedTimestamp: Date | null = null;
181-
private MessageCountFilter = class extends BaseFilter implements Filter {
182-
private session: http2.ClientHttp2Session;
183-
constructor(private parent: Subchannel) {
184-
super();
185-
this.session = parent.session!;
186-
}
187-
sendMessage(message: Promise<WriteObject>): Promise<WriteObject> {
188-
if (this.parent.session === this.session) {
189-
this.parent.messagesSent += 1;
190-
this.parent.lastMessageSentTimestamp = new Date();
191-
}
192-
return message;
193-
}
194-
receiveMessage(message: Promise<Buffer>): Promise<Buffer> {
195-
if (this.parent.session === this.session) {
196-
this.parent.messagesReceived += 1;
197-
this.parent.lastMessageReceivedTimestamp = new Date();
198-
}
199-
return message;
200-
}
201-
};
202-
private MessageCountFilterFactory = class implements FilterFactory<Filter> {
203-
constructor(private parent: Subchannel) {}
204-
createFilter(callStream: Call): Filter {
205-
return new this.parent.MessageCountFilter(this.parent);
206-
}
207-
}
208186

209187
/**
210188
* A class representing a connection to a single backend.
@@ -848,8 +826,15 @@ export class Subchannel {
848826
}
849827
}
850828
});
851-
extraFilterFactories.push(new this.MessageCountFilterFactory(this));
852-
callStream.attachHttp2Stream(http2Stream, this, extraFilterFactories);
829+
callStream.attachHttp2Stream(http2Stream, this, extraFilterFactories, {
830+
addMessageSent: () => {
831+
this.messagesSent += 1;
832+
this.lastMessageSentTimestamp = new Date();
833+
},
834+
addMessageReceived: () => {
835+
this.messagesReceived += 1;
836+
}
837+
});
853838
}
854839

855840
/**

0 commit comments

Comments
 (0)