Skip to content

Commit c6f5639

Browse files
committed
Add Fairness keys & weights
1 parent 24844c4 commit c6f5639

File tree

4 files changed

+72
-10
lines changed

4 files changed

+72
-10
lines changed

packages/common/src/priority.ts

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ import type { temporal } from '@temporalio/proto';
1111
* calling workflow, then use the default (documented on the field).
1212
* The overall semantics of Priority are:
1313
* 1. First, consider "priority_key": lower number goes first.
14-
* (more will be added here later)
14+
* 2. Then, consider fairness: the fairness mechanism attempts to dispatch tasks for a given key in
15+
* proportion to its weight.
1516
*/
1617
export interface Priority {
1718
/**
@@ -26,13 +27,43 @@ export interface Priority {
2627
* The default priority is (min+max)/2. With the default max of 5 and min of 1, that comes out to 3.
2728
*/
2829
priorityKey?: number;
30+
31+
/**
32+
* FairnessKey is a short string that's used as a key for a fairness
33+
* balancing mechanism. It may correspond to a tenant id, or to a fixed
34+
* string like "high" or "low". The default is the empty string.
35+
*
36+
* The fairness mechanism attempts to dispatch tasks for a given key in
37+
* proportion to its weight. For example, using a thousand distinct tenant
38+
* ids, each with a weight of 1.0 (the default) will result in each tenant
39+
* getting a roughly equal share of task dispatch throughput.
40+
*
41+
* Fairness keys are limited to 64 bytes.
42+
*/
43+
fairnessKey?: string;
44+
45+
/**
46+
* FairnessWeight for a task can come from multiple sources for
47+
* flexibility. From highest to lowest precedence:
48+
* 1. Weights for a small set of keys can be overridden in task queue
49+
* configuration with an API.
50+
* 2. It can be attached to the workflow/activity in this field.
51+
* 3. The default weight of 1.0 will be used.
52+
*
53+
* Weight values are clamped to the range [0.001, 1000].
54+
*/
55+
fairnessWeight?: number;
2956
}
3057

3158
/**
3259
* Turn a proto compatible Priority into a TS Priority
3360
*/
3461
export function decodePriority(priority?: temporal.api.common.v1.IPriority | null): Priority {
35-
return { priorityKey: priority?.priorityKey ?? undefined };
62+
return {
63+
priorityKey: priority?.priorityKey ?? undefined,
64+
fairnessKey: priority?.fairnessKey ?? undefined,
65+
fairnessWeight: priority?.fairnessWeight ?? undefined,
66+
};
3667
}
3768

3869
/**
@@ -50,5 +81,7 @@ export function compilePriority(priority: Priority): temporal.api.common.v1.IPri
5081

5182
return {
5283
priorityKey: priority.priorityKey ?? 0,
84+
fairnessKey: priority.fairnessKey ?? '',
85+
fairnessWeight: priority.fairnessWeight ?? 0,
5386
};
5487
}

packages/test/src/helpers.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,3 +308,14 @@ export async function saveHistory(fname: string, history: iface.temporal.api.his
308308
const fpath = path.resolve(__dirname, `../history_files/${fname}`);
309309
await fs.writeFile(fpath, historyToJSON(history));
310310
}
311+
312+
export function approximatelyEqual(
313+
a: number | null | undefined,
314+
b: number | null | undefined,
315+
tolerance = Number.EPSILON
316+
): boolean {
317+
if (a === null || a === undefined || b === null || b === undefined) {
318+
return false;
319+
}
320+
return Math.abs(a - b) < tolerance;
321+
}

packages/test/src/test-integration-split-three.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { configMacro, makeTestFn } from './helpers-integration-multi-codec';
88
import { configurableHelpers } from './helpers-integration';
99
import { withZeroesHTTPServer } from './zeroes-http-server';
1010
import * as activities from './activities';
11-
import { cleanOptionalStackTrace } from './helpers';
11+
import { approximatelyEqual, cleanOptionalStackTrace } from './helpers';
1212
import * as workflows from './workflows';
1313

1414
const test = makeTestFn(() => bundleWorkflowCode({ workflowsPath: require.resolve('./workflows') }));
@@ -151,7 +151,7 @@ test(
151151
const worker = await createWorkerWithDefaults(t, { activities });
152152
const handle = await startWorkflow(workflows.priorityWorkflow, {
153153
args: [false, 1],
154-
priority: { priorityKey: 1 },
154+
priority: { priorityKey: 1, fairnessKey: 'main-workflow', fairnessWeight: 3.0 },
155155
});
156156
await worker.runUntil(handle.result());
157157
let firstChild = true;
@@ -161,19 +161,28 @@ test(
161161
switch (event.eventType) {
162162
case temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
163163
t.deepEqual(event.workflowExecutionStartedEventAttributes?.priority?.priorityKey, 1);
164+
t.deepEqual(event.workflowExecutionStartedEventAttributes?.priority?.fairnessKey, 'main-workflow');
165+
t.deepEqual(event.workflowExecutionStartedEventAttributes?.priority?.fairnessWeight, 3.0);
164166
break;
165167
case temporal.api.enums.v1.EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED: {
166-
const pri = event.startChildWorkflowExecutionInitiatedEventAttributes?.priority?.priorityKey;
168+
const priority = event.startChildWorkflowExecutionInitiatedEventAttributes?.priority;
167169
if (firstChild) {
168-
t.deepEqual(pri, 4);
170+
t.deepEqual(priority?.priorityKey, 4);
171+
t.deepEqual(priority?.fairnessKey, 'child-workflow-1');
172+
t.deepEqual(priority?.fairnessWeight, 2.5);
169173
firstChild = false;
170174
} else {
171-
t.deepEqual(pri, 2);
175+
t.deepEqual(priority?.priorityKey, 2);
176+
t.deepEqual(priority?.fairnessKey, 'child-workflow-2');
177+
t.deepEqual(priority?.fairnessWeight, 1.0);
172178
}
173179
break;
174180
}
175181
case temporal.api.enums.v1.EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
176182
t.deepEqual(event.activityTaskScheduledEventAttributes?.priority?.priorityKey, 5);
183+
t.deepEqual(event.activityTaskScheduledEventAttributes?.priority?.fairnessKey, 'fair-activity');
184+
// For some insane reason when proto reads this event it mangles the number to 4.19999999 something. Thanks Javascript.
185+
t.assert(approximatelyEqual(event.activityTaskScheduledEventAttributes?.priority?.fairnessWeight, 4.2));
177186
break;
178187
}
179188
}

packages/test/src/workflows/priority.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import { executeChild, proxyActivities, startChild, workflowInfo } from '@temporalio/workflow';
22
import type * as activities from '../activities';
33

4-
const { echo } = proxyActivities<typeof activities>({ startToCloseTimeout: '5s', priority: { priorityKey: 5 } });
4+
const { echo } = proxyActivities<typeof activities>({
5+
startToCloseTimeout: '5s',
6+
priority: { priorityKey: 5, fairnessKey: 'fair-activity', fairnessWeight: 4.2 },
7+
});
58

69
export async function priorityWorkflow(stopAfterCheck: boolean, expectedPriority: number | undefined): Promise<void> {
710
const info = workflowInfo();
@@ -17,9 +20,15 @@ export async function priorityWorkflow(stopAfterCheck: boolean, expectedPriority
1720
return;
1821
}
1922

20-
await executeChild(priorityWorkflow, { args: [true, 4], priority: { priorityKey: 4 } });
23+
await executeChild(priorityWorkflow, {
24+
args: [true, 4],
25+
priority: { priorityKey: 4, fairnessKey: 'child-workflow-1', fairnessWeight: 2.5 },
26+
});
2127

22-
const child = await startChild(priorityWorkflow, { args: [true, 2], priority: { priorityKey: 2 } });
28+
const child = await startChild(priorityWorkflow, {
29+
args: [true, 2],
30+
priority: { priorityKey: 2, fairnessKey: 'child-workflow-2', fairnessWeight: 1.0 },
31+
});
2332
await child.result();
2433

2534
await echo('hi');

0 commit comments

Comments
 (0)