Skip to content

Commit 8d92025

Browse files
authored
feat: some more metrics on the schema service (#6748)
1 parent eb5ff48 commit 8d92025

File tree

4 files changed

+75
-26
lines changed

4 files changed

+75
-26
lines changed

deployment/grafana-dashboards/Schema-Service.json

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@
269269
"uid": "grafanacloud-prom"
270270
},
271271
"editorMode": "code",
272-
"expr": "sum by(cache) (rate(schema_external_composition_total{type=\"success\"}[$__rate_interval]))",
272+
"expr": "sum by(cache) (rate(schema_composition_total{type=\"success\"}[$__rate_interval]))",
273273
"instant": false,
274274
"legendFormat": "{{cache}}",
275275
"range": true,
@@ -334,7 +334,7 @@
334334
"uid": "grafanacloud-prom"
335335
},
336336
"editorMode": "code",
337-
"expr": "sum by(cache) (rate(schema_external_composition_total{type=\"success\"}[$__rate_interval]))",
337+
"expr": "sum by(cache) (rate(schema_composition_total{type=\"success\"}[$__rate_interval]))",
338338
"instant": false,
339339
"legendFormat": "{{cache}}",
340340
"range": true,
@@ -426,7 +426,7 @@
426426
"uid": "grafanacloud-prom"
427427
},
428428
"editorMode": "code",
429-
"expr": "sum by(cache) (rate(schema_external_composition_total{type=\"failure\"}[$__rate_interval]))",
429+
"expr": "sum by(cache) (rate(schema_composition_total{type=\"failure\"}[$__rate_interval]))",
430430
"instant": false,
431431
"legendFormat": "{{cache}}",
432432
"range": true,
@@ -491,7 +491,7 @@
491491
"uid": "grafanacloud-prom"
492492
},
493493
"editorMode": "code",
494-
"expr": "sum by(cache) (rate(schema_external_composition_total{type=\"failure\"}[$__rate_interval]))",
494+
"expr": "sum by(cache) (rate(schema_composition_total{type=\"failure\"}[$__rate_interval]))",
495495
"instant": false,
496496
"legendFormat": "{{cache}}",
497497
"range": true,
@@ -587,7 +587,7 @@
587587
"uid": "grafanacloud-prom"
588588
},
589589
"editorMode": "code",
590-
"expr": "sum by(cache) (rate(schema_external_composition_total{type=\"timeout\"}[$__rate_interval]))",
590+
"expr": "sum by(cache) (rate(schema_composition_total{type=\"timeout\"}[$__rate_interval]))",
591591
"instant": false,
592592
"legendFormat": "{{cache}}",
593593
"range": true,
@@ -652,7 +652,7 @@
652652
"uid": "grafanacloud-prom"
653653
},
654654
"editorMode": "code",
655-
"expr": "sum by(cache) (rate(schema_external_composition_total{type=\"timeout\"}[$__rate_interval]))",
655+
"expr": "sum by(cache) (rate(schema_composition_total{type=\"timeout\"}[$__rate_interval]))",
656656
"instant": false,
657657
"legendFormat": "{{cache}}",
658658
"range": true,

packages/services/schema/src/cache.ts

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import stringify from 'fast-json-stable-stringify';
33
import type { Redis } from 'ioredis';
44
import { TimeoutError } from 'p-timeout';
55
import type { ServiceLogger } from '@hive/service-common';
6-
import { externalCompositionCounter } from './metrics';
6+
import { compositionCacheValueSizeBytes, schemaCompositionCounter } from './metrics';
77

88
function createChecksum<TInput>(input: TInput): string {
99
return createHash('sha256').update(stringify(input)).digest('hex');
@@ -101,13 +101,18 @@ export function createCache(options: {
101101
pickCacheType: (data: T) => CacheTTLType,
102102
): Promise<void> {
103103
logger.debug('Completing action (id=%s)', id);
104+
const encodedData = JSON.stringify({
105+
status: 'completed',
106+
result: data,
107+
});
108+
109+
const sizeInBytes = Buffer.byteLength(encodedData, 'utf8');
110+
compositionCacheValueSizeBytes.observe(sizeInBytes);
111+
104112
await redis.psetex(
105113
id,
106114
pickCacheType(data) === 'long' ? ttlMs.success : ttlMs.failure,
107-
JSON.stringify({
108-
status: 'completed',
109-
result: data,
110-
}),
115+
encodedData,
111116
);
112117
}
113118

@@ -152,21 +157,21 @@ export function createCache(options: {
152157
if (cached.status === 'failed') {
153158
logger.debug('Rejecting action from cache (id=%s)', id);
154159
if (cached.error.startsWith('TimeoutError:')) {
155-
externalCompositionCounter.inc({
160+
schemaCompositionCounter.inc({
156161
cache: 'hit',
157162
type: 'timeout',
158163
});
159164
throw new TimeoutError(cached.error.replace('TimeoutError:', ''));
160165
}
161-
externalCompositionCounter.inc({
166+
schemaCompositionCounter.inc({
162167
cache: 'hit',
163168
type: 'failure',
164169
});
165170
throw new Error(cached.error);
166171
}
167172

168173
logger.debug('Resolving action from cache (id=%s)', id);
169-
externalCompositionCounter.inc({
174+
schemaCompositionCounter.inc({
170175
cache: 'hit',
171176
type: 'success',
172177
});
@@ -190,13 +195,13 @@ export function createCache(options: {
190195
]);
191196

192197
await completeAction(id, result, pickCacheType);
193-
externalCompositionCounter.inc({
198+
schemaCompositionCounter.inc({
194199
cache: 'miss',
195200
type: 'success',
196201
});
197202
return result;
198203
} catch (error) {
199-
externalCompositionCounter.inc({
204+
schemaCompositionCounter.inc({
200205
cache: 'miss',
201206
type: error instanceof TimeoutError ? 'timeout' : 'failure',
202207
});

packages/services/schema/src/composition-scheduler.ts

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ import fastq from 'fastq';
44
import * as Sentry from '@sentry/node';
55
import { registerWorkerLogging, type Logger } from '../../api/src/modules/shared/providers/logger';
66
import type { CompositionEvent, CompositionResultEvent } from './composition-worker';
7+
import {
8+
compositionQueueDurationMS,
9+
compositionTotalDurationMS,
10+
compositionWorkerDurationMS,
11+
} from './metrics';
712

813
type WorkerRunArgs = {
914
data: CompositionEvent['data'];
@@ -20,6 +25,11 @@ type WorkerInterface = {
2025
run: (args: WorkerRunArgs) => Promise<CompositionResultEvent['data']>;
2126
};
2227

28+
type QueueData = {
29+
args: WorkerRunArgs;
30+
addedToQueueTime: number;
31+
};
32+
2333
export class CompositionScheduler {
2434
private logger: Logger;
2535
/** The amount of parallel workers */
@@ -28,7 +38,7 @@ export class CompositionScheduler {
2838
/** List of all workers */
2939
private workers: Array<WorkerInterface>;
3040

31-
private queue: fastq.queueAsPromised<WorkerRunArgs, CompositionResultEvent['data']>;
41+
private queue: fastq.queueAsPromised<QueueData, CompositionResultEvent['data']>;
3242

3343
constructor(logger: Logger, workerCount: number, maxOldGenerationSizeMb: number) {
3444
this.workerCount = workerCount;
@@ -38,17 +48,26 @@ export class CompositionScheduler {
3848
this.workers = workers;
3949

4050
this.queue = fastq.promise(
41-
function queue(data) {
51+
async function queue(data) {
4252
// Let's not process aborted requests
43-
if (data.abortSignal.aborted) {
44-
throw data.abortSignal.reason;
53+
if (data.args.abortSignal.aborted) {
54+
throw data.args.abortSignal.reason;
4555
}
56+
const startProcessingTime = now();
57+
compositionQueueDurationMS.observe(startProcessingTime - data.addedToQueueTime);
4658
const worker = workers.find(worker => worker.isIdle);
47-
4859
if (!worker) {
4960
throw new Error('No idle worker found.');
5061
}
51-
return worker.run(data);
62+
63+
const result = await worker.run(data.args);
64+
const finishedTime = now();
65+
compositionWorkerDurationMS.observe(
66+
{ type: data.args.data.type },
67+
finishedTime - startProcessingTime,
68+
);
69+
compositionTotalDurationMS.observe(finishedTime - data.addedToQueueTime);
70+
return result;
5271
},
5372
// The size needs to be the same as the length of `this.workers`.
5473
// Otherwise a worker would process more than a single task at a time.
@@ -179,6 +198,10 @@ export class CompositionScheduler {
179198

180199
/** Process a composition task in a worker (once the next worker is free). */
181200
process(args: WorkerRunArgs): Promise<CompositionResultEvent['data']> {
182-
return this.queue.push(args);
201+
return this.queue.push({ args, addedToQueueTime: now() });
183202
}
184203
}
204+
205+
function now() {
206+
return new Date().getTime();
207+
}

packages/services/schema/src/metrics.ts

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,29 @@ export const composeAndValidateCounter = new metrics.Counter({
66
labelNames: ['type'],
77
});
88

9-
export const externalCompositionCounter = new metrics.Counter({
10-
name: 'schema_external_composition_total',
11-
help: 'Number of external compositions',
9+
export const schemaCompositionCounter = new metrics.Counter({
10+
name: 'schema_composition_total',
11+
help: 'Number of schema compositions',
1212
labelNames: ['cache' /* hit or miss */, 'type' /* success, failure or timeout */],
1313
});
14+
15+
export const compositionTotalDurationMS = new metrics.Histogram({
16+
name: 'composition_total_duration_ms',
17+
help: 'Total time of processing a composition (includes time in queue + actual processing time)',
18+
});
19+
20+
export const compositionQueueDurationMS = new metrics.Histogram({
21+
name: 'composition_queue_duration_ms',
22+
help: 'Time spent in queue before being processed.',
23+
});
24+
25+
export const compositionWorkerDurationMS = new metrics.Histogram({
26+
name: 'composition_worker_duration_ms',
27+
help: 'Time of running composition in worker',
28+
labelNames: ['type' /* single, federation or stitching */],
29+
});
30+
31+
export const compositionCacheValueSizeBytes = new metrics.Histogram({
32+
name: 'composition_cache_value_size_bytes',
33+
help: 'The size of the cache entries.',
34+
});

0 commit comments

Comments
 (0)