Skip to content

Commit b6b4290

Browse files
authored
Fairness Keys & Weights (#2633)
1 parent 71c7426 commit b6b4290

File tree

8 files changed

+158
-55
lines changed

8 files changed

+158
-55
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ jobs:
7272

7373
- name: Start containerized server and dependencies
7474
env:
75-
TEMPORAL_CLI_VERSION: 1.4.0
75+
TEMPORAL_CLI_VERSION: 1.4.1-cloud-v1-29-0-139-2.0
7676
run: |
7777
wget -O temporal_cli.tar.gz https://github.com/temporalio/cli/releases/download/v${TEMPORAL_CLI_VERSION}/temporal_cli_${TEMPORAL_CLI_VERSION}_linux_amd64.tar.gz
7878
tar -xzf temporal_cli.tar.gz

temporal-sdk/src/main/java/io/temporal/common/Priority.java

Lines changed: 76 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,16 @@ public static Priority getDefaultInstance() {
3131

3232
public static final class Builder {
3333
private int priorityKey;
34+
private String fairnessKey;
35+
private float fairnessWeight;
3436

3537
private Builder(Priority options) {
3638
if (options == null) {
3739
return;
3840
}
3941
this.priorityKey = options.getPriorityKey();
42+
this.fairnessKey = options.getFairnessKey();
43+
this.fairnessWeight = options.getFairnessWeight();
4044
}
4145

4246
/**
@@ -55,16 +59,55 @@ public Builder setPriorityKey(int priorityKey) {
5559
return this;
5660
}
5761

62+
/**
63+
* FairnessKey is a short string that's used as a key for a fairness balancing mechanism. It may
64+
* correspond to a tenant id, or to a fixed string like "high" or "low". The default is the
65+
* empty string.
66+
*
67+
* <p>>The fairness mechanism attempts to dispatch tasks for a given key in proportion to its
68+
* weight. For example, using a thousand distinct tenant ids, each with a weight of 1.0 (the
69+
* default) will result in each tenant getting a roughly equal share of task dispatch
70+
* throughput.
71+
*
72+
* <p>Fairness keys are limited to 64 bytes.
73+
*/
74+
public Builder setFairnessKey(String fairnessKey) {
75+
this.fairnessKey = fairnessKey;
76+
return this;
77+
}
78+
79+
/**
80+
* FairnessWeight for a task can come from multiple sources for flexibility. From highest to
81+
* lowest precedence:
82+
*
83+
* <ul>
84+
* <li>Weights for a small set of keys can be overridden in task queue configuration with an
85+
* API.
86+
* <li>It can be attached to the workflow/activity in this field.
87+
* <li>The default weight of 1.0 will be used.
88+
* </ul>
89+
*
90+
* <p>Weight values are clamped to the range [0.001, 1000].
91+
*/
92+
public Builder setFairnessWeight(float fairnessWeight) {
93+
this.fairnessWeight = fairnessWeight;
94+
return this;
95+
}
96+
5897
public Priority build() {
59-
return new Priority(priorityKey);
98+
return new Priority(priorityKey, fairnessKey, fairnessWeight);
6099
}
61100
}
62101

63-
private Priority(int priorityKey) {
102+
private Priority(int priorityKey, String fairnessKey, float fairnessWeight) {
64103
this.priorityKey = priorityKey;
104+
this.fairnessKey = fairnessKey;
105+
this.fairnessWeight = fairnessWeight;
65106
}
66107

67108
private final int priorityKey;
109+
private final String fairnessKey;
110+
private final float fairnessWeight;
68111

69112
/**
70113
* See {@link Builder#setPriorityKey(int)}
@@ -75,20 +118,48 @@ public int getPriorityKey() {
75118
return priorityKey;
76119
}
77120

121+
/**
122+
* See {@link Builder#setFairnessKey(String)}
123+
*
124+
* @return The fairness key
125+
*/
126+
public String getFairnessKey() {
127+
return fairnessKey;
128+
}
129+
130+
/**
131+
* See {@link Builder#setFairnessWeight(float)}
132+
*
133+
* @return The fairness weight
134+
*/
135+
public float getFairnessWeight() {
136+
return fairnessWeight;
137+
}
138+
78139
@Override
79140
public String toString() {
80-
return "Priority{" + "priorityKey=" + priorityKey + '}';
141+
return "Priority{"
142+
+ "priorityKey="
143+
+ priorityKey
144+
+ ", fairnessKey='"
145+
+ fairnessKey
146+
+ '\''
147+
+ ", fairnessWeight="
148+
+ fairnessWeight
149+
+ '}';
81150
}
82151

83152
@Override
84153
public boolean equals(Object o) {
85154
if (o == null || getClass() != o.getClass()) return false;
86155
Priority priority = (Priority) o;
87-
return priorityKey == priority.priorityKey;
156+
return priorityKey == priority.priorityKey
157+
&& Float.compare(priority.fairnessWeight, fairnessWeight) == 0
158+
&& Objects.equals(fairnessKey, priority.fairnessKey);
88159
}
89160

90161
@Override
91162
public int hashCode() {
92-
return Objects.hashCode(priorityKey);
163+
return Objects.hash(priorityKey, fairnessKey, fairnessWeight);
93164
}
94165
}

temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -166,14 +166,19 @@ private RuntimeException failureToExceptionImpl(Failure failure, DataConverter d
166166
case NEXUS_OPERATION_EXECUTION_FAILURE_INFO:
167167
{
168168
NexusOperationFailureInfo info = failure.getNexusOperationExecutionFailureInfo();
169-
return new NexusOperationFailure(
170-
failure.getMessage(),
171-
info.getScheduledEventId(),
172-
info.getEndpoint(),
173-
info.getService(),
174-
info.getOperation(),
175-
info.getOperationToken().isEmpty() ? info.getOperationId() : info.getOperationToken(),
176-
cause);
169+
@SuppressWarnings("deprecation")
170+
NexusOperationFailure f =
171+
new NexusOperationFailure(
172+
failure.getMessage(),
173+
info.getScheduledEventId(),
174+
info.getEndpoint(),
175+
info.getService(),
176+
info.getOperation(),
177+
info.getOperationToken().isEmpty()
178+
? info.getOperationId()
179+
: info.getOperationToken(),
180+
cause);
181+
return f;
177182
}
178183
case NEXUS_HANDLER_FAILURE_INFO:
179184
{
@@ -307,6 +312,7 @@ private Failure exceptionToFailure(Throwable throwable) {
307312
failure.setCanceledFailureInfo(info);
308313
} else if (throwable instanceof NexusOperationFailure) {
309314
NexusOperationFailure no = (NexusOperationFailure) throwable;
315+
@SuppressWarnings("deprecation")
310316
NexusOperationFailureInfo.Builder op =
311317
NexusOperationFailureInfo.newBuilder()
312318
.setScheduledEventId(no.getScheduledEventId())

temporal-sdk/src/main/java/io/temporal/internal/common/ProtoConverters.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,27 @@
88

99
public class ProtoConverters {
1010
public static Priority toProto(io.temporal.common.Priority priority) {
11-
return Priority.newBuilder().setPriorityKey(priority.getPriorityKey()).build();
11+
Priority.Builder builder = Priority.newBuilder().setPriorityKey(priority.getPriorityKey());
12+
if (priority.getFairnessKey() != null) {
13+
builder.setFairnessKey(priority.getFairnessKey());
14+
}
15+
if (priority.getFairnessWeight() != 0.0f) {
16+
builder.setFairnessWeight(priority.getFairnessWeight());
17+
}
18+
return builder.build();
1219
}
1320

1421
@Nonnull
1522
public static io.temporal.common.Priority fromProto(@Nonnull Priority priority) {
16-
return io.temporal.common.Priority.newBuilder()
17-
.setPriorityKey(priority.getPriorityKey())
18-
.build();
23+
io.temporal.common.Priority.Builder builder =
24+
io.temporal.common.Priority.newBuilder().setPriorityKey(priority.getPriorityKey());
25+
if (!priority.getFairnessKey().isEmpty()) {
26+
builder.setFairnessKey(priority.getFairnessKey());
27+
}
28+
if (priority.getFairnessWeight() != 0.0f) {
29+
builder.setFairnessWeight(priority.getFairnessWeight());
30+
}
31+
return builder.build();
1932
}
2033

2134
public static io.temporal.api.deployment.v1.WorkerDeploymentVersion toProto(

temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ private CancelOperationResponse handleCancelledOperation(
186186
OperationContext.Builder ctx, CancelOperationRequest task) {
187187
ctx.setService(task.getService()).setOperation(task.getOperation());
188188

189+
@SuppressWarnings("deprecation") // getOperationId kept to support old server for a while
189190
OperationCancelDetails operationCancelDetails =
190191
OperationCancelDetails.newBuilder()
191192
.setOperationToken(

temporal-sdk/src/test/java/io/temporal/workflow/PriorityInfoTest.java

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,15 @@ public void testPriority() {
3333
TestWorkflow1.class,
3434
WorkflowOptions.newBuilder()
3535
.setTaskQueue(testWorkflowRule.getTaskQueue())
36-
.setPriority(Priority.newBuilder().setPriorityKey(5).build())
36+
.setPriority(
37+
Priority.newBuilder()
38+
.setPriorityKey(5)
39+
.setFairnessKey("tenant-123")
40+
.setFairnessWeight(2.5f)
41+
.build())
3742
.build());
3843
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
39-
assertEquals("5", result);
44+
assertEquals("5:tenant-123:2.5", result);
4045
}
4146

4247
@ActivityInterface
@@ -47,15 +52,18 @@ public interface PriorityActivities {
4752
public static class PriorityActivitiesImpl implements PriorityActivities {
4853
@Override
4954
public String activity1(String a1) {
50-
return String.valueOf(
51-
Activity.getExecutionContext().getInfo().getPriority().getPriorityKey());
55+
Priority priority = Activity.getExecutionContext().getInfo().getPriority();
56+
String key = priority.getFairnessKey() != null ? priority.getFairnessKey() : "null";
57+
return priority.getPriorityKey() + ":" + key + ":" + priority.getFairnessWeight();
5258
}
5359
}
5460

5561
public static class TestPriorityChildWorkflow implements TestWorkflows.TestWorkflowReturnString {
5662
@Override
5763
public String execute() {
58-
return String.valueOf(Workflow.getInfo().getPriority().getPriorityKey());
64+
Priority priority = Workflow.getInfo().getPriority();
65+
String key = priority.getFairnessKey() != null ? priority.getFairnessKey() : "null";
66+
return priority.getPriorityKey() + ":" + key + ":" + priority.getFairnessWeight();
5967
}
6068
}
6169

@@ -70,12 +78,17 @@ public String execute(String taskQueue) {
7078
ActivityOptions.newBuilder()
7179
.setTaskQueue(taskQueue)
7280
.setStartToCloseTimeout(Duration.ofSeconds(10))
73-
.setPriority(Priority.newBuilder().setPriorityKey(3).build())
81+
.setPriority(
82+
Priority.newBuilder()
83+
.setPriorityKey(3)
84+
.setFairnessKey("override")
85+
.setFairnessWeight(1.5f)
86+
.build())
7487
.setDisableEagerExecution(true)
7588
.build())
7689
.activity1("1");
77-
Assert.assertEquals("3", priority);
78-
// Test that of if no priority is set the workflows priority is used
90+
Assert.assertEquals("3:override:1.5", priority);
91+
// Test that if no priority is set the workflow's priority is used
7992
priority =
8093
Workflow.newActivityStub(
8194
PriorityActivities.class,
@@ -85,46 +98,37 @@ public String execute(String taskQueue) {
8598
.setDisableEagerExecution(true)
8699
.build())
87100
.activity1("2");
88-
Assert.assertEquals("5", priority);
89-
// Test that of if a default priority is set the workflows priority is used
90-
priority =
91-
Workflow.newActivityStub(
92-
PriorityActivities.class,
93-
ActivityOptions.newBuilder()
94-
.setTaskQueue(taskQueue)
95-
.setStartToCloseTimeout(Duration.ofSeconds(10))
96-
.setPriority(Priority.newBuilder().build())
97-
.setDisableEagerExecution(true)
98-
.build())
99-
.activity1("2");
100-
Assert.assertEquals("5", priority);
101+
Assert.assertEquals("5:tenant-123:2.5", priority);
101102
// Test that the priority is passed to child workflows
102103
priority =
103104
Workflow.newChildWorkflowStub(
104105
TestWorkflows.TestWorkflowReturnString.class,
105106
ChildWorkflowOptions.newBuilder()
106-
.setPriority(Priority.newBuilder().setPriorityKey(1).build())
107+
.setPriority(
108+
Priority.newBuilder()
109+
.setPriorityKey(1)
110+
.setFairnessKey("child")
111+
.setFairnessWeight(0.5f)
112+
.build())
107113
.build())
108114
.execute();
109-
Assert.assertEquals("1", priority);
110-
// Test that of no priority is set the workflows priority is used
115+
Assert.assertEquals("1:child:0.5", priority);
116+
// Test that if no priority is set the workflow's priority is used
111117
priority =
112118
Workflow.newChildWorkflowStub(
113119
TestWorkflows.TestWorkflowReturnString.class,
114120
ChildWorkflowOptions.newBuilder().build())
115121
.execute();
116-
Assert.assertEquals("5", priority);
117-
// Test that if a default priority is set the workflows priority is used
118-
priority =
119-
Workflow.newChildWorkflowStub(
120-
TestWorkflows.TestWorkflowReturnString.class,
121-
ChildWorkflowOptions.newBuilder()
122-
.setPriority(Priority.newBuilder().build())
123-
.build())
124-
.execute();
125-
Assert.assertEquals("5", priority);
126-
// Return the workflows priority
127-
return String.valueOf(Workflow.getInfo().getPriority().getPriorityKey());
122+
Assert.assertEquals("5:tenant-123:2.5", priority);
123+
// Return the workflow's priority
124+
Priority workflowPriority = Workflow.getInfo().getPriority();
125+
String key =
126+
workflowPriority.getFairnessKey() != null ? workflowPriority.getFairnessKey() : "null";
127+
return workflowPriority.getPriorityKey()
128+
+ ":"
129+
+ key
130+
+ ":"
131+
+ workflowPriority.getFairnessWeight();
128132
}
129133
}
130134
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2563,9 +2563,17 @@ static Priority mergePriorities(Priority parent, Priority child) {
25632563
}
25642564
Priority.Builder result = Priority.newBuilder();
25652565
result.setPriorityKey(parent.getPriorityKey());
2566+
result.setFairnessKey(child.getFairnessKey());
2567+
result.setFairnessWeight(child.getFairnessWeight());
25662568
if (child.getPriorityKey() != 0) {
25672569
result.setPriorityKey(child.getPriorityKey());
25682570
}
2571+
if (!child.getFairnessKey().isEmpty()) {
2572+
result.setFairnessKey(child.getFairnessKey());
2573+
}
2574+
if (child.getFairnessWeight() != 0) {
2575+
result.setFairnessWeight(child.getFairnessWeight());
2576+
}
25692577
return result.build();
25702578
}
25712579
}

0 commit comments

Comments
 (0)