Skip to content

Commit b581ede

Browse files
Add poller autoscaling options for Springboot (#2554)
1 parent e7e3fa6 commit b581ede

File tree

13 files changed

+219
-64
lines changed

13 files changed

+219
-64
lines changed

temporal-sdk/src/main/java/io/temporal/worker/Worker.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -560,8 +560,8 @@ private static SingleWorkerOptions toActivityOptions(
560560
PollerOptions.newBuilder()
561561
.setMaximumPollRatePerSecond(options.getMaxWorkerActivitiesPerSecond())
562562
.setPollerBehavior(
563-
options.getActivityTaskPollersBehaviour() != null
564-
? options.getActivityTaskPollersBehaviour()
563+
options.getActivityTaskPollersBehavior() != null
564+
? options.getActivityTaskPollersBehavior()
565565
: new PollerBehaviorSimpleMaximum(
566566
options.getMaxConcurrentActivityTaskPollers()))
567567
.setUsingVirtualThreads(options.isUsingVirtualThreadsOnActivityWorker())
@@ -580,8 +580,8 @@ private static SingleWorkerOptions toNexusOptions(
580580
.setPollerOptions(
581581
PollerOptions.newBuilder()
582582
.setPollerBehavior(
583-
options.getNexusTaskPollersBehaviour() != null
584-
? options.getNexusTaskPollersBehaviour()
583+
options.getNexusTaskPollersBehavior() != null
584+
? options.getNexusTaskPollersBehavior()
585585
: new PollerBehaviorSimpleMaximum(
586586
options.getMaxConcurrentNexusTaskPollers()))
587587
.setUsingVirtualThreads(options.isUsingVirtualThreadsOnNexusWorker())
@@ -620,8 +620,8 @@ private static SingleWorkerOptions toWorkflowWorkerOptions(
620620
.setPollerOptions(
621621
PollerOptions.newBuilder()
622622
.setPollerBehavior(
623-
options.getWorkflowTaskPollersBehaviour() != null
624-
? options.getWorkflowTaskPollersBehaviour()
623+
options.getWorkflowTaskPollersBehavior() != null
624+
? options.getWorkflowTaskPollersBehavior()
625625
: new PollerBehaviorSimpleMaximum(maxConcurrentWorkflowTaskPollers))
626626
.setUsingVirtualThreads(options.isUsingVirtualThreadsOnWorkflowWorker())
627627
.build())

temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java

Lines changed: 61 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ public static final class Builder {
7474
private boolean usingVirtualThreadsOnNexusWorker;
7575
private String identity;
7676
private WorkerDeploymentOptions deploymentOptions;
77-
private PollerBehavior workflowTaskPollersBehaviour;
78-
private PollerBehavior activityTaskPollersBehaviour;
79-
private PollerBehavior nexusTaskPollersBehaviour;
77+
private PollerBehavior workflowTaskPollersBehavior;
78+
private PollerBehavior activityTaskPollersBehavior;
79+
private PollerBehavior nexusTaskPollersBehavior;
8080

8181
private Builder() {}
8282

@@ -109,9 +109,9 @@ private Builder(WorkerOptions o) {
109109
this.usingVirtualThreadsOnLocalActivityWorker = o.usingVirtualThreadsOnLocalActivityWorker;
110110
this.usingVirtualThreadsOnNexusWorker = o.usingVirtualThreadsOnNexusWorker;
111111
this.deploymentOptions = o.deploymentOptions;
112-
this.workflowTaskPollersBehaviour = o.workflowTaskPollersBehaviour;
113-
this.activityTaskPollersBehaviour = o.activityTaskPollersBehaviour;
114-
this.nexusTaskPollersBehaviour = o.nexusTaskPollersBehaviour;
112+
this.workflowTaskPollersBehavior = o.workflowTaskPollersBehavior;
113+
this.activityTaskPollersBehavior = o.activityTaskPollersBehavior;
114+
this.nexusTaskPollersBehavior = o.nexusTaskPollersBehavior;
115115
}
116116

117117
/**
@@ -500,22 +500,22 @@ public Builder setDeploymentOptions(WorkerDeploymentOptions deploymentOptions) {
500500
* well.
501501
*/
502502
@Experimental
503-
public Builder setWorkflowTaskPollersBehaviour(PollerBehavior pollerBehavior) {
504-
this.workflowTaskPollersBehaviour = pollerBehavior;
503+
public Builder setWorkflowTaskPollersBehavior(PollerBehavior pollerBehavior) {
504+
this.workflowTaskPollersBehavior = pollerBehavior;
505505
return this;
506506
}
507507

508508
/** Set the poller behavior for activity task pollers. */
509509
@Experimental
510-
public Builder setActivityTaskPollersBehaviour(PollerBehavior pollerBehavior) {
511-
this.activityTaskPollersBehaviour = pollerBehavior;
510+
public Builder setActivityTaskPollersBehavior(PollerBehavior pollerBehavior) {
511+
this.activityTaskPollersBehavior = pollerBehavior;
512512
return this;
513513
}
514514

515515
/** Set the poller behavior for nexus task pollers. */
516516
@Experimental
517-
public Builder setNexusTaskPollersBehaviour(PollerBehavior pollerBehavior) {
518-
this.nexusTaskPollersBehaviour = pollerBehavior;
517+
public Builder setNexusTaskPollersBehavior(PollerBehavior pollerBehavior) {
518+
this.nexusTaskPollersBehavior = pollerBehavior;
519519
return this;
520520
}
521521

@@ -546,9 +546,9 @@ public WorkerOptions build() {
546546
usingVirtualThreadsOnLocalActivityWorker,
547547
usingVirtualThreadsOnNexusWorker,
548548
deploymentOptions,
549-
workflowTaskPollersBehaviour,
550-
activityTaskPollersBehaviour,
551-
nexusTaskPollersBehaviour);
549+
workflowTaskPollersBehavior,
550+
activityTaskPollersBehavior,
551+
nexusTaskPollersBehavior);
552552
}
553553

554554
public WorkerOptions validateAndBuildWithDefaults() {
@@ -608,6 +608,22 @@ public WorkerOptions validateAndBuildWithDefaults() {
608608
Preconditions.checkState(
609609
maxConcurrentNexusTaskPollers >= 0, "negative maxConcurrentNexusTaskPollers");
610610

611+
if (workflowTaskPollersBehavior != null) {
612+
Preconditions.checkState(
613+
maxConcurrentWorkflowTaskPollers == 0,
614+
"workflowTaskPollersBehavior and maxConcurrentWorkflowTaskPollers are mutually exclusive");
615+
}
616+
if (activityTaskPollersBehavior != null) {
617+
Preconditions.checkState(
618+
maxConcurrentActivityTaskPollers == 0,
619+
"activityTaskPollersBehavior and maxConcurrentActivityTaskPollers are mutually exclusive");
620+
}
621+
if (nexusTaskPollersBehavior != null) {
622+
Preconditions.checkState(
623+
maxConcurrentNexusTaskPollers == 0,
624+
"nexusTaskPollersBehavior and maxConcurrentNexusTaskPollers are mutually exclusive");
625+
}
626+
611627
return new WorkerOptions(
612628
maxWorkerActivitiesPerSecond,
613629
maxConcurrentActivityExecutionSize == 0
@@ -658,9 +674,9 @@ public WorkerOptions validateAndBuildWithDefaults() {
658674
usingVirtualThreadsOnLocalActivityWorker,
659675
usingVirtualThreadsOnNexusWorker,
660676
deploymentOptions,
661-
workflowTaskPollersBehaviour,
662-
activityTaskPollersBehaviour,
663-
nexusTaskPollersBehaviour);
677+
workflowTaskPollersBehavior,
678+
activityTaskPollersBehavior,
679+
nexusTaskPollersBehavior);
664680
}
665681
}
666682

@@ -689,9 +705,9 @@ public WorkerOptions validateAndBuildWithDefaults() {
689705
private final boolean usingVirtualThreadsOnLocalActivityWorker;
690706
private final boolean usingVirtualThreadsOnNexusWorker;
691707
private final WorkerDeploymentOptions deploymentOptions;
692-
private PollerBehavior workflowTaskPollersBehaviour;
693-
private PollerBehavior activityTaskPollersBehaviour;
694-
private PollerBehavior nexusTaskPollersBehaviour;
708+
private final PollerBehavior workflowTaskPollersBehavior;
709+
private final PollerBehavior activityTaskPollersBehavior;
710+
private final PollerBehavior nexusTaskPollersBehavior;
695711

696712
private WorkerOptions(
697713
double maxWorkerActivitiesPerSecond,
@@ -719,9 +735,9 @@ private WorkerOptions(
719735
boolean virtualThreadsEnabledOnLocalActivityWorker,
720736
boolean virtualThreadsEnabledOnNexusWorker,
721737
WorkerDeploymentOptions deploymentOptions,
722-
PollerBehavior workflowTaskPollersBehaviour,
723-
PollerBehavior activityTaskPollersBehaviour,
724-
PollerBehavior nexusTaskPollersBehaviour) {
738+
PollerBehavior workflowTaskPollersBehavior,
739+
PollerBehavior activityTaskPollersBehavior,
740+
PollerBehavior nexusTaskPollersBehavior) {
725741
this.maxWorkerActivitiesPerSecond = maxWorkerActivitiesPerSecond;
726742
this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize;
727743
this.maxConcurrentWorkflowTaskExecutionSize = maxConcurrentWorkflowTaskExecutionSize;
@@ -747,9 +763,9 @@ private WorkerOptions(
747763
this.usingVirtualThreadsOnLocalActivityWorker = virtualThreadsEnabledOnLocalActivityWorker;
748764
this.usingVirtualThreadsOnNexusWorker = virtualThreadsEnabledOnNexusWorker;
749765
this.deploymentOptions = deploymentOptions;
750-
this.workflowTaskPollersBehaviour = workflowTaskPollersBehaviour;
751-
this.activityTaskPollersBehaviour = activityTaskPollersBehaviour;
752-
this.nexusTaskPollersBehaviour = nexusTaskPollersBehaviour;
766+
this.workflowTaskPollersBehavior = workflowTaskPollersBehavior;
767+
this.activityTaskPollersBehavior = activityTaskPollersBehavior;
768+
this.nexusTaskPollersBehavior = nexusTaskPollersBehavior;
753769
}
754770

755771
public double getMaxWorkerActivitiesPerSecond() {
@@ -870,16 +886,16 @@ public WorkerDeploymentOptions getDeploymentOptions() {
870886
return deploymentOptions;
871887
}
872888

873-
public PollerBehavior getWorkflowTaskPollersBehaviour() {
874-
return workflowTaskPollersBehaviour;
889+
public PollerBehavior getWorkflowTaskPollersBehavior() {
890+
return workflowTaskPollersBehavior;
875891
}
876892

877-
public PollerBehavior getActivityTaskPollersBehaviour() {
878-
return activityTaskPollersBehaviour;
893+
public PollerBehavior getActivityTaskPollersBehavior() {
894+
return activityTaskPollersBehavior;
879895
}
880896

881-
public PollerBehavior getNexusTaskPollersBehaviour() {
882-
return nexusTaskPollersBehaviour;
897+
public PollerBehavior getNexusTaskPollersBehavior() {
898+
return nexusTaskPollersBehavior;
883899
}
884900

885901
@Override
@@ -912,9 +928,9 @@ && compare(maxTaskQueueActivitiesPerSecond, that.maxTaskQueueActivitiesPerSecond
912928
&& usingVirtualThreadsOnLocalActivityWorker == that.usingVirtualThreadsOnLocalActivityWorker
913929
&& usingVirtualThreadsOnNexusWorker == that.usingVirtualThreadsOnNexusWorker
914930
&& Objects.equals(deploymentOptions, that.deploymentOptions)
915-
&& Objects.equals(workflowTaskPollersBehaviour, that.workflowTaskPollersBehaviour)
916-
&& Objects.equals(activityTaskPollersBehaviour, that.activityTaskPollersBehaviour)
917-
&& Objects.equals(nexusTaskPollersBehaviour, that.nexusTaskPollersBehaviour);
931+
&& Objects.equals(workflowTaskPollersBehavior, that.workflowTaskPollersBehavior)
932+
&& Objects.equals(activityTaskPollersBehavior, that.activityTaskPollersBehavior)
933+
&& Objects.equals(nexusTaskPollersBehavior, that.nexusTaskPollersBehavior);
918934
}
919935

920936
@Override
@@ -945,9 +961,9 @@ public int hashCode() {
945961
usingVirtualThreadsOnLocalActivityWorker,
946962
usingVirtualThreadsOnNexusWorker,
947963
deploymentOptions,
948-
workflowTaskPollersBehaviour,
949-
activityTaskPollersBehaviour,
950-
nexusTaskPollersBehaviour);
964+
workflowTaskPollersBehavior,
965+
activityTaskPollersBehavior,
966+
nexusTaskPollersBehavior);
951967
}
952968

953969
@Override
@@ -1004,12 +1020,12 @@ public String toString() {
10041020
+ usingVirtualThreadsOnNexusWorker
10051021
+ ", deploymentOptions="
10061022
+ deploymentOptions
1007-
+ ", workflowTaskPollersBehaviour="
1008-
+ workflowTaskPollersBehaviour
1009-
+ ", activityTaskPollersBehaviour="
1010-
+ activityTaskPollersBehaviour
1011-
+ ", nexusTaskPollersBehaviour="
1012-
+ nexusTaskPollersBehaviour
1023+
+ ", workflowTaskPollersBehavior="
1024+
+ workflowTaskPollersBehavior
1025+
+ ", activityTaskPollersBehavior="
1026+
+ activityTaskPollersBehavior
1027+
+ ", nexusTaskPollersBehavior="
1028+
+ nexusTaskPollersBehavior
10131029
+ '}';
10141030
}
10151031
}

temporal-sdk/src/main/java/io/temporal/worker/tuning/PollerBehaviorAutoscaling.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.temporal.worker.tuning;
22

3+
import java.util.Objects;
4+
35
/**
46
* A poller behavior that will automatically scale the number of pollers based on feedback from the
57
* server. A slot must be available before beginning polling.
@@ -66,4 +68,31 @@ public int getMaxConcurrentTaskPollers() {
6668
public int getInitialMaxConcurrentTaskPollers() {
6769
return initialConcurrentTaskPollers;
6870
}
71+
72+
@Override
73+
public boolean equals(Object o) {
74+
if (o == null || getClass() != o.getClass()) return false;
75+
PollerBehaviorAutoscaling that = (PollerBehaviorAutoscaling) o;
76+
return minConcurrentTaskPollers == that.minConcurrentTaskPollers
77+
&& maxConcurrentTaskPollers == that.maxConcurrentTaskPollers
78+
&& initialConcurrentTaskPollers == that.initialConcurrentTaskPollers;
79+
}
80+
81+
@Override
82+
public int hashCode() {
83+
return Objects.hash(
84+
minConcurrentTaskPollers, maxConcurrentTaskPollers, initialConcurrentTaskPollers);
85+
}
86+
87+
@Override
88+
public String toString() {
89+
return "PollerBehaviorAutoscaling{"
90+
+ "minConcurrentTaskPollers="
91+
+ minConcurrentTaskPollers
92+
+ ", maxConcurrentTaskPollers="
93+
+ maxConcurrentTaskPollers
94+
+ ", initialConcurrentTaskPollers="
95+
+ initialConcurrentTaskPollers
96+
+ '}';
97+
}
6998
}

temporal-sdk/src/main/java/io/temporal/worker/tuning/PollerBehaviorSimpleMaximum.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.temporal.worker.tuning;
22

3+
import java.util.Objects;
4+
35
/**
46
* A poller behavior that will attempt to poll as long as a slot is available, up to the provided
57
* maximum. Cannot be less than two for workflow tasks, or one for other tasks.
@@ -28,4 +30,24 @@ public PollerBehaviorSimpleMaximum(int maxConcurrentTaskPollers) {
2830
public int getMaxConcurrentTaskPollers() {
2931
return maxConcurrentTaskPollers;
3032
}
33+
34+
@Override
35+
public boolean equals(Object o) {
36+
if (o == null || getClass() != o.getClass()) return false;
37+
PollerBehaviorSimpleMaximum that = (PollerBehaviorSimpleMaximum) o;
38+
return maxConcurrentTaskPollers == that.maxConcurrentTaskPollers;
39+
}
40+
41+
@Override
42+
public int hashCode() {
43+
return Objects.hashCode(maxConcurrentTaskPollers);
44+
}
45+
46+
@Override
47+
public String toString() {
48+
return "PollerBehaviorSimpleMaximum{"
49+
+ "maxConcurrentTaskPollers="
50+
+ maxConcurrentTaskPollers
51+
+ '}';
52+
}
3153
}

temporal-sdk/src/test/java/io/temporal/worker/PollerAutoScaleTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ public class PollerAutoScaleTests {
2020
SDKTestWorkflowRule.newBuilder()
2121
.setWorkerOptions(
2222
WorkerOptions.newBuilder()
23-
.setWorkflowTaskPollersBehaviour(new PollerBehaviorAutoscaling(1, 10, 5))
24-
.setActivityTaskPollersBehaviour(new PollerBehaviorAutoscaling(1, 10, 5))
23+
.setWorkflowTaskPollersBehavior(new PollerBehaviorAutoscaling(1, 10, 5))
24+
.setActivityTaskPollersBehavior(new PollerBehaviorAutoscaling(1, 10, 5))
2525
.build())
2626
.setActivityImplementations(new ResourceBasedTunerTests.ActivitiesImpl())
2727
.setWorkflowTypes(ResourceBasedTunerTests.ResourceTunerWorkflowImpl.class)

temporal-sdk/src/test/java/io/temporal/worker/shutdown/CleanActivityWorkerShutdownTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public CleanActivityWorkerShutdownTest(PollerBehavior pollerBehaviorAutoscaling)
5555
.setWorkflowTypes(TestWorkflowImpl.class)
5656
.setWorkerOptions(
5757
WorkerOptions.newBuilder()
58-
.setActivityTaskPollersBehaviour(pollerBehaviorAutoscaling)
58+
.setActivityTaskPollersBehavior(pollerBehaviorAutoscaling)
5959
.build())
6060
.setActivityImplementations(activitiesImpl)
6161
.build();

temporal-sdk/src/test/java/io/temporal/worker/shutdown/CleanNexusWorkerShutdownTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public CleanNexusWorkerShutdownTest(PollerBehavior pollerBehaviorAutoscaling) {
6060
.setWorkerOptions(
6161
WorkerOptions.newBuilder()
6262
.setLocalActivityWorkerOnly(true)
63-
.setNexusTaskPollersBehaviour(pollerBehaviorAutoscaling)
63+
.setNexusTaskPollersBehavior(pollerBehaviorAutoscaling)
6464
.build())
6565
.setNexusServiceImplementation(nexusServiceImpl)
6666
.build();

temporal-sdk/src/test/java/io/temporal/worker/shutdown/StickyWorkflowDrainShutdownTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public StickyWorkflowDrainShutdownTest(PollerBehavior pollerBehaviorAutoscaling)
4141
.setUseTimeskipping(false)
4242
.setWorkerOptions(
4343
WorkerOptions.newBuilder()
44-
.setWorkflowTaskPollersBehaviour(pollerBehaviorAutoscaling)
44+
.setWorkflowTaskPollersBehavior(pollerBehaviorAutoscaling)
4545
.setStickyTaskQueueDrainTimeout(DRAIN_TIME)
4646
.build())
4747
.setWorkflowServiceStubsOptions(

temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,8 @@ public void testWorkerMetricsAutoPoller() throws InterruptedException {
198198

199199
WorkerOptions workerOptions =
200200
WorkerOptions.newBuilder()
201-
.setWorkflowTaskPollersBehaviour(new PollerBehaviorAutoscaling(5, 5, 5))
202-
.setActivityTaskPollersBehaviour(new PollerBehaviorAutoscaling(5, 5, 5))
201+
.setWorkflowTaskPollersBehavior(new PollerBehaviorAutoscaling(5, 5, 5))
202+
.setActivityTaskPollersBehavior(new PollerBehaviorAutoscaling(5, 5, 5))
203203
.build();
204204
Worker worker = testEnvironment.newWorker(TASK_QUEUE, workerOptions);
205205
worker.registerWorkflowImplementationTypes(

0 commit comments

Comments
 (0)