Skip to content

Commit 0968a50

Browse files
authored
Pass AllowNoPollers flag to SetCurrentVersion and SetRampingVersion (#2067)
* Pass AllowNoPollers flag to SetCurrentVersion and SetRampingVersion * test allow-no-pollers flag
1 parent f88bdd3 commit 0968a50

File tree

3 files changed

+100
-0
lines changed

3 files changed

+100
-0
lines changed

internal/internal_worker_deployment_client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ func (h *workerDeploymentHandleImpl) SetCurrentVersion(ctx context.Context, opti
213213
ConflictToken: options.ConflictToken,
214214
Identity: identity,
215215
IgnoreMissingTaskQueues: options.IgnoreMissingTaskQueues,
216+
AllowNoPollers: options.AllowNoPollers,
216217
}
217218
grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
218219
defer cancel()
@@ -252,6 +253,7 @@ func (h *workerDeploymentHandleImpl) SetRampingVersion(ctx context.Context, opti
252253
ConflictToken: options.ConflictToken,
253254
Identity: identity,
254255
IgnoreMissingTaskQueues: options.IgnoreMissingTaskQueues,
256+
AllowNoPollers: options.AllowNoPollers,
255257
}
256258
grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
257259
defer cancel()

internal/worker_deployment_client.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,15 @@ type (
158158
//
159159
// Optional: default to reject request when queues are missing.
160160
IgnoreMissingTaskQueues bool
161+
162+
// AllowNoPollers - Override protection against accidentally sending tasks to a version without pollers.
163+
// When false this request will be rejected if no pollers have been seen for the proposed Current Version,
164+
// in order to protect users from routing tasks to pollers that do not exist, leading to possible timeouts.
165+
// Pass `true` here to bypass this protection.
166+
// WARNING: setting this flag could lead to tasks being sent to a version that has no pollers.
167+
//
168+
// Optional: default to reject request when version has never had pollers.
169+
AllowNoPollers bool
161170
}
162171

163172
// WorkerDeploymentSetCurrentVersionResponse is the response for
@@ -216,6 +225,15 @@ type (
216225
//
217226
// Optional: default to reject request when queues are missing.
218227
IgnoreMissingTaskQueues bool
228+
229+
// AllowNoPollers - Override protection against accidentally sending tasks to a version without pollers.
230+
// When false this request will be rejected if no pollers have been seen for the proposed Current Version,
231+
// in order to protect users from routing tasks to pollers that do not exist, leading to possible timeouts.
232+
// Pass `true` here to bypass this protection.
233+
// WARNING: setting this flag could lead to tasks being sent to a version that has no pollers.
234+
//
235+
// Optional: default to reject request when version has never had pollers.
236+
AllowNoPollers bool
219237
}
220238

221239
// WorkerDeploymentSetRampingVersionResponse is the response for

test/worker_deployment_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -993,6 +993,47 @@ func (ts *WorkerDeploymentTestSuite) TestRampVersions() {
993993
}, 10*time.Second, 300*time.Millisecond)
994994
}
995995

996+
func (ts *WorkerDeploymentTestSuite) TestRampVersion_AllowNoPollers() {
997+
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
998+
ts.T().Skip("temporal server 1.27+ required")
999+
}
1000+
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
1001+
defer cancel()
1002+
1003+
deploymentName := "deploy-test-" + uuid.NewString()
1004+
v1 := worker.WorkerDeploymentVersion{
1005+
DeploymentName: deploymentName,
1006+
BuildID: "1.0",
1007+
}
1008+
1009+
dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName)
1010+
1011+
// Setting Ramp without the AllowNoPollers flag fails when there are no pollers
1012+
_, err := dHandle.SetRampingVersion(ctx, client.WorkerDeploymentSetRampingVersionOptions{
1013+
BuildID: v1.BuildID,
1014+
ConflictToken: nil,
1015+
Percentage: float32(100.0),
1016+
})
1017+
ts.Error(err)
1018+
1019+
// Setting Ramp with the AllowNoPollers flag succeeds when there are no pollers
1020+
response1, err := dHandle.SetRampingVersion(ctx, client.WorkerDeploymentSetRampingVersionOptions{
1021+
BuildID: v1.BuildID,
1022+
ConflictToken: nil,
1023+
Percentage: float32(100.0),
1024+
AllowNoPollers: true,
1025+
})
1026+
ts.NoError(err)
1027+
ts.Nil(response1.PreviousVersion)
1028+
1029+
// Verify RoutingConfig is as expected
1030+
response2, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
1031+
ts.NoError(err)
1032+
ts.Equal(v1.BuildID, response2.Info.RoutingConfig.RampingVersion.BuildID)
1033+
ts.Equal(float32(100.0), response2.Info.RoutingConfig.RampingVersionPercentage)
1034+
ts.Nil(response2.Info.RoutingConfig.CurrentVersion)
1035+
}
1036+
9961037
func (ts *WorkerDeploymentTestSuite) TestSetManagerIdentity() {
9971038
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
9981039
ts.T().Skip("temporal server 1.27+ required")
@@ -1078,6 +1119,45 @@ func (ts *WorkerDeploymentTestSuite) TestSetManagerIdentity() {
10781119
ts.Error(err)
10791120
}
10801121

1122+
func (ts *WorkerDeploymentTestSuite) TestCurrentVersion_AllowNoPollers() {
1123+
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
1124+
ts.T().Skip("temporal server 1.27+ required")
1125+
}
1126+
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
1127+
defer cancel()
1128+
1129+
deploymentName := "deploy-test-" + uuid.NewString()
1130+
v1 := worker.WorkerDeploymentVersion{
1131+
DeploymentName: deploymentName,
1132+
BuildID: "1.0",
1133+
}
1134+
1135+
dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName)
1136+
1137+
// Setting Current without the AllowNoPollers flag fails when there are no pollers
1138+
_, err := dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{
1139+
BuildID: v1.BuildID,
1140+
ConflictToken: nil,
1141+
})
1142+
ts.Error(err)
1143+
1144+
// Setting Current with the AllowNoPollers flag succeeds when there are no pollers
1145+
response1, err := dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{
1146+
BuildID: v1.BuildID,
1147+
ConflictToken: nil,
1148+
AllowNoPollers: true,
1149+
})
1150+
ts.NoError(err)
1151+
ts.Nil(response1.PreviousVersion)
1152+
1153+
// Verify RoutingConfig is as expected
1154+
response2, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
1155+
ts.NoError(err)
1156+
ts.Equal(v1.BuildID, response2.Info.RoutingConfig.CurrentVersion.BuildID)
1157+
ts.Equal(float32(0), response2.Info.RoutingConfig.RampingVersionPercentage)
1158+
ts.Nil(response2.Info.RoutingConfig.RampingVersion)
1159+
}
1160+
10811161
func (ts *WorkerDeploymentTestSuite) TestDeleteDeployment() {
10821162
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
10831163
ts.T().Skip("temporal server 1.27+ required")

0 commit comments

Comments
 (0)