Skip to content

Commit 75537b8

Browse files
authored
[#680]feature: workflow versioning behavior
1 parent 8891949 commit 75537b8

File tree

8 files changed

+176
-202
lines changed

8 files changed

+176
-202
lines changed

aggregatedpool/workers.go

Lines changed: 9 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -18,37 +18,8 @@ const tq = "taskqueue"
1818
func TemporalWorkers(wDef *Workflow, actDef *Activity, wi []*internal.WorkerInfo, log *zap.Logger, tc temporalClient.Client, interceptors map[string]api.Interceptor) ([]worker.Worker, error) {
1919
workers := make([]worker.Worker, 0, 1)
2020

21-
for i := 0; i < len(wi); i++ {
22-
log.Debug("worker info",
23-
zap.String(tq, wi[i].TaskQueue),
24-
zap.Any("flags", wi[i].Flags),
25-
zap.Int("num_workflows", len(wi[i].Workflows)),
26-
zap.Int("num_activities", len(wi[i].Activities)),
27-
zap.Int("max_concurrent_activity_execution_size", wi[i].Options.MaxConcurrentActivityExecutionSize),
28-
zap.Float64("worker_activities_per_second", wi[i].Options.WorkerActivitiesPerSecond),
29-
zap.Int("max_concurrent_local_activity_execution_size", wi[i].Options.MaxConcurrentLocalActivityExecutionSize),
30-
zap.Float64("worker_local_activities_per_second", wi[i].Options.WorkerLocalActivitiesPerSecond),
31-
zap.Float64("task_queue_activities_per_second", wi[i].Options.TaskQueueActivitiesPerSecond),
32-
zap.Int("max_concurrent_activity_task_pollers", wi[i].Options.MaxConcurrentActivityTaskPollers),
33-
zap.Int("max_concurrent_workflow_task_pollers", wi[i].Options.MaxConcurrentWorkflowTaskPollers),
34-
zap.Bool("enable_logging_in_replay", wi[i].Options.EnableLoggingInReplay),
35-
zap.String("workflow_panic_policy", policyToString(wi[i].Options.WorkflowPanicPolicy)),
36-
zap.Duration("worker_stop_timeout", wi[i].Options.WorkerStopTimeout),
37-
zap.Bool("enable_session_worker", wi[i].Options.EnableSessionWorker),
38-
zap.Int("max_concurrent_session_execution_size", wi[i].Options.MaxConcurrentSessionExecutionSize),
39-
zap.Bool("disable_workflow_worker", wi[i].Options.DisableWorkflowWorker),
40-
zap.Bool("local_activity_worker_only", wi[i].Options.LocalActivityWorkerOnly),
41-
zap.String("identity", wi[i].Options.Identity),
42-
zap.Duration("deadlock_detection_timeout", wi[i].Options.DeadlockDetectionTimeout),
43-
zap.Duration("max_heartbeat_throttle_interval", wi[i].Options.MaxHeartbeatThrottleInterval),
44-
zap.Duration("default_heartbeat_throttle_interval", wi[i].Options.DefaultHeartbeatThrottleInterval),
45-
zap.Bool("disable_eager_activities", wi[i].Options.DisableEagerActivities),
46-
zap.Int("max_concurrent_eager_activity_execution_size", wi[i].Options.MaxConcurrentEagerActivityExecutionSize),
47-
zap.Bool("disable_registration_aliasing", wi[i].Options.DisableRegistrationAliasing),
48-
zap.Bool("use_versioning", wi[i].Options.DeploymentOptions.UseVersioning),
49-
zap.String("build_id", wi[i].Options.DeploymentOptions.Version.BuildId),
50-
zap.String("deployment_name", wi[i].Options.DeploymentOptions.Version.DeploymentName),
51-
)
21+
for i := range wi {
22+
log.Debug("worker info", zap.Any("worker_info", wi[i]))
5223

5324
// just to be sure
5425
wi[i].Options.WorkerStopTimeout = 0
@@ -59,7 +30,7 @@ func TemporalWorkers(wDef *Workflow, actDef *Activity, wi []*internal.WorkerInfo
5930

6031
if wi[i].Options.Identity == "" {
6132
wi[i].Options.Identity = fmt.Sprintf(
62-
"%s:%s",
33+
"roadrunner:%s:%s",
6334
wi[i].TaskQueue,
6435
uuid.NewString(),
6536
)
@@ -76,15 +47,18 @@ func TemporalWorkers(wDef *Workflow, actDef *Activity, wi []*internal.WorkerInfo
7647
for j := 0; j < len(wi[i].Workflows); j++ {
7748
wrk.RegisterWorkflowWithOptions(wDef, workflow.RegisterOptions{
7849
Name: wi[i].Workflows[j].Name,
50+
VersioningBehavior: wi[i].Workflows[j].VersioningBehavior,
7951
DisableAlreadyRegisteredCheck: false,
8052
})
8153

82-
log.Debug("workflow registered", zap.String(tq, wi[i].TaskQueue), zap.Any("workflow name", wi[i].Workflows[j].Name))
54+
log.Debug("workflow registered", zap.String(tq, wi[i].TaskQueue), zap.Any("workflow name", wi[i].Workflows[j].Name), zap.Int("versioning_behavior", int(wi[i].Workflows[j].VersioningBehavior)))
8355
}
8456

8557
if actDef.disableActivityWorkers {
8658
log.Debug("activity workers disabled", zap.String(tq, wi[i].TaskQueue))
87-
goto RegisterWorkflows
59+
// add worker to the pool without activities
60+
workers = append(workers, wrk)
61+
continue
8862
}
8963

9064
for j := 0; j < len(wi[i].Activities); j++ {
@@ -96,32 +70,11 @@ func TemporalWorkers(wDef *Workflow, actDef *Activity, wi []*internal.WorkerInfo
9670

9771
log.Debug("activity registered", zap.String(tq, wi[i].TaskQueue), zap.Any("workflow name", wi[i].Activities[j].Name))
9872
}
99-
100-
RegisterWorkflows:
101-
for j := 0; j < len(wi[i].Workflows); j++ {
102-
wrk.RegisterWorkflowWithOptions(wDef, workflow.RegisterOptions{
103-
Name: wi[i].Workflows[j].Name,
104-
DisableAlreadyRegisteredCheck: false,
105-
})
106-
107-
log.Debug("workflow registered", zap.String(tq, wi[i].TaskQueue), zap.Any("workflow name", wi[i].Workflows[j].Name))
108-
}
109-
73+
// add worker to the pool
11074
workers = append(workers, wrk)
11175
}
11276

11377
log.Debug("workers initialized", zap.Int("num_workers", len(workers)))
11478

11579
return workers, nil
11680
}
117-
118-
func policyToString(enum worker.WorkflowPanicPolicy) string {
119-
switch enum {
120-
case worker.BlockWorkflow:
121-
return "block"
122-
case worker.FailWorkflow:
123-
return "fail"
124-
default:
125-
return "unknown"
126-
}
127-
}

go.mod

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
module github.com/temporalio/roadrunner-temporal/v5
22

3-
go 1.24.5
4-
5-
toolchain go1.24.6
3+
go 1.25
64

75
require (
86
github.com/goccy/go-json v0.10.5
@@ -12,14 +10,22 @@ require (
1210
github.com/roadrunner-server/errors v1.4.1
1311
github.com/roadrunner-server/events v1.0.1
1412
github.com/roadrunner-server/pool v1.1.3
15-
github.com/stretchr/testify v1.11.0
13+
github.com/stretchr/testify v1.11.1
1614
github.com/uber-go/tally/v4 v4.1.17
1715
go.temporal.io/api v1.52.0
18-
go.temporal.io/sdk v1.35.0
16+
go.temporal.io/sdk v1.36.0
1917
go.temporal.io/sdk/contrib/tally v0.2.0
2018
go.temporal.io/server v1.28.1
2119
go.uber.org/zap v1.27.0
22-
google.golang.org/protobuf v1.36.7
20+
google.golang.org/protobuf v1.36.8
21+
)
22+
23+
require (
24+
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
25+
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect
26+
go.opentelemetry.io/otel v1.38.0 // indirect
27+
go.opentelemetry.io/otel/sdk/metric v1.38.0 // indirect
28+
gopkg.in/yaml.v2 v2.4.0 // indirect
2329
)
2430

2531
replace github.com/uber-go/tally/v4 => github.com/uber-go/tally/v4 v4.1.10
@@ -33,15 +39,14 @@ require (
3339
github.com/go-ole/go-ole v1.3.0 // indirect
3440
github.com/gogo/protobuf v1.3.2 // indirect
3541
github.com/golang/mock v1.7.0-rc.1 // indirect
36-
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
37-
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect
42+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect
3843
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
3944
github.com/nexus-rpc/sdk-go v0.4.0 // indirect
4045
github.com/pkg/errors v0.9.1 // indirect
4146
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
42-
github.com/prometheus/client_golang v1.23.0
47+
github.com/prometheus/client_golang v1.23.1
4348
github.com/prometheus/client_model v0.6.2 // indirect
44-
github.com/prometheus/common v0.65.0 // indirect
49+
github.com/prometheus/common v0.66.0 // indirect
4550
github.com/prometheus/procfs v0.17.0 // indirect
4651
github.com/roadrunner-server/goridge/v3 v3.8.3
4752
github.com/robfig/cron v1.2.0 // indirect
@@ -59,8 +64,8 @@ require (
5964
golang.org/x/sys v0.35.0 // indirect
6065
golang.org/x/text v0.28.0 // indirect
6166
golang.org/x/time v0.12.0 // indirect
62-
google.golang.org/genproto/googleapis/api v0.0.0-20250811230008-5f3141c8851a // indirect
63-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250811230008-5f3141c8851a // indirect
64-
google.golang.org/grpc v1.74.2
67+
google.golang.org/genproto/googleapis/api v0.0.0-20250826171959-ef028d996bc1 // indirect
68+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1 // indirect
69+
google.golang.org/grpc v1.75.0
6570
gopkg.in/yaml.v3 v3.0.1 // indirect
6671
)

0 commit comments

Comments
 (0)