Skip to content

Commit f88bdd3

Browse files
authored
Implement SetWorkerDeploymentManagerIdentity (#2068)
1 parent 8d43143 commit f88bdd3

File tree

5 files changed

+201
-9
lines changed

5 files changed

+201
-9
lines changed

client/client.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,18 @@ type (
389389
// NOTE: Experimental
390390
WorkerDeploymentSetRampingVersionResponse = internal.WorkerDeploymentSetRampingVersionResponse
391391

392+
// WorkerDeploymentSetManagerIdentityOptions provides options for
393+
// [WorkerDeploymentHandle.SetManagerIdentity].
394+
//
395+
// NOTE: Experimental
396+
WorkerDeploymentSetManagerIdentityOptions = internal.WorkerDeploymentSetManagerIdentityOptions
397+
398+
// WorkerDeploymentSetManagerIdentityResponse is the response for
399+
// [WorkerDeploymentHandle.SetManagerIdentity].
400+
//
401+
// NOTE: Experimental
402+
WorkerDeploymentSetManagerIdentityResponse = internal.WorkerDeploymentSetManagerIdentityResponse
403+
392404
// WorkerDeploymentDescribeVersionOptions provides options for
393405
// [WorkerDeploymentHandle.DescribeVersion].
394406
//

internal/cmd/build/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (b *builder) integrationTest() error {
121121
if *devServerFlag {
122122
devServer, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{
123123
CachedDownload: testsuite.CachedDownload{
124-
Version: "v1.4.1-cloud-v1-29-0-139-2.0",
124+
Version: "v1.5.0-rc",
125125
},
126126
ClientOptions: &client.Options{
127127
HostPort: "127.0.0.1:7233",

internal/internal_worker_deployment_client.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,48 @@ func (h *workerDeploymentHandleImpl) SetRampingVersion(ctx context.Context, opti
271271

272272
}
273273

274+
func (h *workerDeploymentHandleImpl) SetManagerIdentity(ctx context.Context, options WorkerDeploymentSetManagerIdentityOptions) (WorkerDeploymentSetManagerIdentityResponse, error) {
275+
if err := h.validate(); err != nil {
276+
return WorkerDeploymentSetManagerIdentityResponse{}, err
277+
}
278+
if err := h.workflowClient.ensureInitialized(ctx); err != nil {
279+
return WorkerDeploymentSetManagerIdentityResponse{}, err
280+
}
281+
282+
identity := h.workflowClient.identity
283+
if options.Identity != "" {
284+
identity = options.Identity
285+
}
286+
287+
request := &workflowservice.SetWorkerDeploymentManagerRequest{
288+
Namespace: h.workflowClient.namespace,
289+
DeploymentName: h.Name,
290+
ConflictToken: options.ConflictToken,
291+
Identity: identity,
292+
}
293+
if options.Self {
294+
if options.ManagerIdentity != "" {
295+
return WorkerDeploymentSetManagerIdentityResponse{}, fmt.Errorf("invalid input: if Self is true, ManagerIdentity must be empty but was '%s'", options.ManagerIdentity)
296+
}
297+
request.NewManagerIdentity = &workflowservice.SetWorkerDeploymentManagerRequest_Self{Self: true}
298+
} else {
299+
request.NewManagerIdentity = &workflowservice.SetWorkerDeploymentManagerRequest_ManagerIdentity{ManagerIdentity: options.ManagerIdentity}
300+
}
301+
grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
302+
defer cancel()
303+
304+
resp, err := h.workflowClient.workflowService.SetWorkerDeploymentManager(grpcCtx, request)
305+
if err != nil {
306+
return WorkerDeploymentSetManagerIdentityResponse{}, err
307+
}
308+
309+
return WorkerDeploymentSetManagerIdentityResponse{
310+
ConflictToken: resp.GetConflictToken(),
311+
PreviousManagerIdentity: resp.GetPreviousManagerIdentity(),
312+
}, nil
313+
314+
}
315+
274316
func workerDeploymentTaskQueuesInfosFromProto(tqInfos []*deployment.WorkerDeploymentVersionInfo_VersionTaskQueueInfo) []WorkerDeploymentTaskQueueInfo {
275317
result := []WorkerDeploymentTaskQueueInfo{}
276318
for _, info := range tqInfos {

internal/worker_deployment_client.go

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,12 @@ type (
138138
// The current token can be obtained with [WorkerDeploymentHandle.Describe],
139139
// or returned by other successful Worker Deployment operations.
140140
//
141-
// Optional: defaulted to empty token, which bypasses conflict detection.
141+
// Optional: defaults to empty token, which bypasses conflict detection.
142142
ConflictToken []byte
143143

144-
// Identity: The identity of the client who initiated this request.
144+
// Identity - The identity of the client who initiated this request.
145145
//
146-
// Optional: default to the identity of the underlying workflow client.
146+
// Optional: defaults to the identity of the underlying workflow client.
147147
Identity string
148148

149149
// IgnoreMissingTaskQueues - Override protection against accidental removal of Task Queues.
@@ -196,12 +196,12 @@ type (
196196
// The current token can be obtained with [WorkerDeploymentHandle.Describe],
197197
// or returned by other successful Worker Deployment operations.
198198
//
199-
// Optional: defaulted to empty token, which bypasses conflict detection.
199+
// Optional: defaults to empty token, which bypasses conflict detection.
200200
ConflictToken []byte
201201

202-
// Identity: The identity of the client who initiated this request.
202+
// Identity - The identity of the client who initiated this request.
203203
//
204-
// Optional: default to the identity of the underlying workflow client.
204+
// Optional: defaults to the identity of the underlying workflow client.
205205
Identity string
206206

207207
// IgnoreMissingTaskQueues - Override protection against accidental removal of Task Queues.
@@ -235,6 +235,54 @@ type (
235235
PreviousPercentage float32
236236
}
237237

238+
// WorkerDeploymentSetManagerIdentityOptions provides options for
239+
// [WorkerDeploymentHandle.SetManagerIdentity].
240+
//
241+
// NOTE: Experimental
242+
//
243+
// Exposed as: [go.temporal.io/sdk/client.WorkerDeploymentSetManagerIdentityOptions]
244+
WorkerDeploymentSetManagerIdentityOptions struct {
245+
// ManagerIdentity - string to set as the Worker Deployment's ManagerIdentity.
246+
// An empty string will clear the ManagerIdentity field.
247+
// It is invalid to set Self=true and ManagerIdentity != "".
248+
ManagerIdentity string
249+
250+
// Self - If true, set the Worker Deployment's ManagerIdentity field to the identity
251+
// of the user submitting this request.
252+
// It is invalid to set Self=true and ManagerIdentity != "".
253+
Self bool
254+
255+
// ConflictToken - Token to serialize Worker Deployment operations. Passing a non-empty
256+
// conflict token will cause this request to fail with
257+
// `serviceerror.FailedPrecondition` if the
258+
// Deployment's configuration has been modified between the API call that
259+
// generated the token and this one.
260+
// The current token can be obtained with [WorkerDeploymentHandle.Describe],
261+
// or returned by other successful Worker Deployment operations.
262+
//
263+
// Optional: defaults to empty token, which bypasses conflict detection.
264+
ConflictToken []byte
265+
266+
// Identity - The identity of the client who initiated this request.
267+
//
268+
// Optional: defaults to the identity of the underlying workflow client.
269+
Identity string
270+
}
271+
272+
// WorkerDeploymentSetManagerIdentityResponse is the response for
273+
// [WorkerDeploymentHandle.SetManagerIdentity].
274+
//
275+
// NOTE: Experimental
276+
//
277+
// Exposed as: [go.temporal.io/sdk/client.WorkerDeploymentSetManagerIdentityResponse]
278+
WorkerDeploymentSetManagerIdentityResponse struct {
279+
// ConflictToken - Token to serialize Worker Deployment operations.
280+
ConflictToken []byte
281+
282+
// PreviousManagerIdentity - The Manager Identity before executing this operation, if any.
283+
PreviousManagerIdentity string
284+
}
285+
238286
// WorkerDeploymentDescribeVersionOptions provides options for
239287
// [WorkerDeploymentHandle.DescribeVersion].
240288
//
@@ -344,7 +392,7 @@ type (
344392

345393
// Identity - The identity of the client who initiated this request.
346394
//
347-
// Optional: default to the identity of the underlying workflow client.
395+
// Optional: defaults to the identity of the underlying workflow client.
348396
Identity string
349397
}
350398

@@ -422,6 +470,11 @@ type (
422470
// NOTE: Experimental
423471
SetRampingVersion(ctx context.Context, options WorkerDeploymentSetRampingVersionOptions) (WorkerDeploymentSetRampingVersionResponse, error)
424472

473+
// SetManagerIdentity changes the Manager Identity of this Worker Deployment.
474+
//
475+
// NOTE: Experimental
476+
SetManagerIdentity(ctx context.Context, options WorkerDeploymentSetManagerIdentityOptions) (WorkerDeploymentSetManagerIdentityResponse, error)
477+
425478
// DescribeVersion gives a description of one the Versions in this Worker Deployment.
426479
//
427480
// NOTE: Experimental
@@ -537,7 +590,7 @@ type (
537590

538591
// Identity - The identity of the client who initiated this request.
539592
//
540-
// Optional: default to the identity of the underlying workflow client.
593+
// Optional: defaults to the identity of the underlying workflow client.
541594
Identity string
542595
}
543596

test/worker_deployment_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -993,6 +993,91 @@ func (ts *WorkerDeploymentTestSuite) TestRampVersions() {
993993
}, 10*time.Second, 300*time.Millisecond)
994994
}
995995

996+
func (ts *WorkerDeploymentTestSuite) TestSetManagerIdentity() {
997+
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
998+
ts.T().Skip("temporal server 1.27+ required")
999+
}
1000+
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
1001+
defer cancel()
1002+
1003+
deploymentName := "deploy-test-" + uuid.NewString()
1004+
v1 := worker.WorkerDeploymentVersion{
1005+
DeploymentName: deploymentName,
1006+
BuildID: "1.0",
1007+
}
1008+
1009+
worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{
1010+
DeploymentOptions: worker.DeploymentOptions{
1011+
UseVersioning: true,
1012+
Version: v1,
1013+
},
1014+
})
1015+
worker1.RegisterWorkflowWithOptions(ts.workflows.WaitSignalToStartVersionedOne, workflow.RegisterOptions{
1016+
Name: "WaitSignalToStartVersioned",
1017+
VersioningBehavior: workflow.VersioningBehaviorPinned,
1018+
})
1019+
1020+
ts.NoError(worker1.Start())
1021+
defer worker1.Stop()
1022+
1023+
dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName)
1024+
1025+
ts.waitForWorkerDeployment(ctx, dHandle)
1026+
1027+
// Check that initial manager identity is empty
1028+
response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
1029+
ts.NoError(err)
1030+
ts.Equal("", response1.Info.ManagerIdentity)
1031+
1032+
// Set arbitrary ManagerIdentity
1033+
response2, err := dHandle.SetManagerIdentity(ctx, client.WorkerDeploymentSetManagerIdentityOptions{
1034+
ManagerIdentity: "foo",
1035+
ConflictToken: response1.ConflictToken,
1036+
})
1037+
ts.NoError(err)
1038+
ts.Equal("", response2.PreviousManagerIdentity)
1039+
1040+
// Check that manager identity is set to foo
1041+
response3, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
1042+
ts.NoError(err)
1043+
ts.Equal("foo", response3.Info.ManagerIdentity)
1044+
1045+
// Set self as ManagerIdentity
1046+
response4, err := dHandle.SetManagerIdentity(ctx, client.WorkerDeploymentSetManagerIdentityOptions{
1047+
Self: true,
1048+
Identity: "my-identity",
1049+
ConflictToken: response3.ConflictToken,
1050+
})
1051+
ts.NoError(err)
1052+
ts.Equal("foo", response4.PreviousManagerIdentity)
1053+
1054+
// Check that manager identity is set to self
1055+
response5, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
1056+
ts.NoError(err)
1057+
ts.Equal("my-identity", response5.Info.ManagerIdentity)
1058+
1059+
// Unset ManagerIdentity
1060+
response6, err := dHandle.SetManagerIdentity(ctx, client.WorkerDeploymentSetManagerIdentityOptions{
1061+
ManagerIdentity: "",
1062+
ConflictToken: response5.ConflictToken,
1063+
})
1064+
ts.NoError(err)
1065+
ts.Equal("my-identity", response6.PreviousManagerIdentity)
1066+
1067+
// Check that manager identity is empty
1068+
response7, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
1069+
ts.NoError(err)
1070+
ts.Equal("", response7.Info.ManagerIdentity)
1071+
1072+
// Invalid set ManagerIdentity
1073+
_, err = dHandle.SetManagerIdentity(ctx, client.WorkerDeploymentSetManagerIdentityOptions{
1074+
ManagerIdentity: "foo",
1075+
Self: true,
1076+
ConflictToken: response7.ConflictToken,
1077+
})
1078+
ts.Error(err)
1079+
}
1080+
9961081
func (ts *WorkerDeploymentTestSuite) TestDeleteDeployment() {
9971082
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
9981083
ts.T().Skip("temporal server 1.27+ required")

0 commit comments

Comments
 (0)