Skip to content

Commit cce3ab9

Browse files
committed
Add workflow interceptor support
1 parent 6d9d2e0 commit cce3ab9

File tree

3 files changed

+125
-3
lines changed

3 files changed

+125
-3
lines changed

packages/test/src/test-metrics-custom.ts

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { ExecutionContext } from 'ava';
2-
import { Runtime } from '@temporalio/worker';
2+
import { ActivityInboundCallsInterceptor, ActivityOutboundCallsInterceptor, Runtime } from '@temporalio/worker';
33
import * as workflow from '@temporalio/workflow';
4+
import { MetricTags } from '@temporalio/common';
5+
import { Context as ActivityContext, metricMeter as activityMetricMeter } from '@temporalio/activity';
46
import { Context as BaseContext, helpers, makeTestFunction } from './helpers-integration';
57
import { getRandomPort } from './helpers';
68

@@ -261,7 +263,7 @@ export async function MetricTagsWorkflow(): Promise<void> {
261263

262264
test('Metric tags in Workflow works', async (t) => {
263265
const { createWorker, executeWorkflow, taskQueue } = helpers(t);
264-
const tags = `labelA="value-a",labelB="value-b2",labelC="value-c",labelD="value-d",labelX="value-x",labelY="value-y",namespace="default",taskQueue="${taskQueue}"`;
266+
const tags = `labelA="value-a",labelB="value-b2",labelC="value-c",labelD="value-d",labelX="value-x",labelY="value-y",namespace="default",taskQueue="${taskQueue}",workflowType="MetricTagsWorkflow"`;
265267

266268
const worker = await createWorker();
267269
await worker.runUntil(executeWorkflow(MetricTagsWorkflow));
@@ -270,3 +272,103 @@ test('Metric tags in Workflow works', async (t) => {
270272
await assertMetricReported(t, new RegExp(`workflow2_histogram_bucket{${tags},le="50"} 1`));
271273
await assertMetricReported(t, new RegExp(`workflow2_gauge{${tags}} 2`));
272274
});
275+
276+
// Define workflow interceptor for metrics
277+
export const interceptors = (): workflow.WorkflowInterceptors => ({
278+
outbound: [
279+
{
280+
getMetricTags(tags: MetricTags): MetricTags {
281+
if (!workflow.workflowInfo().workflowType.includes('Interceptor')) return tags;
282+
return {
283+
...tags,
284+
intercepted: 'workflow-interceptor',
285+
};
286+
},
287+
},
288+
],
289+
});
290+
291+
// Define activity interceptor for metrics
292+
export function activityInterceptorFactory(_ctx: ActivityContext): {
293+
inbound?: ActivityInboundCallsInterceptor;
294+
outbound?: ActivityOutboundCallsInterceptor;
295+
} {
296+
return {
297+
outbound: {
298+
getMetricTags(tags: MetricTags): MetricTags {
299+
return {
300+
...tags,
301+
intercepted: 'activity-interceptor',
302+
};
303+
},
304+
},
305+
};
306+
}
307+
308+
// Activity that uses metrics
309+
export async function metricActivity(): Promise<void> {
310+
const { metricMeter } = ActivityContext.current();
311+
312+
const counter = metricMeter.createCounter('activity-counter');
313+
counter.add(5);
314+
315+
const histogram = metricMeter.createHistogram('activity-histogram');
316+
histogram.record(10);
317+
318+
// Use the `metricMeter` exported from the top level of the activity module rather than the one in the context
319+
const gauge = activityMetricMeter.createGauge('activity-gauge');
320+
gauge.set(15);
321+
}
322+
323+
// Workflow that uses metrics and calls the activity
324+
export async function metricsInterceptorWorkflow(): Promise<void> {
325+
const metricMeter = workflow.metricMeter;
326+
327+
// Use workflow metrics
328+
const counter = metricMeter.createCounter('intercepted-workflow-counter');
329+
counter.add(3);
330+
331+
const histogram = metricMeter.createHistogram('intercepted-workflow-histogram');
332+
histogram.record(6);
333+
334+
const gauge = metricMeter.createGauge('intercepted-workflow-gauge');
335+
gauge.set(9);
336+
337+
// Call activity with metrics
338+
await workflow
339+
.proxyActivities({
340+
startToCloseTimeout: '1 minute',
341+
})
342+
.metricActivity();
343+
}
344+
345+
// Test for workflow metrics interceptor
346+
test('Workflow and Activity Context metrics interceptors add tags', async (t) => {
347+
const { createWorker, executeWorkflow, taskQueue } = helpers(t);
348+
349+
const worker = await createWorker({
350+
taskQueue,
351+
workflowsPath: __filename,
352+
activities: {
353+
metricActivity,
354+
},
355+
interceptors: {
356+
activity: [activityInterceptorFactory],
357+
},
358+
});
359+
360+
await worker.runUntil(executeWorkflow(metricsInterceptorWorkflow));
361+
362+
// Verify workflow metrics have interceptor tag
363+
await assertMetricReported(t, /intercepted_workflow_counter{[^}]*intercepted="workflow-interceptor"[^}]*} 3/);
364+
await assertMetricReported(
365+
t,
366+
/intercepted_workflow_histogram_bucket{[^}]*intercepted="workflow-interceptor"[^}]*} \d+/
367+
);
368+
await assertMetricReported(t, /intercepted_workflow_gauge{[^}]*intercepted="workflow-interceptor"[^}]*} 9/);
369+
370+
// Verify activity metrics have interceptor tag
371+
await assertMetricReported(t, /activity_counter{[^}]*intercepted="activity-interceptor"[^}]*} 5/);
372+
await assertMetricReported(t, /activity_histogram_bucket{[^}]*intercepted="activity-interceptor"[^}]*} \d+/);
373+
await assertMetricReported(t, /activity_gauge{[^}]*intercepted="activity-interceptor"[^}]*} 15/);
374+
});

packages/test/src/test-sinks.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ if (RUN_INTEGRATION_TESTS) {
9494
});
9595

9696
// Capture volatile values that are hard to predict
97+
// eslint-disable-next-line deprecation/deprecation
9798
const { historySize, startTime, runStartTime, currentBuildId } = recordedCalls[0].info;
9899
t.true(historySize > 300);
99100

packages/workflow/src/metrics.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ class WorkflowMetricCounter implements MetricCounter {
5555
) {}
5656

5757
add(value: number, extraTags: MetricTags = {}): void {
58+
if (value < 0) {
59+
throw new Error(`MetricCounter value must be non-negative (got ${value})`);
60+
}
5861
if (!workflowInfo().unsafe.isReplaying) {
5962
metricSink.addMetricCounterValue(this.name, this.unit, this.description, value, extraTags);
6063
}
@@ -75,6 +78,9 @@ class WorkflowMetricHistogram implements MetricHistogram {
7578
) {}
7679

7780
record(value: number, extraTags: MetricTags = {}): void {
81+
if (value < 0) {
82+
throw new Error(`MetricHistogram value must be non-negative (got ${value})`);
83+
}
7884
if (!workflowInfo().unsafe.isReplaying) {
7985
metricSink.recordMetricHistogramValue(this.name, this.valueType, this.unit, this.description, value, extraTags);
8086
}
@@ -95,6 +101,9 @@ class WorkflowMetricGauge implements MetricGauge {
95101
) {}
96102

97103
set(value: number, tags?: MetricTags): void {
104+
if (value < 0) {
105+
throw new Error(`MetricGauge value must be non-negative (got ${value})`);
106+
}
98107
if (!workflowInfo().unsafe.isReplaying) {
99108
metricSink.setMetricGaugeValue(this.name, this.valueType, this.unit, this.description, value, tags ?? {});
100109
}
@@ -168,6 +177,16 @@ export interface WorkflowMetricMeter extends Sink {
168177
*/
169178
export const metricMeter: MetricMeter = MetricMeterWithComposedTags.compose(
170179
new WorkflowMetricMeterImpl(),
171-
() => ({}), // FIXME: Add base tags + call getTags interceptor
180+
() => {
181+
const activator = assertInWorkflowContext('Workflow.metricMeter may only be used from workflow context.');
182+
const getMetricTags = composeInterceptors(activator.interceptors.outbound, 'getMetricTags', (a) => a);
183+
184+
const info = activator.info;
185+
return getMetricTags({
186+
namespace: info.namespace,
187+
taskQueue: info.taskQueue,
188+
workflowType: info.workflowType,
189+
});
190+
},
172191
true
173192
);

0 commit comments

Comments
 (0)