From 693d85b49d8761fca3947f9942950cee853770de Mon Sep 17 00:00:00 2001 From: salahayoub Date: Tue, 26 Aug 2025 18:07:37 +0200 Subject: [PATCH] Add workflow versioning support --- registry/errors.go | 16 ++++ registry/registry.go | 80 ++++++++++++++++++- registry/registry_test.go | 162 ++++++++++++++++++++++++++++++++++++++ worker/worker.go | 5 ++ worker/worker_test.go | 85 ++++++++++++++++++++ 5 files changed, 346 insertions(+), 2 deletions(-) create mode 100644 worker/worker_test.go diff --git a/registry/errors.go b/registry/errors.go index 04833e21..7dd03186 100644 --- a/registry/errors.go +++ b/registry/errors.go @@ -31,3 +31,19 @@ type ErrActivityAlreadyRegistered struct { func (e *ErrActivityAlreadyRegistered) Error() string { return e.msg } + +type ErrVersionedWorkflowAlreadyRegistered struct { + msg string +} + +func (e *ErrVersionedWorkflowAlreadyRegistered) Error() string { + return e.msg +} + +type ErrWorkflowVersionNotFound struct { + msg string +} + +func (e *ErrWorkflowVersionNotFound) Error() string { + return e.msg +} diff --git a/registry/registry.go b/registry/registry.go index 38bb35e3..e02776e7 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -17,13 +17,17 @@ type Registry struct { workflowMap map[string]any activityMap map[string]any + + // Versioned workflow storage: workflowName -> version -> implementation + versionedWorkflowMap map[string]map[string]any } // New creates a new registry instance. func New() *Registry { return &Registry{ - workflowMap: make(map[string]any), - activityMap: make(map[string]any), + workflowMap: make(map[string]any), + activityMap: make(map[string]any), + versionedWorkflowMap: make(map[string]map[string]any), } } @@ -76,6 +80,63 @@ func (r *Registry) RegisterWorkflow(workflow any, opts ...RegisterOption) error return nil } +// RegisterVersionedWorkflow registers a workflow with a specific version identifier. +func (r *Registry) RegisterVersionedWorkflow(workflow any, version string, opts ...RegisterOption) error { + cfg := registerOptions(opts).applyRegisterOptions(registerConfig{}) + name := cfg.Name + if name == "" { + name = fn.Name(workflow) + } + + if version == "" { + return &ErrInvalidWorkflow{"version cannot be empty"} + } + + wfType := reflect.TypeOf(workflow) + if wfType.Kind() != reflect.Func { + return &ErrInvalidWorkflow{"workflow is not a function"} + } + + if wfType.NumIn() == 0 { + return &ErrInvalidWorkflow{"workflow does not accept context parameter"} + } + + if !args.IsOwnContext(wfType.In(0)) { + return &ErrInvalidWorkflow{"workflow does not accept context as first parameter"} + } + + if wfType.NumOut() == 0 { + return &ErrInvalidWorkflow{"workflow must return error"} + } + + if wfType.NumOut() > 2 { + return &ErrInvalidWorkflow{"workflow must return at most two values"} + } + + errType := reflect.TypeOf((*error)(nil)).Elem() + if (wfType.NumOut() == 1 && !wfType.Out(0).Implements(errType)) || + (wfType.NumOut() == 2 && !wfType.Out(1).Implements(errType)) { + return &ErrInvalidWorkflow{"workflow must return error as last return value"} + } + + r.Lock() + defer r.Unlock() + + // Initialize workflow name map if it doesn't exist + if r.versionedWorkflowMap[name] == nil { + r.versionedWorkflowMap[name] = make(map[string]any) + } + + // Check if this version already exists + if _, ok := r.versionedWorkflowMap[name][version]; ok { + return &ErrVersionedWorkflowAlreadyRegistered{fmt.Sprintf("workflow %q version %q already registered", name, version)} + } + + r.versionedWorkflowMap[name][version] = workflow + + return nil +} + func (r *Registry) RegisterActivity(activity any, opts ...RegisterOption) error { cfg := registerOptions(opts).applyRegisterOptions(registerConfig{}) @@ -163,6 +224,21 @@ func (r *Registry) GetWorkflow(name string) (any, error) { return nil, errors.New("workflow not found") } +// GetVersionedWorkflow retrieves a specific version of a workflow. +func (r *Registry) GetVersionedWorkflow(name, version string) (any, error) { + r.Lock() + defer r.Unlock() + + if versionMap, ok := r.versionedWorkflowMap[name]; ok { + if workflow, ok := versionMap[version]; ok { + return workflow, nil + } + return nil, &ErrWorkflowVersionNotFound{fmt.Sprintf("workflow %q version %q not found", name, version)} + } + + return nil, &ErrWorkflowVersionNotFound{fmt.Sprintf("workflow %q not found", name)} +} + func (r *Registry) GetActivity(name string) (interface{}, error) { r.Lock() defer r.Unlock() diff --git a/registry/registry_test.go b/registry/registry_test.go index aa52d590..0260c009 100644 --- a/registry/registry_test.go +++ b/registry/registry_test.go @@ -234,3 +234,165 @@ func Test_ActivityRegistrationOnStruct_Invalid(t *testing.T) { err := r.RegisterActivity(a) require.Error(t, err) } +func reg_workflow_v1(ctx sync.Context) error { + return nil +} + +func reg_workflow_v2(ctx sync.Context) (string, error) { + return "v2", nil +} + +func TestRegistry_RegisterVersionedWorkflow(t *testing.T) { + tests := []struct { + name string + workflow any + version string + workflowName string + wantErr bool + }{ + { + name: "valid versioned workflow", + workflow: reg_workflow_v1, + version: "1.0.0", + }, + { + name: "valid versioned workflow with custom name", + workflow: reg_workflow_v1, + version: "1.0.0", + workflowName: "CustomWorkflow", + }, + { + name: "valid versioned workflow with results", + workflow: reg_workflow_v2, + version: "2.0.0", + }, + { + name: "empty version", + workflow: reg_workflow_v1, + version: "", + wantErr: true, + }, + { + name: "invalid workflow - not a function", + workflow: "not a function", + version: "1.0.0", + wantErr: true, + }, + { + name: "invalid workflow - missing context", + workflow: func() error { return nil }, + version: "1.0.0", + wantErr: true, + }, + { + name: "invalid workflow - missing error return", + workflow: func(ctx sync.Context) {}, + version: "1.0.0", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := New() + + var opts []RegisterOption + if tt.workflowName != "" { + opts = append(opts, WithName(tt.workflowName)) + } + + err := r.RegisterVersionedWorkflow(tt.workflow, tt.version, opts...) + + if (err != nil) != tt.wantErr { + t.Errorf("Registry.RegisterVersionedWorkflow() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if !tt.wantErr { + expectedName := tt.workflowName + if expectedName == "" { + expectedName = fn.Name(tt.workflow) + } + + workflow, err := r.GetVersionedWorkflow(expectedName, tt.version) + require.NoError(t, err) + require.NotNil(t, workflow) + } + }) + } +} + +func TestRegistry_RegisterVersionedWorkflow_Conflict(t *testing.T) { + r := New() + + // Register first version + err := r.RegisterVersionedWorkflow(reg_workflow_v1, "1.0.0") + require.NoError(t, err) + + // Try to register same version again - should fail + err = r.RegisterVersionedWorkflow(reg_workflow_v1, "1.0.0") + var wantErr *ErrVersionedWorkflowAlreadyRegistered + require.ErrorAs(t, err, &wantErr) + + // Register different version - should succeed + err = r.RegisterVersionedWorkflow(reg_workflow_v2, "2.0.0") + require.NoError(t, err) + + // Register same workflow with custom name and same version - should succeed + err = r.RegisterVersionedWorkflow(reg_workflow_v1, "1.0.0", WithName("CustomWorkflow")) + require.NoError(t, err) + + // Try to register same custom name and version again - should fail + err = r.RegisterVersionedWorkflow(reg_workflow_v1, "1.0.0", WithName("CustomWorkflow")) + require.ErrorAs(t, err, &wantErr) +} + +func TestRegistry_GetVersionedWorkflow(t *testing.T) { + r := New() + + // Register multiple versions of the same workflow + err := r.RegisterVersionedWorkflow(reg_workflow_v1, "1.0.0") + require.NoError(t, err) + + err = r.RegisterVersionedWorkflow(reg_workflow_v2, "2.0.0") + require.NoError(t, err) + + // Test successful retrieval + workflow1, err := r.GetVersionedWorkflow("reg_workflow_v1", "1.0.0") + require.NoError(t, err) + require.NotNil(t, workflow1) + + workflow2, err := r.GetVersionedWorkflow("reg_workflow_v2", "2.0.0") + require.NoError(t, err) + require.NotNil(t, workflow2) + + // Test workflow not found + _, err = r.GetVersionedWorkflow("nonexistent", "1.0.0") + var notFoundErr *ErrWorkflowVersionNotFound + require.ErrorAs(t, err, ¬FoundErr) + + // Test version not found + _, err = r.GetVersionedWorkflow("reg_workflow_v1", "3.0.0") + require.ErrorAs(t, err, ¬FoundErr) +} + +func TestRegistry_VersionedWorkflow_BackwardCompatibility(t *testing.T) { + r := New() + + // Register regular workflow + err := r.RegisterWorkflow(reg_workflow1) + require.NoError(t, err) + + // Register versioned workflow with same name + err = r.RegisterVersionedWorkflow(reg_workflow_v1, "1.0.0") + require.NoError(t, err) + + // Both should be retrievable independently + regularWorkflow, err := r.GetWorkflow("reg_workflow1") + require.NoError(t, err) + require.NotNil(t, regularWorkflow) + + versionedWorkflow, err := r.GetVersionedWorkflow("reg_workflow_v1", "1.0.0") + require.NoError(t, err) + require.NotNil(t, versionedWorkflow) +} \ No newline at end of file diff --git a/worker/worker.go b/worker/worker.go index d0433f8d..16f5223d 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -145,6 +145,11 @@ func (w *Worker) RegisterWorkflow(wf workflow.Workflow, opts ...registry.Registe return w.registry.RegisterWorkflow(wf, opts...) } +// RegisterWorkflowVersion registers a workflow with a specific version identifier. +func (w *Worker) RegisterWorkflowVersion(wf workflow.Workflow, version string, opts ...registry.RegisterOption) error { + return w.registry.RegisterVersionedWorkflow(wf, version, opts...) +} + // RegisterActivity registers an activity with the worker's registry. func (w *Worker) RegisterActivity(a workflow.Activity, opts ...registry.RegisterOption) error { return w.registry.RegisterActivity(a, opts...) diff --git a/worker/worker_test.go b/worker/worker_test.go new file mode 100644 index 00000000..d9f2a871 --- /dev/null +++ b/worker/worker_test.go @@ -0,0 +1,85 @@ +package worker + +import ( + "testing" + + "github.com/cschleiden/go-workflows/backend/memory" + "github.com/cschleiden/go-workflows/internal/sync" + "github.com/cschleiden/go-workflows/registry" + "github.com/stretchr/testify/require" +) + +func worker_workflow_v1(ctx sync.Context) error { + return nil +} + +func worker_workflow_v2(ctx sync.Context) (string, error) { + return "v2", nil +} + +func TestWorker_RegisterWorkflowVersion(t *testing.T) { + backend := memory.NewBackend() + worker := New(backend, nil) + + // Test successful versioned workflow registration + err := worker.RegisterWorkflowVersion(worker_workflow_v1, "1.0.0") + require.NoError(t, err) + + // Test registering different version of same workflow + err = worker.RegisterWorkflowVersion(worker_workflow_v2, "2.0.0") + require.NoError(t, err) + + // Test registering same version again - should fail + err = worker.RegisterWorkflowVersion(worker_workflow_v1, "1.0.0") + var wantErr *registry.ErrVersionedWorkflowAlreadyRegistered + require.ErrorAs(t, err, &wantErr) + + // Test with custom name + err = worker.RegisterWorkflowVersion(worker_workflow_v1, "1.0.0", registry.WithName("CustomWorkflow")) + require.NoError(t, err) +} + +func TestWorker_RegisterWorkflowVersion_InvalidWorkflow(t *testing.T) { + backend := memory.NewBackend() + worker := New(backend, nil) + + // Test empty version + err := worker.RegisterWorkflowVersion(worker_workflow_v1, "") + require.Error(t, err) + + // Test invalid workflow + err = worker.RegisterWorkflowVersion("not a function", "1.0.0") + require.Error(t, err) + + // Test workflow without context + err = worker.RegisterWorkflowVersion(func() error { return nil }, "1.0.0") + require.Error(t, err) +} + +func TestWorker_VersionedWorkflow_Integration(t *testing.T) { + backend := memory.NewBackend() + worker := New(backend, nil) + + // Register both regular and versioned workflows + err := worker.RegisterWorkflow(worker_workflow_v1) + require.NoError(t, err) + + err = worker.RegisterWorkflowVersion(worker_workflow_v1, "1.0.0") + require.NoError(t, err) + + err = worker.RegisterWorkflowVersion(worker_workflow_v2, "2.0.0") + require.NoError(t, err) + + // Verify they can be retrieved from the registry + regularWorkflow, err := worker.registry.GetWorkflow("worker_workflow_v1") + require.NoError(t, err) + require.NotNil(t, regularWorkflow) + + versionedWorkflow1, err := worker.registry.GetVersionedWorkflow("worker_workflow_v1", "1.0.0") + require.NoError(t, err) + require.NotNil(t, versionedWorkflow1) + + versionedWorkflow2, err := worker.registry.GetVersionedWorkflow("worker_workflow_v2", "2.0.0") + require.NoError(t, err) + require.NotNil(t, versionedWorkflow2) +} \ No newline at end of file