Skip to content

Commit c8b28a0

Browse files
committed
Add fairness keys/weights
1 parent 5b2856a commit c8b28a0

File tree

3 files changed

+263
-10
lines changed

3 files changed

+263
-10
lines changed

temporal-sdk/src/main/java/io/temporal/common/Priority.java

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,16 @@ public static Priority getDefaultInstance() {
3131

3232
public static final class Builder {
3333
private int priorityKey;
34+
private String fairnessKey;
35+
private float fairnessWeight;
3436

3537
private Builder(Priority options) {
3638
if (options == null) {
3739
return;
3840
}
3941
this.priorityKey = options.getPriorityKey();
42+
this.fairnessKey = options.getFairnessKey();
43+
this.fairnessWeight = options.getFairnessWeight();
4044
}
4145

4246
/**
@@ -55,16 +59,44 @@ public Builder setPriorityKey(int priorityKey) {
5559
return this;
5660
}
5761

62+
/**
63+
* A fairness key is a short string used for balancing task dispatch. Tasks with the same
64+
* fairness key will be processed proportionally according to their fairness weight.
65+
*
66+
* <p>If not set, inherits from the parent workflow or uses an empty string if there is no
67+
* parent.
68+
*/
69+
public Builder setFairnessKey(String fairnessKey) {
70+
this.fairnessKey = fairnessKey;
71+
return this;
72+
}
73+
74+
/**
75+
* A fairness weight determines the relative proportion of task processing for a given fairness
76+
* key. The weight should be a positive number. A higher weight means more tasks will be
77+
* processed for that fairness key.
78+
*
79+
* <p>If not set or 0, defaults to 1.0. If there is a parent workflow, inherits from the parent.
80+
*/
81+
public Builder setFairnessWeight(float fairnessWeight) {
82+
this.fairnessWeight = fairnessWeight;
83+
return this;
84+
}
85+
5886
public Priority build() {
59-
return new Priority(priorityKey);
87+
return new Priority(priorityKey, fairnessKey, fairnessWeight);
6088
}
6189
}
6290

63-
private Priority(int priorityKey) {
91+
private Priority(int priorityKey, String fairnessKey, float fairnessWeight) {
6492
this.priorityKey = priorityKey;
93+
this.fairnessKey = fairnessKey;
94+
this.fairnessWeight = fairnessWeight;
6595
}
6696

6797
private final int priorityKey;
98+
private final String fairnessKey;
99+
private final float fairnessWeight;
68100

69101
/**
70102
* See {@link Builder#setPriorityKey(int)}
@@ -75,20 +107,48 @@ public int getPriorityKey() {
75107
return priorityKey;
76108
}
77109

110+
/**
111+
* See {@link Builder#setFairnessKey(String)}
112+
*
113+
* @return The fairness key
114+
*/
115+
public String getFairnessKey() {
116+
return fairnessKey;
117+
}
118+
119+
/**
120+
* See {@link Builder#setFairnessWeight(float)}
121+
*
122+
* @return The fairness weight
123+
*/
124+
public float getFairnessWeight() {
125+
return fairnessWeight;
126+
}
127+
78128
@Override
79129
public String toString() {
80-
return "Priority{" + "priorityKey=" + priorityKey + '}';
130+
return "Priority{"
131+
+ "priorityKey="
132+
+ priorityKey
133+
+ ", fairnessKey='"
134+
+ fairnessKey
135+
+ '\''
136+
+ ", fairnessWeight="
137+
+ fairnessWeight
138+
+ '}';
81139
}
82140

83141
@Override
84142
public boolean equals(Object o) {
85143
if (o == null || getClass() != o.getClass()) return false;
86144
Priority priority = (Priority) o;
87-
return priorityKey == priority.priorityKey;
145+
return priorityKey == priority.priorityKey
146+
&& Float.compare(priority.fairnessWeight, fairnessWeight) == 0
147+
&& Objects.equals(fairnessKey, priority.fairnessKey);
88148
}
89149

90150
@Override
91151
public int hashCode() {
92-
return Objects.hashCode(priorityKey);
152+
return Objects.hash(priorityKey, fairnessKey, fairnessWeight);
93153
}
94154
}

temporal-sdk/src/main/java/io/temporal/internal/common/ProtoConverters.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,27 @@
88

99
public class ProtoConverters {
1010
public static Priority toProto(io.temporal.common.Priority priority) {
11-
return Priority.newBuilder().setPriorityKey(priority.getPriorityKey()).build();
11+
Priority.Builder builder = Priority.newBuilder().setPriorityKey(priority.getPriorityKey());
12+
if (priority.getFairnessKey() != null) {
13+
builder.setFairnessKey(priority.getFairnessKey());
14+
}
15+
if (priority.getFairnessWeight() != 0.0f) {
16+
builder.setFairnessWeight(priority.getFairnessWeight());
17+
}
18+
return builder.build();
1219
}
1320

1421
@Nonnull
1522
public static io.temporal.common.Priority fromProto(@Nonnull Priority priority) {
16-
return io.temporal.common.Priority.newBuilder()
17-
.setPriorityKey(priority.getPriorityKey())
18-
.build();
23+
io.temporal.common.Priority.Builder builder =
24+
io.temporal.common.Priority.newBuilder().setPriorityKey(priority.getPriorityKey());
25+
if (!priority.getFairnessKey().isEmpty()) {
26+
builder.setFairnessKey(priority.getFairnessKey());
27+
}
28+
if (priority.getFairnessWeight() != 0.0f) {
29+
builder.setFairnessWeight(priority.getFairnessWeight());
30+
}
31+
return builder.build();
1932
}
2033

2134
public static io.temporal.api.deployment.v1.WorkerDeploymentVersion toProto(

temporal-sdk/src/test/java/io/temporal/workflow/PriorityInfoTest.java

Lines changed: 181 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,15 @@ public class PriorityInfoTest {
2020
@Rule
2121
public SDKTestWorkflowRule testWorkflowRule =
2222
SDKTestWorkflowRule.newBuilder()
23-
.setWorkflowTypes(TestPriority.class, TestPriorityChildWorkflow.class)
23+
.setWorkflowTypes(
24+
TestPriority.class,
25+
TestPriorityChildWorkflow.class,
26+
TestFairnessKeyChildWorkflow.class,
27+
TestFairnessWeightChildWorkflow.class,
28+
TestFairnessKeyAndWeightChildWorkflow.class,
29+
TestFairnessKeyWorkflow.class,
30+
TestFairnessWeightWorkflow.class,
31+
TestFairnessKeyAndWeightWorkflow.class)
2432
.setActivityImplementations(new PriorityActivitiesImpl())
2533
.build();
2634

@@ -39,9 +47,64 @@ public void testPriority() {
3947
assertEquals("5", result);
4048
}
4149

50+
@Test
51+
public void testFairnessKey() {
52+
TestWorkflow1 workflowStub =
53+
testWorkflowRule
54+
.getWorkflowClient()
55+
.newWorkflowStub(
56+
TestWorkflow1.class,
57+
WorkflowOptions.newBuilder()
58+
.setTaskQueue(testWorkflowRule.getTaskQueue())
59+
.setPriority(Priority.newBuilder().setFairnessKey("tenant-123").build())
60+
.build());
61+
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
62+
assertEquals("tenant-123", result);
63+
}
64+
65+
@Test
66+
public void testFairnessWeight() {
67+
TestWorkflow1 workflowStub =
68+
testWorkflowRule
69+
.getWorkflowClient()
70+
.newWorkflowStub(
71+
TestWorkflow1.class,
72+
WorkflowOptions.newBuilder()
73+
.setTaskQueue(testWorkflowRule.getTaskQueue())
74+
.setPriority(Priority.newBuilder().setFairnessWeight(2.5f).build())
75+
.build());
76+
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
77+
assertEquals("2.5", result);
78+
}
79+
80+
@Test
81+
public void testFairnessKeyAndWeight() {
82+
TestWorkflow1 workflowStub =
83+
testWorkflowRule
84+
.getWorkflowClient()
85+
.newWorkflowStub(
86+
TestWorkflow1.class,
87+
WorkflowOptions.newBuilder()
88+
.setTaskQueue(testWorkflowRule.getTaskQueue())
89+
.setPriority(
90+
Priority.newBuilder()
91+
.setFairnessKey("tenant-456")
92+
.setFairnessWeight(1.5f)
93+
.build())
94+
.build());
95+
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
96+
assertEquals("tenant-456:1.5", result);
97+
}
98+
4299
@ActivityInterface
43100
public interface PriorityActivities {
44101
String activity1(String a1);
102+
103+
String getFairnessKey();
104+
105+
String getFairnessWeight();
106+
107+
String getFairnessKeyAndWeight();
45108
}
46109

47110
public static class PriorityActivitiesImpl implements PriorityActivities {
@@ -50,6 +113,25 @@ public String activity1(String a1) {
50113
return String.valueOf(
51114
Activity.getExecutionContext().getInfo().getPriority().getPriorityKey());
52115
}
116+
117+
@Override
118+
public String getFairnessKey() {
119+
String fairnessKey = Activity.getExecutionContext().getInfo().getPriority().getFairnessKey();
120+
return fairnessKey != null ? fairnessKey : "null";
121+
}
122+
123+
@Override
124+
public String getFairnessWeight() {
125+
return String.valueOf(
126+
Activity.getExecutionContext().getInfo().getPriority().getFairnessWeight());
127+
}
128+
129+
@Override
130+
public String getFairnessKeyAndWeight() {
131+
Priority priority = Activity.getExecutionContext().getInfo().getPriority();
132+
String key = priority.getFairnessKey() != null ? priority.getFairnessKey() : "null";
133+
return key + ":" + priority.getFairnessWeight();
134+
}
53135
}
54136

55137
public static class TestPriorityChildWorkflow implements TestWorkflows.TestWorkflowReturnString {
@@ -59,6 +141,33 @@ public String execute() {
59141
}
60142
}
61143

144+
public static class TestFairnessKeyChildWorkflow
145+
implements TestWorkflows.TestWorkflowReturnString {
146+
@Override
147+
public String execute() {
148+
String fairnessKey = Workflow.getInfo().getPriority().getFairnessKey();
149+
return fairnessKey != null ? fairnessKey : "null";
150+
}
151+
}
152+
153+
public static class TestFairnessWeightChildWorkflow
154+
implements TestWorkflows.TestWorkflowReturnString {
155+
@Override
156+
public String execute() {
157+
return String.valueOf(Workflow.getInfo().getPriority().getFairnessWeight());
158+
}
159+
}
160+
161+
public static class TestFairnessKeyAndWeightChildWorkflow
162+
implements TestWorkflows.TestWorkflowReturnString {
163+
@Override
164+
public String execute() {
165+
Priority priority = Workflow.getInfo().getPriority();
166+
String key = priority.getFairnessKey() != null ? priority.getFairnessKey() : "null";
167+
return key + ":" + priority.getFairnessWeight();
168+
}
169+
}
170+
62171
public static class TestPriority implements TestWorkflow1 {
63172

64173
@Override
@@ -127,4 +236,75 @@ public String execute(String taskQueue) {
127236
return String.valueOf(Workflow.getInfo().getPriority().getPriorityKey());
128237
}
129238
}
239+
240+
public static class TestFairnessKeyWorkflow implements TestWorkflow1 {
241+
@Override
242+
public String execute(String taskQueue) {
243+
// Test fairness key inheritance in activities
244+
String fairnessKey =
245+
Workflow.newActivityStub(
246+
PriorityActivities.class,
247+
ActivityOptions.newBuilder()
248+
.setTaskQueue(taskQueue)
249+
.setStartToCloseTimeout(Duration.ofSeconds(10))
250+
.setDisableEagerExecution(true)
251+
.build())
252+
.getFairnessKey();
253+
Assert.assertEquals("tenant-123", fairnessKey);
254+
255+
// Return the workflow's fairness key
256+
String workflowFairnessKey = Workflow.getInfo().getPriority().getFairnessKey();
257+
return workflowFairnessKey != null ? workflowFairnessKey : "null";
258+
}
259+
}
260+
261+
public static class TestFairnessWeightWorkflow implements TestWorkflow1 {
262+
@Override
263+
public String execute(String taskQueue) {
264+
// Test fairness weight inheritance in activities
265+
String fairnessWeight =
266+
Workflow.newActivityStub(
267+
PriorityActivities.class,
268+
ActivityOptions.newBuilder()
269+
.setTaskQueue(taskQueue)
270+
.setStartToCloseTimeout(Duration.ofSeconds(10))
271+
.setDisableEagerExecution(true)
272+
.build())
273+
.getFairnessWeight();
274+
Assert.assertEquals("2.5", fairnessWeight);
275+
276+
// Return the workflow's fairness weight
277+
return String.valueOf(Workflow.getInfo().getPriority().getFairnessWeight());
278+
}
279+
}
280+
281+
public static class TestFairnessKeyAndWeightWorkflow implements TestWorkflow1 {
282+
@Override
283+
public String execute(String taskQueue) {
284+
// Test fairness key and weight inheritance in activities
285+
String fairnessKeyAndWeight =
286+
Workflow.newActivityStub(
287+
PriorityActivities.class,
288+
ActivityOptions.newBuilder()
289+
.setTaskQueue(taskQueue)
290+
.setStartToCloseTimeout(Duration.ofSeconds(10))
291+
.setDisableEagerExecution(true)
292+
.build())
293+
.getFairnessKeyAndWeight();
294+
Assert.assertEquals("tenant-456:1.5", fairnessKeyAndWeight);
295+
296+
// Test child workflow inheritance
297+
String childResult =
298+
Workflow.newChildWorkflowStub(
299+
TestWorkflows.TestWorkflowReturnString.class,
300+
ChildWorkflowOptions.newBuilder().build())
301+
.execute();
302+
Assert.assertEquals("tenant-456:1.5", childResult);
303+
304+
// Return the workflow's fairness key and weight
305+
Priority priority = Workflow.getInfo().getPriority();
306+
String key = priority.getFairnessKey() != null ? priority.getFairnessKey() : "null";
307+
return key + ":" + priority.getFairnessWeight();
308+
}
309+
}
130310
}

0 commit comments

Comments
 (0)