Skip to content

Commit 9066afb

Browse files
authored
Add grpc metrics (#37)
1 parent 3c33589 commit 9066afb

File tree

4 files changed

+418
-3
lines changed

4 files changed

+418
-3
lines changed

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@
181181
"yazl": "^2.5.1",
182182
"@grpc/grpc-js": "^1.8.8",
183183
"long":"^5.2.1",
184-
"protobufjs": "^7.2.2"
184+
"protobufjs": "^7.2.2",
185+
"prom-client": "^14.1.1"
185186
}
186187
}

src/metrics.ts

Lines changed: 377 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,377 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) Gitpod. All rights reserved.
3+
* Licensed under the MIT License. See License.txt in the project root for license information.
4+
*--------------------------------------------------------------------------------------------*/
5+
6+
import * as vscode from 'vscode';
7+
import * as grpc from '@grpc/grpc-js';
8+
import { Registry, Counter, Histogram, metric } from 'prom-client';
9+
import { Code, connectErrorFromReason, Interceptor, StreamRequest, UnaryRequest } from '@bufbuild/connect-node';
10+
import { MethodKind } from '@bufbuild/protobuf';
11+
import { codeToString, StreamResponse, UnaryResponse } from '@bufbuild/connect-core';
12+
13+
export type GrpcMethodType = 'unary' | 'client_stream' | 'server_stream' | 'bidi_stream';
14+
15+
export interface IGrpcCallMetricsLabels {
16+
service: string;
17+
method: string;
18+
type: GrpcMethodType;
19+
}
20+
21+
export interface IGrpcCallMetricsLabelsWithCode extends IGrpcCallMetricsLabels {
22+
code: string;
23+
}
24+
25+
// Dedicated prom-client registry; every metric defined below registers into it,
// and MetricsReporter drains it (getMetricsAsJSON + resetMetrics) on each report.
const register = new Registry();
26+
27+
class PrometheusClientCallMetrics {
28+
readonly startedCounter: Counter<string>;
29+
readonly sentCounter: Counter<string>;
30+
readonly receivedCounter: Counter<string>;
31+
readonly handledCounter: Counter<string>;
32+
readonly handledSecondsHistogram: Histogram<string>;
33+
34+
constructor() {
35+
this.startedCounter = new Counter({
36+
name: 'grpc_client_started_total',
37+
help: 'Total number of RPCs started on the client.',
38+
labelNames: ['grpc_service', 'grpc_method', 'grpc_type'],
39+
registers: [register],
40+
});
41+
this.sentCounter = new Counter({
42+
name: 'grpc_client_msg_sent_total',
43+
help: ' Total number of gRPC stream messages sent by the client.',
44+
labelNames: ['grpc_service', 'grpc_method', 'grpc_type'],
45+
registers: [register],
46+
});
47+
this.receivedCounter = new Counter({
48+
name: 'grpc_client_msg_received_total',
49+
help: 'Total number of RPC stream messages received by the client.',
50+
labelNames: ['grpc_service', 'grpc_method', 'grpc_type'],
51+
registers: [register],
52+
});
53+
this.handledCounter = new Counter({
54+
name: 'grpc_client_handled_total',
55+
help: 'Total number of RPCs completed by the client, regardless of success or failure.',
56+
labelNames: ['grpc_service', 'grpc_method', 'grpc_type', 'grpc_code'],
57+
registers: [register],
58+
});
59+
this.handledSecondsHistogram = new Histogram({
60+
name: 'grpc_client_handling_seconds',
61+
help: 'Histogram of response latency (seconds) of the gRPC until it is finished by the application.',
62+
labelNames: ['grpc_service', 'grpc_method', 'grpc_type', 'grpc_code'],
63+
buckets: [0.1, 0.2, 0.5, 1, 2, 5, 10], // it should be aligned with https://github.com/gitpod-io/gitpod/blob/84ed1a0672d91446ba33cb7b504cfada769271a8/install/installer/pkg/components/ide-metrics/configmap.go#L315
64+
registers: [register],
65+
});
66+
}
67+
68+
started(labels: IGrpcCallMetricsLabels): void {
69+
this.startedCounter.inc({
70+
grpc_service: labels.service,
71+
grpc_method: labels.method,
72+
grpc_type: labels.type,
73+
});
74+
}
75+
76+
sent(labels: IGrpcCallMetricsLabels): void {
77+
this.sentCounter.inc({
78+
grpc_service: labels.service,
79+
grpc_method: labels.method,
80+
grpc_type: labels.type,
81+
});
82+
}
83+
84+
received(labels: IGrpcCallMetricsLabels): void {
85+
this.receivedCounter.inc({
86+
grpc_service: labels.service,
87+
grpc_method: labels.method,
88+
grpc_type: labels.type,
89+
});
90+
}
91+
92+
handled(labels: IGrpcCallMetricsLabelsWithCode): void {
93+
this.handledCounter.inc({
94+
grpc_service: labels.service,
95+
grpc_method: labels.method,
96+
grpc_type: labels.type,
97+
grpc_code: labels.code,
98+
});
99+
}
100+
101+
startHandleTimer(
102+
labels: IGrpcCallMetricsLabels,
103+
): (labels?: Partial<Record<string, string | number>> | undefined) => number {
104+
return this.handledSecondsHistogram.startTimer({
105+
grpc_service: labels.service,
106+
grpc_method: labels.method,
107+
grpc_type: labels.type,
108+
});
109+
}
110+
}
111+
112+
// Module-level singleton shared by both interceptor factories in this file.
const GRPCMetrics = new PrometheusClientCallMetrics();
113+
114+
/**
 * Builds a connect-node Interceptor that records started/sent/received/handled
 * counters and handling-latency for every call going through a connect transport.
 *
 * Reporting responsibility is split: for unary responses (and for any error thrown
 * by `next`) the outer try/finally reports; for streaming responses the wrapped
 * response iterable reports when the stream is drained or fails. The `settled`
 * flag coordinates the two so a call is never reported twice.
 */
export function getConnectMetricsInterceptor(): Interceptor {
	// Translate a connect request into the shared metric label set.
	const getLabels = (req: UnaryRequest | StreamRequest): IGrpcCallMetricsLabels => {
		let type: GrpcMethodType;
		switch (req.method.kind) {
			case MethodKind.Unary: type = 'unary'; break;
			case MethodKind.ServerStreaming: type = 'server_stream'; break;
			case MethodKind.ClientStreaming: type = 'client_stream'; break;
			case MethodKind.BiDiStreaming: type = 'bidi_stream'; break;
		}
		return {
			type,
			service: req.service.typeName,
			method: req.method.name,
		};
	};

	return (next) => async (req) => {
		// Wraps a message stream so each yielded item bumps `callback`. When
		// `handleMetrics` is true (response streams only) it also reports the
		// final code/latency once the stream ends — unless the outer handler
		// already settled the call. Note: it closes over `settled`, `stopTimer`
		// and `labels`, which are declared after this function but assigned
		// before any stream is consumed.
		async function* incrementStreamMessagesCounter<T>(iterable: AsyncIterable<T>, callback: () => void, handleMetrics: boolean): AsyncIterable<T> {
			let status: Code | undefined;
			try {
				for await (const item of iterable) {
					callback();
					yield item;
				}
			} catch (e) {
				// Record the code but rethrow the ORIGINAL error to the consumer.
				const err = connectErrorFromReason(e, Code.Internal);
				status = err.code;
				throw e;
			} finally {
				if (handleMetrics && !settled) {
					stopTimer({ grpc_code: status ? codeToString(status).toUpperCase() : 'OK' });
					GRPCMetrics.handled({ ...labels, code: status ? codeToString(status).toUpperCase() : 'OK' });
				}
			}
		}

		const labels = getLabels(req);
		GRPCMetrics.started(labels);
		const stopTimer = GRPCMetrics.startHandleTimer(labels);

		// True once the outer handler has (or will) report the call itself.
		let settled = false;
		let status: Code | undefined;
		try {
			let request: UnaryRequest | StreamRequest;
			if (!req.stream) {
				request = req;
			} else {
				// Count outgoing messages; request streams never finish the call
				// (handleMetrics=false) — the response side does.
				request = {
					...req,
					message: incrementStreamMessagesCounter(req.message, GRPCMetrics.sent.bind(GRPCMetrics, labels), false)
				};
			}

			const res = await next(request);

			let response: UnaryResponse | StreamResponse;
			if (!res.stream) {
				// Unary response: the call is done; the finally below reports it.
				response = res;
				settled = true;
			} else {
				// Streaming response: defer reporting to the wrapped iterable.
				response = {
					...res,
					message: incrementStreamMessagesCounter(res.message, GRPCMetrics.received.bind(GRPCMetrics, labels), true)
				};
			}

			return response;
		} catch (e) {
			settled = true;
			const err = connectErrorFromReason(e, Code.Internal);
			status = err.code;
			// Unlike the stream wrapper, the normalized ConnectError is rethrown here.
			throw err;
		} finally {
			if (settled) {
				stopTimer({ grpc_code: status ? codeToString(status).toUpperCase() : 'OK' });
				GRPCMetrics.handled({ ...labels, code: status ? codeToString(status).toUpperCase() : 'OK' });
			}
		}
	};
}
194+
195+
export function getGrpcMetricsInterceptor(): grpc.Interceptor {
196+
const getLabels = (path: string, requestStream: boolean, responseStream: boolean): IGrpcCallMetricsLabels => {
197+
const method = path.substring(path.lastIndexOf('/') + 1);
198+
const service = path.substring(1, path.length - method.length - 1);
199+
let type: GrpcMethodType;
200+
if (requestStream) {
201+
if (responseStream) {
202+
type = 'bidi_stream';
203+
} else {
204+
type = 'client_stream';
205+
}
206+
} else {
207+
if (responseStream) {
208+
type = 'server_stream';
209+
} else {
210+
type = 'unary';
211+
}
212+
}
213+
return {
214+
type,
215+
service,
216+
method,
217+
};
218+
};
219+
220+
return (options, nextCall): grpc.InterceptingCall => {
221+
const methodDef = options.method_definition;
222+
const labels = getLabels(methodDef.path, methodDef.requestStream, methodDef.responseStream);
223+
const requester = new grpc.RequesterBuilder()
224+
.withStart((metadata, _listener, next) => {
225+
const newListener = new grpc.ListenerBuilder()
226+
.withOnReceiveStatus((status, next) => {
227+
GRPCMetrics.handled({
228+
...labels,
229+
code: grpc.status[status.code],
230+
});
231+
stopTimer({ grpc_code: grpc.status[status.code] });
232+
next(status);
233+
})
234+
.withOnReceiveMessage((message, next) => {
235+
GRPCMetrics.received(labels);
236+
next(message);
237+
})
238+
.build();
239+
240+
GRPCMetrics.started(labels);
241+
const stopTimer = GRPCMetrics.startHandleTimer(labels);
242+
next(metadata, newListener);
243+
})
244+
.withSendMessage((message, next) => {
245+
GRPCMetrics.sent(labels);
246+
next(message);
247+
})
248+
.build();
249+
return new grpc.InterceptingCall(nextCall(options), requester);
250+
};
251+
}
252+
253+
export class MetricsReporter {
254+
private static readonly REPORT_INTERVAL = 60000;
255+
256+
private metricsHost: string;
257+
private intervalHandler: NodeJS.Timer | undefined;
258+
259+
constructor(gitpodHost: string, private logger: vscode.LogOutputChannel) {
260+
const serviceUrl = new URL(gitpodHost);
261+
this.metricsHost = `ide.${serviceUrl.hostname}`;
262+
}
263+
264+
startReporting() {
265+
if (this.intervalHandler) {
266+
return;
267+
}
268+
this.intervalHandler = setInterval(() => this.report().catch(e => this.logger.error('Error while reporting metrics', e)), MetricsReporter.REPORT_INTERVAL);
269+
}
270+
271+
private async report() {
272+
const metrics = await register.getMetricsAsJSON();
273+
register.resetMetrics();
274+
for (const m of metrics) {
275+
if (m.name === 'grpc_client_msg_sent_total' || m.name === 'grpc_client_msg_received_total') {
276+
// Skip these as thy are filtered by ide metris
277+
continue;
278+
}
279+
280+
const type = m.type as unknown as string;
281+
if (type === 'counter') {
282+
await this.reportCounter(m);
283+
} else if (type === 'histogram') {
284+
await this.reportHistogram(m);
285+
}
286+
}
287+
}
288+
289+
private async reportCounter(metric: metric) {
290+
const counterMetric = metric as metric & { values: [{ value: number; labels: Record<string, string> }] };
291+
for (const { value, labels } of counterMetric.values) {
292+
if (value > 0) {
293+
await this.doAddCounter(counterMetric.name, labels, value);
294+
}
295+
}
296+
}
297+
298+
private async reportHistogram(metric: metric) {
299+
const histogramMetric = metric as metric & { values: [{ value: number; labels: Record<string, string>; metricName: string }] };
300+
let sum = 0;
301+
let buckets: number[] = [];
302+
for (const { value, labels, metricName } of histogramMetric.values) {
303+
// metricName are in the following order _bucket, _sum, _count
304+
// We report on _count as it's the last
305+
// https://github.com/siimon/prom-client/blob/eee34858d2ef4198ff94f56a278d7b81f65e9c63/lib/histogram.js#L222-L235
306+
if (metricName.endsWith('_bucket')) {
307+
if (labels['le'] !== '+Inf') {
308+
buckets.push(value);
309+
}
310+
} else if (metricName.endsWith('_sum')) {
311+
sum = value;
312+
} else if (metricName.endsWith('_count')) {
313+
if (value > 0) {
314+
await this.doAddHistogram(histogramMetric.name, labels, value, sum, buckets);
315+
}
316+
sum = 0;
317+
buckets = [];
318+
}
319+
}
320+
}
321+
322+
private async doAddCounter(name: string, labels: Record<string, string>, value: number) {
323+
const data = {
324+
name,
325+
labels,
326+
value,
327+
};
328+
329+
const resp = await fetch(
330+
`https://${this.metricsHost}/metrics-api/metrics/counter/add/${name}`,
331+
{
332+
method: 'POST',
333+
headers: {
334+
'Content-Type': 'application/json',
335+
'X-Client': 'vscode-desktop-extension'
336+
},
337+
body: JSON.stringify(data)
338+
}
339+
);
340+
341+
if (!resp.ok) {
342+
throw new Error(`Metrics endpoint responded with ${resp.status} ${resp.statusText}`);
343+
}
344+
}
345+
346+
private async doAddHistogram(name: string, labels: Record<string, string>, count: number, sum: number, buckets: number[]) {
347+
const data = {
348+
name,
349+
labels,
350+
count,
351+
sum,
352+
buckets,
353+
};
354+
355+
const resp = await fetch(
356+
`https://${this.metricsHost}/metrics-api/metrics/histogram/add/${name}`,
357+
{
358+
method: 'POST',
359+
headers: {
360+
'Content-Type': 'application/json',
361+
'X-Client': 'vscode-desktop-extension'
362+
},
363+
body: JSON.stringify(data)
364+
}
365+
);
366+
367+
if (!resp.ok) {
368+
throw new Error(`Metrics endpoint responded with ${resp.status} ${resp.statusText}`);
369+
}
370+
}
371+
372+
stopReporting() {
373+
if (this.intervalHandler) {
374+
clearInterval(this.intervalHandler);
375+
}
376+
}
377+
}

0 commit comments

Comments
 (0)