Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions registry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,19 @@ type ErrActivityAlreadyRegistered struct {
func (e *ErrActivityAlreadyRegistered) Error() string {
return e.msg
}

type ErrVersionedWorkflowAlreadyRegistered struct {
msg string
}

func (e *ErrVersionedWorkflowAlreadyRegistered) Error() string {
return e.msg
}

type ErrWorkflowVersionNotFound struct {
msg string
}

func (e *ErrWorkflowVersionNotFound) Error() string {
return e.msg
}
80 changes: 78 additions & 2 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ type Registry struct {

workflowMap map[string]any
activityMap map[string]any

// Versioned workflow storage: workflowName -> version -> implementation
versionedWorkflowMap map[string]map[string]any
}

// New creates a new registry instance.
func New() *Registry {
return &Registry{
workflowMap: make(map[string]any),
activityMap: make(map[string]any),
workflowMap: make(map[string]any),
activityMap: make(map[string]any),
versionedWorkflowMap: make(map[string]map[string]any),
}
}

Expand Down Expand Up @@ -76,6 +80,63 @@ func (r *Registry) RegisterWorkflow(workflow any, opts ...RegisterOption) error
return nil
}

// RegisterVersionedWorkflow registers a workflow with a specific version identifier.
func (r *Registry) RegisterVersionedWorkflow(workflow any, version string, opts ...RegisterOption) error {
cfg := registerOptions(opts).applyRegisterOptions(registerConfig{})
name := cfg.Name
if name == "" {
name = fn.Name(workflow)
}

if version == "" {
return &ErrInvalidWorkflow{"version cannot be empty"}
}

wfType := reflect.TypeOf(workflow)
if wfType.Kind() != reflect.Func {
return &ErrInvalidWorkflow{"workflow is not a function"}
}

if wfType.NumIn() == 0 {
return &ErrInvalidWorkflow{"workflow does not accept context parameter"}
}

if !args.IsOwnContext(wfType.In(0)) {
return &ErrInvalidWorkflow{"workflow does not accept context as first parameter"}
}

if wfType.NumOut() == 0 {
return &ErrInvalidWorkflow{"workflow must return error"}
}

if wfType.NumOut() > 2 {
return &ErrInvalidWorkflow{"workflow must return at most two values"}
}

errType := reflect.TypeOf((*error)(nil)).Elem()
if (wfType.NumOut() == 1 && !wfType.Out(0).Implements(errType)) ||
(wfType.NumOut() == 2 && !wfType.Out(1).Implements(errType)) {
return &ErrInvalidWorkflow{"workflow must return error as last return value"}
}

r.Lock()
defer r.Unlock()

// Initialize workflow name map if it doesn't exist
if r.versionedWorkflowMap[name] == nil {
r.versionedWorkflowMap[name] = make(map[string]any)
}

// Check if this version already exists
if _, ok := r.versionedWorkflowMap[name][version]; ok {
return &ErrVersionedWorkflowAlreadyRegistered{fmt.Sprintf("workflow %q version %q already registered", name, version)}
}

r.versionedWorkflowMap[name][version] = workflow

return nil
}

func (r *Registry) RegisterActivity(activity any, opts ...RegisterOption) error {
cfg := registerOptions(opts).applyRegisterOptions(registerConfig{})

Expand Down Expand Up @@ -163,6 +224,21 @@ func (r *Registry) GetWorkflow(name string) (any, error) {
return nil, errors.New("workflow not found")
}

// GetVersionedWorkflow retrieves a specific version of a workflow.
func (r *Registry) GetVersionedWorkflow(name, version string) (any, error) {
r.Lock()
defer r.Unlock()

if versionMap, ok := r.versionedWorkflowMap[name]; ok {
if workflow, ok := versionMap[version]; ok {
return workflow, nil
}
return nil, &ErrWorkflowVersionNotFound{fmt.Sprintf("workflow %q version %q not found", name, version)}
}

return nil, &ErrWorkflowVersionNotFound{fmt.Sprintf("workflow %q not found", name)}
}

func (r *Registry) GetActivity(name string) (interface{}, error) {
r.Lock()
defer r.Unlock()
Expand Down
162 changes: 162 additions & 0 deletions registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,165 @@ func Test_ActivityRegistrationOnStruct_Invalid(t *testing.T) {
err := r.RegisterActivity(a)
require.Error(t, err)
}
func reg_workflow_v1(ctx sync.Context) error {
return nil
}

func reg_workflow_v2(ctx sync.Context) (string, error) {
return "v2", nil
}

func TestRegistry_RegisterVersionedWorkflow(t *testing.T) {
tests := []struct {
name string
workflow any
version string
workflowName string
wantErr bool
}{
{
name: "valid versioned workflow",
workflow: reg_workflow_v1,
version: "1.0.0",
},
{
name: "valid versioned workflow with custom name",
workflow: reg_workflow_v1,
version: "1.0.0",
workflowName: "CustomWorkflow",
},
{
name: "valid versioned workflow with results",
workflow: reg_workflow_v2,
version: "2.0.0",
},
{
name: "empty version",
workflow: reg_workflow_v1,
version: "",
wantErr: true,
},
{
name: "invalid workflow - not a function",
workflow: "not a function",
version: "1.0.0",
wantErr: true,
},
{
name: "invalid workflow - missing context",
workflow: func() error { return nil },
version: "1.0.0",
wantErr: true,
},
{
name: "invalid workflow - missing error return",
workflow: func(ctx sync.Context) {},
version: "1.0.0",
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := New()

var opts []RegisterOption
if tt.workflowName != "" {
opts = append(opts, WithName(tt.workflowName))
}

err := r.RegisterVersionedWorkflow(tt.workflow, tt.version, opts...)

if (err != nil) != tt.wantErr {
t.Errorf("Registry.RegisterVersionedWorkflow() error = %v, wantErr %v", err, tt.wantErr)
return
}

if !tt.wantErr {
expectedName := tt.workflowName
if expectedName == "" {
expectedName = fn.Name(tt.workflow)
}

workflow, err := r.GetVersionedWorkflow(expectedName, tt.version)
require.NoError(t, err)
require.NotNil(t, workflow)
}
})
}
}

func TestRegistry_RegisterVersionedWorkflow_Conflict(t *testing.T) {
r := New()

// Register first version
err := r.RegisterVersionedWorkflow(reg_workflow_v1, "1.0.0")
require.NoError(t, err)

// Try to register same version again - should fail
err = r.RegisterVersionedWorkflow(reg_workflow_v1, "1.0.0")
var wantErr *ErrVersionedWorkflowAlreadyRegistered
require.ErrorAs(t, err, &wantErr)

// Register different version - should succeed
err = r.RegisterVersionedWorkflow(reg_workflow_v2, "2.0.0")
require.NoError(t, err)

// Register same workflow with custom name and same version - should succeed
err = r.RegisterVersionedWorkflow(reg_workflow_v1, "1.0.0", WithName("CustomWorkflow"))
require.NoError(t, err)

// Try to register same custom name and version again - should fail
err = r.RegisterVersionedWorkflow(reg_workflow_v1, "1.0.0", WithName("CustomWorkflow"))
require.ErrorAs(t, err, &wantErr)
}

func TestRegistry_GetVersionedWorkflow(t *testing.T) {
r := New()

// Register multiple versions of the same workflow
err := r.RegisterVersionedWorkflow(reg_workflow_v1, "1.0.0")
require.NoError(t, err)

err = r.RegisterVersionedWorkflow(reg_workflow_v2, "2.0.0")
require.NoError(t, err)

// Test successful retrieval
workflow1, err := r.GetVersionedWorkflow("reg_workflow_v1", "1.0.0")
require.NoError(t, err)
require.NotNil(t, workflow1)

workflow2, err := r.GetVersionedWorkflow("reg_workflow_v2", "2.0.0")
require.NoError(t, err)
require.NotNil(t, workflow2)

// Test workflow not found
_, err = r.GetVersionedWorkflow("nonexistent", "1.0.0")
var notFoundErr *ErrWorkflowVersionNotFound
require.ErrorAs(t, err, &notFoundErr)

// Test version not found
_, err = r.GetVersionedWorkflow("reg_workflow_v1", "3.0.0")
require.ErrorAs(t, err, &notFoundErr)
}

func TestRegistry_VersionedWorkflow_BackwardCompatibility(t *testing.T) {
r := New()

// Register regular workflow
err := r.RegisterWorkflow(reg_workflow1)
require.NoError(t, err)

// Register versioned workflow with same name
err = r.RegisterVersionedWorkflow(reg_workflow_v1, "1.0.0")
require.NoError(t, err)

// Both should be retrievable independently
regularWorkflow, err := r.GetWorkflow("reg_workflow1")
require.NoError(t, err)
require.NotNil(t, regularWorkflow)

versionedWorkflow, err := r.GetVersionedWorkflow("reg_workflow_v1", "1.0.0")
require.NoError(t, err)
require.NotNil(t, versionedWorkflow)
}
5 changes: 5 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ func (w *Worker) RegisterWorkflow(wf workflow.Workflow, opts ...registry.Registe
return w.registry.RegisterWorkflow(wf, opts...)
}

// RegisterWorkflowVersion registers a workflow with a specific version identifier.
func (w *Worker) RegisterWorkflowVersion(wf workflow.Workflow, version string, opts ...registry.RegisterOption) error {
return w.registry.RegisterVersionedWorkflow(wf, version, opts...)
}

// RegisterActivity registers an activity with the worker's registry.
func (w *Worker) RegisterActivity(a workflow.Activity, opts ...registry.RegisterOption) error {
return w.registry.RegisterActivity(a, opts...)
Expand Down
85 changes: 85 additions & 0 deletions worker/worker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package worker

import (
"testing"

"github.com/cschleiden/go-workflows/backend/memory"
"github.com/cschleiden/go-workflows/internal/sync"
"github.com/cschleiden/go-workflows/registry"
"github.com/stretchr/testify/require"
)

func worker_workflow_v1(ctx sync.Context) error {
return nil
}

func worker_workflow_v2(ctx sync.Context) (string, error) {
return "v2", nil
}

func TestWorker_RegisterWorkflowVersion(t *testing.T) {
backend := memory.NewBackend()
worker := New(backend, nil)

// Test successful versioned workflow registration
err := worker.RegisterWorkflowVersion(worker_workflow_v1, "1.0.0")
require.NoError(t, err)

// Test registering different version of same workflow
err = worker.RegisterWorkflowVersion(worker_workflow_v2, "2.0.0")
require.NoError(t, err)

// Test registering same version again - should fail
err = worker.RegisterWorkflowVersion(worker_workflow_v1, "1.0.0")
var wantErr *registry.ErrVersionedWorkflowAlreadyRegistered
require.ErrorAs(t, err, &wantErr)

// Test with custom name
err = worker.RegisterWorkflowVersion(worker_workflow_v1, "1.0.0", registry.WithName("CustomWorkflow"))
require.NoError(t, err)
}

func TestWorker_RegisterWorkflowVersion_InvalidWorkflow(t *testing.T) {
backend := memory.NewBackend()
worker := New(backend, nil)

// Test empty version
err := worker.RegisterWorkflowVersion(worker_workflow_v1, "")
require.Error(t, err)

// Test invalid workflow
err = worker.RegisterWorkflowVersion("not a function", "1.0.0")
require.Error(t, err)

// Test workflow without context
err = worker.RegisterWorkflowVersion(func() error { return nil }, "1.0.0")
require.Error(t, err)
}

func TestWorker_VersionedWorkflow_Integration(t *testing.T) {
backend := memory.NewBackend()
worker := New(backend, nil)

// Register both regular and versioned workflows
err := worker.RegisterWorkflow(worker_workflow_v1)
require.NoError(t, err)

err = worker.RegisterWorkflowVersion(worker_workflow_v1, "1.0.0")
require.NoError(t, err)

err = worker.RegisterWorkflowVersion(worker_workflow_v2, "2.0.0")
require.NoError(t, err)

// Verify they can be retrieved from the registry
regularWorkflow, err := worker.registry.GetWorkflow("worker_workflow_v1")
require.NoError(t, err)
require.NotNil(t, regularWorkflow)

versionedWorkflow1, err := worker.registry.GetVersionedWorkflow("worker_workflow_v1", "1.0.0")
require.NoError(t, err)
require.NotNil(t, versionedWorkflow1)

versionedWorkflow2, err := worker.registry.GetVersionedWorkflow("worker_workflow_v2", "2.0.0")
require.NoError(t, err)
require.NotNil(t, versionedWorkflow2)
}