Skip to content

Commit aff25ac

Browse files
authored
EXP-590 Waiting message in queue metric (#309)
* EXP-590 Adding PrometheusMessageQueueTimeMetric * EXP-590 exposing new metric + release prepare * EXP-590 Adding tests * Exposing missing export
1 parent d2f12bd commit aff25ac

File tree

5 files changed

+259
-4
lines changed

5 files changed

+259
-4
lines changed

packages/metrics/lib/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
export * from './prometheus/metrics/message-time/PrometheusMessageTimeMetric.ts'
12
export * from './prometheus/metrics/message-time/PrometheusMessageLifetimeMetric.ts'
23
export * from './prometheus/metrics/message-time/PrometheusMessageProcessingTimeMetric.ts'
4+
export * from './prometheus/metrics/message-time/PrometheusMessageQueueTimeMetric.ts'
35
export * from './prometheus/metrics/message-error/PrometheusMessageErrorCounter.ts'
46
export * from './prometheus/PrometheusMessageMetric.ts'
57
export * from './prometheus/types.ts'

packages/metrics/lib/prometheus/metrics/message-time/PrometheusMessageLifetimeMetric.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,8 @@ export class PrometheusMessageLifetimeMetric<
77
protected calculateObservedValue(
88
metadata: ProcessedMessageMetadata<MessagePayload>,
99
): number | null {
10-
if (!metadata.messageTimestamp) {
11-
return null
12-
}
10+
if (!metadata.messageTimestamp) return null
11+
1312
return metadata.messageProcessingEndTimestamp - metadata.messageTimestamp
1413
}
1514
}
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
import type { ProcessedMessageMetadata } from '@message-queue-toolkit/core'
2+
import * as promClient from 'prom-client'
3+
import type { Histogram } from 'prom-client'
4+
import { describe, expect, it, vi } from 'vitest'
5+
import { PrometheusMessageQueueTimeMetric } from './PrometheusMessageQueueTimeMetric.js'
6+
7+
type TestMessage = {
8+
id: string
9+
messageType: 'test'
10+
timestamp?: string
11+
metadata?: {
12+
schemaVersion: string
13+
}
14+
}
15+
16+
describe('PrometheusMessageQueueTimeMetric', () => {
17+
it('creates and uses Histogram metric properly', () => {
18+
// Given
19+
const registeredMessages: ProcessedMessageMetadata<TestMessage>[] = []
20+
const metric = new PrometheusMessageQueueTimeMetric<TestMessage>(
21+
{
22+
name: 'test_metric',
23+
helpDescription: 'test description',
24+
buckets: [1, 2, 3],
25+
messageVersion: (metadata: ProcessedMessageMetadata<TestMessage>) => {
26+
registeredMessages.push(metadata) // Mocking it to check if value is registered properly
27+
return undefined
28+
},
29+
},
30+
promClient,
31+
)
32+
33+
// When
34+
const messages: TestMessage[] = [
35+
{
36+
id: '1',
37+
messageType: 'test',
38+
timestamp: new Date().toISOString(),
39+
},
40+
{
41+
id: '2',
42+
messageType: 'test',
43+
timestamp: new Date().toISOString(),
44+
metadata: {
45+
schemaVersion: '1.0.0',
46+
},
47+
},
48+
]
49+
50+
const timestamp = Date.now()
51+
const processedMessageMetadataEntries: ProcessedMessageMetadata<TestMessage>[] = messages.map(
52+
(message) => ({
53+
messageId: message.id,
54+
messageType: message.messageType,
55+
processingResult: { status: 'consumed' },
56+
message: message,
57+
queueName: 'test-queue',
58+
messageTimestamp: timestamp,
59+
messageProcessingStartTimestamp: timestamp,
60+
messageProcessingEndTimestamp: timestamp + 102,
61+
}),
62+
)
63+
64+
for (const processedMessageMetadata of processedMessageMetadataEntries) {
65+
metric.registerProcessedMessage(processedMessageMetadata)
66+
}
67+
68+
// Then
69+
expect(registeredMessages).toStrictEqual(processedMessageMetadataEntries)
70+
})
71+
72+
it('skips message if messageTimestamp is not available', () => {
73+
// Given
74+
const observedValues: { labels: Record<string, any>; value: number }[] = []
75+
vi.spyOn(promClient.register, 'getSingleMetric').mockReturnValue({
76+
observe(labels: Record<string, string | number>, value: number) {
77+
observedValues.push({ labels, value })
78+
},
79+
} as Histogram)
80+
81+
const metric = new PrometheusMessageQueueTimeMetric<TestMessage>(
82+
{
83+
name: 'Test metric',
84+
helpDescription: 'test description',
85+
buckets: [1, 2, 3],
86+
},
87+
promClient,
88+
)
89+
90+
// When
91+
const message: TestMessage = {
92+
id: '1',
93+
messageType: 'test',
94+
timestamp: new Date().toISOString(),
95+
}
96+
97+
const timestamp = Date.now()
98+
metric.registerProcessedMessage({
99+
messageId: message.id,
100+
messageType: message.messageType,
101+
processingResult: { status: 'consumed' },
102+
message: message,
103+
queueName: 'test-queue',
104+
messageTimestamp: undefined,
105+
messageProcessingStartTimestamp: timestamp,
106+
messageProcessingEndTimestamp: timestamp,
107+
})
108+
109+
// Then
110+
expect(observedValues).toStrictEqual([])
111+
})
112+
113+
it('registers values properly', () => {
114+
// Given
115+
const observedValues: { labels: Record<string, any>; value: number }[] = []
116+
vi.spyOn(promClient.register, 'getSingleMetric').mockReturnValue({
117+
observe(labels: Record<string, string | number>, value: number) {
118+
observedValues.push({ labels, value })
119+
},
120+
} as Histogram)
121+
122+
const metric = new PrometheusMessageQueueTimeMetric<TestMessage>(
123+
{
124+
name: 'Test metric',
125+
helpDescription: 'test description',
126+
buckets: [1, 2, 3],
127+
},
128+
promClient,
129+
)
130+
131+
// When
132+
const message: TestMessage = {
133+
id: '1',
134+
messageType: 'test',
135+
timestamp: new Date().toISOString(),
136+
}
137+
138+
const timestamp = Date.now()
139+
metric.registerProcessedMessage({
140+
messageId: message.id,
141+
messageType: message.messageType,
142+
processingResult: { status: 'consumed' },
143+
message: message,
144+
queueName: 'test-queue',
145+
messageTimestamp: timestamp,
146+
messageProcessingStartTimestamp: timestamp + 102,
147+
messageProcessingEndTimestamp: timestamp,
148+
})
149+
150+
// Then
151+
expect(observedValues).toStrictEqual([
152+
{
153+
labels: {
154+
messageType: 'test',
155+
version: undefined,
156+
result: 'consumed',
157+
queue: 'test-queue',
158+
},
159+
value: 102,
160+
},
161+
])
162+
})
163+
164+
it('resolves version properly', () => {
165+
// Given
166+
const observedValues: { labels: Record<string, any>; value: number }[] = []
167+
vi.spyOn(promClient.register, 'getSingleMetric').mockReturnValue({
168+
observe(labels: Record<string, string | number>, value: number) {
169+
observedValues.push({ labels, value })
170+
},
171+
} as Histogram)
172+
173+
const metric = new PrometheusMessageQueueTimeMetric<TestMessage>(
174+
{
175+
name: 'Test metric',
176+
helpDescription: 'test description',
177+
buckets: [1, 2, 3],
178+
messageVersion: (metadata: ProcessedMessageMetadata<TestMessage>) =>
179+
metadata.message?.metadata?.schemaVersion,
180+
},
181+
promClient,
182+
)
183+
184+
// When
185+
const queueName = 'test-queue'
186+
const messages: TestMessage[] = [
187+
{
188+
id: '1',
189+
messageType: 'test',
190+
timestamp: new Date().toISOString(),
191+
},
192+
{
193+
id: '2',
194+
messageType: 'test',
195+
timestamp: new Date().toISOString(),
196+
metadata: {
197+
schemaVersion: '1.0.0',
198+
},
199+
},
200+
]
201+
202+
for (const message of messages) {
203+
const timestamp = Date.now()
204+
metric.registerProcessedMessage({
205+
messageId: message.id,
206+
messageType: message.messageType,
207+
processingResult: { status: 'consumed' },
208+
message: message,
209+
queueName,
210+
messageTimestamp: Date.now(),
211+
messageProcessingStartTimestamp: timestamp + 53,
212+
messageProcessingEndTimestamp: timestamp,
213+
})
214+
}
215+
216+
// Then
217+
expect(observedValues).toStrictEqual([
218+
{
219+
labels: {
220+
messageType: 'test',
221+
version: undefined,
222+
result: 'consumed',
223+
queue: queueName,
224+
},
225+
value: 53,
226+
},
227+
{
228+
labels: {
229+
messageType: 'test',
230+
version: '1.0.0',
231+
result: 'consumed',
232+
queue: queueName,
233+
},
234+
value: 53,
235+
},
236+
])
237+
})
238+
})
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import type { ProcessedMessageMetadata } from '@message-queue-toolkit/core'
2+
import { PrometheusMessageTimeMetric } from './PrometheusMessageTimeMetric.ts'
3+
4+
/**
5+
* This metric measures the time a message spends in the queue before processing starts.
6+
*/
7+
export class PrometheusMessageQueueTimeMetric<
8+
MessagePayload extends object,
9+
> extends PrometheusMessageTimeMetric<MessagePayload> {
10+
protected calculateObservedValue(
11+
metadata: ProcessedMessageMetadata<MessagePayload>,
12+
): number | null {
13+
if (!metadata.messageTimestamp) return null
14+
return metadata.messageProcessingStartTimestamp - metadata.messageTimestamp
15+
}
16+
}

packages/metrics/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/metrics",
3-
"version": "4.0.1",
3+
"version": "4.1.0",
44
"private": false,
55
"license": "MIT",
66
"description": "Utilities for collecting metrics in message-queue-toolkit",

0 commit comments

Comments
 (0)