Skip to content

Commit bcf3280

Browse files
authored
Update Workflow/Activity priority (#8396)
## What changed? Allow updating priority of Workflow and Activity. Based on - temporalio/api#610 - #8103 ## Why? Users want to change the priority after starting the workflow/activity. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) - [x] added new functional test(s)
1 parent eb979f6 commit bcf3280

File tree

24 files changed

+504
-61
lines changed

24 files changed

+504
-61
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ require (
5858
go.opentelemetry.io/otel/sdk v1.34.0
5959
go.opentelemetry.io/otel/sdk/metric v1.34.0
6060
go.opentelemetry.io/otel/trace v1.34.0
61-
go.temporal.io/api v1.58.1-0.20251126231839-2fcd2247e106
61+
go.temporal.io/api v1.58.1-0.20251128181858-703071215042
6262
go.temporal.io/sdk v1.35.0
6363
go.uber.org/fx v1.24.0
6464
go.uber.org/mock v0.6.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,8 +390,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
390390
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
391391
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
392392
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
393-
go.temporal.io/api v1.58.1-0.20251126231839-2fcd2247e106 h1:V2H8rfBDapmWpIsNDWZLOS95WIIWAgPXnG7gpNrWO5Y=
394-
go.temporal.io/api v1.58.1-0.20251126231839-2fcd2247e106/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
393+
go.temporal.io/api v1.58.1-0.20251128181858-703071215042 h1:44+nPe+rGhYUwA1oDi46rkXEYEVfoAxOmb0myvTm4Es=
394+
go.temporal.io/api v1.58.1-0.20251128181858-703071215042/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
395395
go.temporal.io/sdk v1.35.0 h1:lRNAQ5As9rLgYa7HBvnmKyzxLcdElTuoFJ0FXM/AsLQ=
396396
go.temporal.io/sdk v1.35.0/go.mod h1:1q5MuLc2MEJ4lneZTHJzpVebW2oZnyxoIOWX3oFVebw=
397397
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=

service/frontend/workflow_handler.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5950,6 +5950,9 @@ func (wh *WorkflowHandler) UpdateWorkflowExecutionOptions(
59505950
if err != nil {
59515951
return nil, serviceerror.NewInvalidArgumentf("error parsing UpdateMask: %s", err.Error())
59525952
}
5953+
if err := priorities.Validate(opts.GetPriority()); err != nil {
5954+
return nil, err
5955+
}
59535956

59545957
namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
59555958
if err != nil {
@@ -5987,6 +5990,9 @@ func (wh *WorkflowHandler) UpdateActivityOptions(
59875990
if request.GetActivity() == nil {
59885991
return nil, errActivityIDOrTypeNotSet
59895992
}
5993+
if err := priorities.Validate(request.GetActivityOptions().GetPriority()); err != nil {
5994+
return nil, err
5995+
}
59905996

59915997
namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
59925998
if err != nil {

service/frontend/workflow_handler_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/stretchr/testify/mock"
1515
"github.com/stretchr/testify/require"
1616
"github.com/stretchr/testify/suite"
17+
activitypb "go.temporal.io/api/activity/v1"
1718
batchpb "go.temporal.io/api/batch/v1"
1819
commonpb "go.temporal.io/api/common/v1"
1920
enumspb "go.temporal.io/api/enums/v1"
@@ -914,6 +915,31 @@ func (s *WorkflowHandlerSuite) TestStartWorkflowExecution_Failed_InvalidAggregat
914915
s.ErrorContains(err, "cannot attach more than 10 links per request, got 11")
915916
}
916917

918+
func (s *WorkflowHandlerSuite) TestStartWorkflowExecution_Priority() {
919+
config := s.newConfig()
920+
wh := s.getWorkflowHandler(config)
921+
922+
s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(gomock.Any()).Return(nil, nil).AnyTimes()
923+
924+
request := &workflowservice.StartWorkflowExecutionRequest{
925+
Namespace: s.testNamespace.String(),
926+
WorkflowId: "workflow-id",
927+
WorkflowType: &commonpb.WorkflowType{
928+
Name: "workflow-type",
929+
},
930+
TaskQueue: &taskqueuepb.TaskQueue{
931+
Name: "task-queue",
932+
},
933+
Priority: &commonpb.Priority{PriorityKey: -1},
934+
}
935+
936+
_, err := wh.StartWorkflowExecution(context.Background(), request)
937+
var invalidArg *serviceerror.InvalidArgument
938+
s.ErrorAs(err, &invalidArg)
939+
s.ErrorContains(err, "priority key can't be negative")
940+
// NOTE: only testing a single validation scenario here; the priority validation has its own unit tests
941+
}
942+
917943
func (s *WorkflowHandlerSuite) TestSignalWithStartWorkflowExecution_InvalidWorkflowIdConflictPolicy() {
918944
config := s.newConfig()
919945
wh := s.getWorkflowHandler(config)
@@ -1011,6 +1037,32 @@ func (s *WorkflowHandlerSuite) TestSignalWithStartWorkflowExecution_Failed_Inval
10111037
s.ErrorContains(err, "link exceeds allowed size of 4000")
10121038
}
10131039

1040+
func (s *WorkflowHandlerSuite) TestSignalWithStartWorkflowExecution_Priority() {
1041+
config := s.newConfig()
1042+
wh := s.getWorkflowHandler(config)
1043+
1044+
s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(gomock.Any()).Return(nil, nil).AnyTimes()
1045+
1046+
request := &workflowservice.SignalWithStartWorkflowExecutionRequest{
1047+
Namespace: s.testNamespace.String(),
1048+
WorkflowId: "workflow-id",
1049+
WorkflowType: &commonpb.WorkflowType{
1050+
Name: "workflow-type",
1051+
},
1052+
TaskQueue: &taskqueuepb.TaskQueue{
1053+
Name: "task-queue",
1054+
},
1055+
SignalName: "signal-name",
1056+
Priority: &commonpb.Priority{PriorityKey: -1},
1057+
}
1058+
1059+
_, err := wh.SignalWithStartWorkflowExecution(context.Background(), request)
1060+
var invalidArg *serviceerror.InvalidArgument
1061+
s.ErrorAs(err, &invalidArg)
1062+
s.ErrorContains(err, "priority key can't be negative")
1063+
// NOTE: only testing a single validation scenario here; the priority validation has its own unit tests
1064+
}
1065+
10141066
func (s *WorkflowHandlerSuite) TestSignalWorkflowExecution_Failed_InvalidLinks() {
10151067
s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(gomock.Any()).AnyTimes().Return(nil, nil)
10161068
config := s.newConfig()
@@ -3958,3 +4010,52 @@ func (s *WorkflowHandlerSuite) TestUpdateTaskQueueConfig_Validation() {
39584010
s.NotNil(resp)
39594011
})
39604012
}
4013+
4014+
func (s *WorkflowHandlerSuite) TestUpdateWorkflowExecutionOptions_Priority() {
4015+
config := s.newConfig()
4016+
wh := s.getWorkflowHandler(config)
4017+
4018+
request := &workflowservice.UpdateWorkflowExecutionOptionsRequest{
4019+
Namespace: s.testNamespace.String(),
4020+
WorkflowExecution: &commonpb.WorkflowExecution{
4021+
WorkflowId: "workflow-id",
4022+
RunId: "run-id",
4023+
},
4024+
WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{
4025+
Priority: &commonpb.Priority{PriorityKey: -1},
4026+
},
4027+
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"priority"}},
4028+
}
4029+
4030+
_, err := wh.UpdateWorkflowExecutionOptions(context.Background(), request)
4031+
var invalidArg *serviceerror.InvalidArgument
4032+
s.ErrorAs(err, &invalidArg)
4033+
s.ErrorContains(err, "priority key can't be negative")
4034+
// NOTE: only testing a single validation scenario here; the priority validation has its own unit tests
4035+
}
4036+
4037+
func (s *WorkflowHandlerSuite) TestUpdateActivityOptions_Priority() {
4038+
config := s.newConfig()
4039+
wh := s.getWorkflowHandler(config)
4040+
4041+
request := &workflowservice.UpdateActivityOptionsRequest{
4042+
Namespace: s.testNamespace.String(),
4043+
Execution: &commonpb.WorkflowExecution{
4044+
WorkflowId: "workflow-id",
4045+
RunId: "run-id",
4046+
},
4047+
Activity: &workflowservice.UpdateActivityOptionsRequest_Id{
4048+
Id: "activity-id",
4049+
},
4050+
ActivityOptions: &activitypb.ActivityOptions{
4051+
Priority: &commonpb.Priority{PriorityKey: -1},
4052+
},
4053+
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"priority"}},
4054+
}
4055+
4056+
_, err := wh.UpdateActivityOptions(context.Background(), request)
4057+
var invalidArg *serviceerror.InvalidArgument
4058+
s.ErrorAs(err, &invalidArg)
4059+
s.ErrorContains(err, "priority key can't be negative")
4060+
// NOTE: only testing a single validation scenario here; the priority validation has its own unit tests
4061+
}

service/history/api/recordactivitytaskstarted/api.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,12 +165,11 @@ func recordActivityTaskStarted(
165165
}
166166

167167
if ai.Stamp != request.Stamp {
168-
// activity has changes before task is started.
169-
// ErrActivityStampMismatch is the error to indicate that requested activity has mismatched stamp
168+
// This happens when the workflow task was rescheduled.
170169
errorMessage := fmt.Sprintf(
171-
"Activity task with this stamp not found. Id: %s,: type: %s, current stamp: %d",
170+
"Activity task rejected; stamp has changed. Id: %s,: type: %s, current stamp: %d",
172171
ai.ActivityId, ai.ActivityType.Name, ai.Stamp)
173-
return nil, rejectCodeUndefined, serviceerror.NewNotFound(errorMessage)
172+
return nil, rejectCodeUndefined, serviceerrors.NewObsoleteMatchingTask(errorMessage)
174173
}
175174

176175
wfBehavior := mutableState.GetEffectiveVersioningBehavior()

service/history/api/startworkflow/api.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -670,7 +670,8 @@ func (s *Starter) handleUseExistingWorkflowOnConflictOptions(
670670
requestID,
671671
completionCallbacks,
672672
links,
673-
"",
673+
"", // identity
674+
nil, // priority
674675
)
675676
return api.UpdateWorkflowWithoutWorkflowTask, err
676677
},

service/history/api/updateactivityoptions/api.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"go.temporal.io/api/workflowservice/v1"
1414
"go.temporal.io/server/api/historyservice/v1"
1515
persistencespb "go.temporal.io/server/api/persistence/v1"
16+
"go.temporal.io/server/common"
1617
"go.temporal.io/server/common/definition"
1718
"go.temporal.io/server/common/namespace"
1819
"go.temporal.io/server/common/util"
@@ -148,6 +149,7 @@ func processActivityOptionsUpdate(
148149
ScheduleToStartTimeout: ai.ScheduleToStartTimeout,
149150
StartToCloseTimeout: ai.StartToCloseTimeout,
150151
HeartbeatTimeout: ai.HeartbeatTimeout,
152+
Priority: common.CloneProto(ai.Priority),
151153
RetryPolicy: &commonpb.RetryPolicy{
152154
BackoffCoefficient: ai.RetryBackoffCoefficient,
153155
InitialInterval: ai.RetryInitialInterval,
@@ -202,10 +204,48 @@ func mergeActivityOptions(
202204
mergeInto.HeartbeatTimeout = mergeFrom.HeartbeatTimeout
203205
}
204206

207+
if _, ok := updateFields["priority"]; ok {
208+
mergeInto.Priority = mergeFrom.Priority
209+
}
210+
211+
if _, ok := updateFields["priority.priorityKey"]; ok {
212+
if mergeFrom.Priority == nil {
213+
return serviceerror.NewInvalidArgument("Priority is not provided")
214+
}
215+
if mergeInto.Priority == nil {
216+
mergeInto.Priority = &commonpb.Priority{}
217+
}
218+
mergeInto.Priority.PriorityKey = mergeFrom.Priority.PriorityKey
219+
}
220+
221+
if _, ok := updateFields["priority.fairnessKey"]; ok {
222+
if mergeFrom.Priority == nil {
223+
return serviceerror.NewInvalidArgument("Priority is not provided")
224+
}
225+
if mergeInto.Priority == nil {
226+
mergeInto.Priority = &commonpb.Priority{}
227+
}
228+
mergeInto.Priority.FairnessKey = mergeFrom.Priority.FairnessKey
229+
}
230+
231+
if _, ok := updateFields["priority.fairnessWeight"]; ok {
232+
if mergeFrom.Priority == nil {
233+
return serviceerror.NewInvalidArgument("Priority is not provided")
234+
}
235+
if mergeInto.Priority == nil {
236+
mergeInto.Priority = &commonpb.Priority{}
237+
}
238+
mergeInto.Priority.FairnessWeight = mergeFrom.Priority.FairnessWeight
239+
}
240+
205241
if mergeInto.RetryPolicy == nil {
206242
mergeInto.RetryPolicy = &commonpb.RetryPolicy{}
207243
}
208244

245+
if _, ok := updateFields["retryPolicy"]; ok {
246+
mergeInto.RetryPolicy = mergeFrom.RetryPolicy
247+
}
248+
209249
if _, ok := updateFields["retryPolicy.initialInterval"]; ok {
210250
if mergeFrom.RetryPolicy == nil {
211251
return serviceerror.NewInvalidArgument("RetryPolicy is not provided")
@@ -295,6 +335,7 @@ func updateActivityOptions(
295335
activityInfo.ScheduleToStartTimeout = activityOptions.ScheduleToStartTimeout
296336
activityInfo.StartToCloseTimeout = activityOptions.StartToCloseTimeout
297337
activityInfo.HeartbeatTimeout = activityOptions.HeartbeatTimeout
338+
activityInfo.Priority = activityOptions.Priority
298339
activityInfo.RetryMaximumInterval = activityOptions.RetryPolicy.MaximumInterval
299340
activityInfo.RetryBackoffCoefficient = activityOptions.RetryPolicy.BackoffCoefficient
300341
activityInfo.RetryInitialInterval = activityOptions.RetryPolicy.InitialInterval
@@ -375,6 +416,7 @@ func restoreOriginalOptions(
375416
ScheduleToStartTimeout: originalOptions.ScheduleToStartTimeout,
376417
StartToCloseTimeout: originalOptions.StartToCloseTimeout,
377418
HeartbeatTimeout: originalOptions.HeartbeatTimeout,
419+
Priority: originalOptions.Priority,
378420
RetryPolicy: originalOptions.RetryPolicy,
379421
}
380422

0 commit comments

Comments
 (0)