Skip to content

Commit 4a4f9b2

Browse files
[CLD-354]: feat(operation): support execution of dynamic operation (#174)
To enhance the flexibility of Operations API, we want to support dynamic execution of Operation. Based on the operation definition, we can decide on run time what operations to execute and what input and deps to pass in to that operation. This can be quite powerful to allow users to dynamically construct a series of operation under a sequence. Refer to this example code snippet: ```go func ExampleOperationRegistry() { type Deps1 struct{} type Deps2 struct{} // Create operations with different input/output types stringOp := NewOperation( "string-op", semver.MustParse("1.0.0"), "Echo string operation", func(e Bundle, deps Deps1, input string) (string, error) { return input, nil }, ) intOp := NewOperation( "int-op", semver.MustParse("1.0.0"), "Echo integer operation", func(e Bundle, deps Deps2, input int) (int, error) { return input, nil }, ) // Create registry with untyped operations by providing optional initial operation registry := NewOperationRegistry(stringOp.AsUntyped()) // An alternative way to register additional operations without calling AsUntyped() RegisterOperation(registry, intOp) // Create execution environment b := NewBundle(context.Background, logger.Nop(), NewMemoryReporter(), WithOperationRegistry(registry)) inputs := []any{"input1", 42} deps := []any{Deps1{}, Deps2{}} defs := []Definition{ stringOp.Def(), intOp.Def(), } // dynamically retrieve and execute operations on different inputs for i, def := range defs { retrievedOp, err := registry.Retrieve(def) if err != nil { fmt.Println("error retrieving operation:", err) continue } report, err := ExecuteOperation(b, retrievedOp, deps[i], inputs[i]) if err != nil { fmt.Println("error executing operation:", err) continue } fmt.Println("operation output:", report.Output) } // Output: // operation output: input1 // operation output: 42 } ``` JIRA: https://smartcontract-it.atlassian.net/browse/CLD-354
1 parent f0d2548 commit 4a4f9b2

File tree

5 files changed

+332
-7
lines changed

5 files changed

+332
-7
lines changed

.changeset/moody-pots-find.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"chainlink-deployments-framework": minor
3+
---
4+
5+
feat: support dynamic execution of operation

operations/operation.go

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package operations
22

33
import (
44
"context"
5+
"errors"
56
"sync"
67

78
"github.com/Masterminds/semver/v3"
@@ -16,17 +17,36 @@ type Bundle struct {
1617
GetContext func() context.Context
1718
reporter Reporter
1819
// internal use only, for storing the hash of the report to avoid repeat sha256 computation.
19-
reportHashCache *sync.Map
20+
reportHashCache *sync.Map
21+
OperationRegistry *OperationRegistry
22+
}
23+
24+
// BundleOption is a functional option for configuring a Bundle
25+
type BundleOption func(*Bundle)
26+
27+
// WithOperationRegistry sets a custom OperationRegistry for the Bundle
28+
func WithOperationRegistry(registry *OperationRegistry) BundleOption {
29+
return func(b *Bundle) {
30+
b.OperationRegistry = registry
31+
}
2032
}
2133

2234
// NewBundle creates and returns a new Bundle.
23-
func NewBundle(getContext func() context.Context, logger logger.Logger, reporter Reporter) Bundle {
24-
return Bundle{
25-
Logger: logger,
26-
GetContext: getContext,
27-
reporter: reporter,
28-
reportHashCache: &sync.Map{},
35+
func NewBundle(getContext func() context.Context, logger logger.Logger, reporter Reporter, opts ...BundleOption) Bundle {
36+
b := Bundle{
37+
Logger: logger,
38+
GetContext: getContext,
39+
reporter: reporter,
40+
reportHashCache: &sync.Map{},
41+
OperationRegistry: NewOperationRegistry(),
2942
}
43+
44+
// Apply all provided options
45+
for _, opt := range opts {
46+
opt(&b)
47+
}
48+
49+
return b
3050
}
3151

3252
// OperationHandler is the function signature of an operation handler.
@@ -66,6 +86,11 @@ func (o *Operation[IN, OUT, DEP]) Description() string {
6686
return o.def.Description
6787
}
6888

89+
// Def returns the operation definition.
90+
func (o *Operation[IN, OUT, DEP]) Def() Definition {
91+
return o.def
92+
}
93+
6994
// execute runs the operation by calling the OperationHandler.
7095
func (o *Operation[IN, OUT, DEP]) execute(b Bundle, deps DEP, input IN) (output OUT, err error) {
7196
b.Logger.Infow("Executing operation",
@@ -74,6 +99,38 @@ func (o *Operation[IN, OUT, DEP]) execute(b Bundle, deps DEP, input IN) (output
7499
return o.handler(b, deps, input)
75100
}
76101

102+
// AsUntyped converts the operation to an untyped operation.
103+
// This is useful for storing operations in a slice or passing them around without type constraints.
104+
// Warning: The input and output types will be converted to `any`, so type safety is lost.
105+
func (o *Operation[IN, OUT, DEP]) AsUntyped() *Operation[any, any, any] {
106+
return &Operation[any, any, any]{
107+
def: Definition{
108+
ID: o.def.ID,
109+
Version: o.def.Version,
110+
Description: o.def.Description,
111+
},
112+
handler: func(b Bundle, deps any, input any) (any, error) {
113+
var typedInput IN
114+
if input != nil {
115+
var ok bool
116+
if typedInput, ok = input.(IN); !ok {
117+
return nil, errors.New("input type mismatch")
118+
}
119+
}
120+
121+
var typedDeps DEP
122+
if deps != nil {
123+
var ok bool
124+
if typedDeps, ok = deps.(DEP); !ok {
125+
return nil, errors.New("dependencies type mismatch")
126+
}
127+
}
128+
129+
return o.execute(b, typedDeps, typedInput)
130+
},
131+
}
132+
}
133+
77134
// NewOperation creates a new operation.
78135
// Version can be created using semver.MustParse("1.0.0") or semver.New("1.0.0").
79136
// Note: The handler should only perform maximum 1 side effect.

operations/operation_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func Test_NewOperation(t *testing.T) {
3232
assert.Equal(t, "sum", op.ID())
3333
assert.Equal(t, version.String(), op.Version())
3434
assert.Equal(t, description, op.Description())
35+
assert.Equal(t, op.def, op.Def())
3536
res, err := op.handler(Bundle{}, OpDeps{}, OpInput{1, 2})
3637
require.NoError(t, err)
3738
assert.Equal(t, 3, res)
@@ -82,3 +83,67 @@ func Test_Operation_WithEmptyInput(t *testing.T) {
8283
require.NoError(t, err)
8384
assert.Equal(t, 1, out)
8485
}
86+
87+
func Test_Operation_AsUntyped(t *testing.T) {
88+
t.Parallel()
89+
90+
version := semver.MustParse("1.0.0")
91+
description := "test operation"
92+
handler1 := func(b Bundle, deps OpDeps, input OpInput) (output int, err error) {
93+
return input.A + input.B, nil
94+
}
95+
typedOp := NewOperation("sum", version, description, handler1)
96+
97+
untypedOp := typedOp.AsUntyped()
98+
bundle := NewBundle(context.Background, logger.Test(t), nil)
99+
100+
assert.Equal(t, "sum", untypedOp.ID())
101+
assert.Equal(t, version.String(), untypedOp.Version())
102+
assert.Equal(t, description, untypedOp.Description())
103+
104+
tests := []struct {
105+
name string
106+
deps any
107+
input any
108+
wantResult any
109+
wantErr bool
110+
errContains string
111+
}{
112+
{
113+
name: "valid input and dependencies",
114+
deps: OpDeps{},
115+
input: OpInput{A: 3, B: 4},
116+
wantResult: 7,
117+
wantErr: false,
118+
},
119+
{
120+
name: "invalid input type",
121+
deps: OpDeps{},
122+
input: struct{ C int }{C: 5},
123+
wantErr: true,
124+
errContains: "input type mismatch",
125+
},
126+
{
127+
name: "invalid dependencies type",
128+
deps: "invalid",
129+
input: OpInput{A: 1, B: 2},
130+
wantErr: true,
131+
errContains: "dependencies type mismatch",
132+
},
133+
}
134+
135+
for _, tt := range tests {
136+
t.Run(tt.name, func(t *testing.T) {
137+
t.Parallel()
138+
result, err := untypedOp.handler(bundle, tt.deps, tt.input)
139+
140+
if tt.wantErr {
141+
require.Error(t, err)
142+
assert.Contains(t, err.Error(), tt.errContains)
143+
} else {
144+
require.NoError(t, err)
145+
assert.Equal(t, tt.wantResult, result)
146+
}
147+
})
148+
}
149+
}

operations/registry.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package operations
2+
3+
import "errors"
4+
5+
// OperationRegistry is a store for operations that allows retrieval based on their definitions.
6+
type OperationRegistry struct {
7+
ops []*Operation[any, any, any]
8+
}
9+
10+
// NewOperationRegistry creates a new OperationRegistry with the provided untyped operations.
11+
func NewOperationRegistry(ops ...*Operation[any, any, any]) *OperationRegistry {
12+
return &OperationRegistry{
13+
ops: ops,
14+
}
15+
}
16+
17+
// Retrieve retrieves an operation from the store based on its definition.
18+
// It returns an error if the operation is not found.
19+
// The definition must match the operation's ID and version.
20+
func (s OperationRegistry) Retrieve(def Definition) (*Operation[any, any, any], error) {
21+
for _, op := range s.ops {
22+
if op.ID() == def.ID && op.Version() == def.Version.String() {
23+
return op, nil
24+
}
25+
}
26+
27+
return nil, errors.New("operation not found in registry")
28+
}
29+
30+
// RegisterOperation registers new operations in the registry.
31+
// To register operations with different input, output, and dependency types,
32+
// call RegisterOperation multiple times with different type parameters.
33+
func RegisterOperation[D, I, O any](r *OperationRegistry, op ...*Operation[D, I, O]) {
34+
for _, o := range op {
35+
r.ops = append(r.ops, o.AsUntyped())
36+
}
37+
}

operations/registry_test.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package operations
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
8+
"github.com/Masterminds/semver/v3"
9+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
// ExampleOperationRegistry demonstrates how to create and use an OperationRegistry
15+
// with operations being executed dynamically with different input/output types.
16+
func ExampleOperationRegistry() {
17+
// example dependencies for operations
18+
type Deps1 struct{}
19+
type Deps2 struct{}
20+
21+
// Create operations with different input/output types
22+
stringOp := NewOperation(
23+
"string-op",
24+
semver.MustParse("1.0.0"),
25+
"Echo string operation",
26+
func(e Bundle, deps Deps1, input string) (string, error) {
27+
return input, nil
28+
},
29+
)
30+
31+
intOp := NewOperation(
32+
"int-op",
33+
semver.MustParse("1.0.0"),
34+
"Echo integer operation",
35+
func(e Bundle, deps Deps2, input int) (int, error) {
36+
return input, nil
37+
},
38+
)
39+
// Create registry with untyped operations by providing optional initial operation
40+
registry := NewOperationRegistry(stringOp.AsUntyped())
41+
42+
// An alternative way to register additional operations without calling AsUntyped()
43+
RegisterOperation(registry, intOp)
44+
45+
// Create execution environment
46+
b := NewBundle(context.Background, logger.Nop(), NewMemoryReporter(), WithOperationRegistry(registry))
47+
48+
// Define inputs and dependencies for operations
49+
// inputs[0] is for stringOp, inputs[1] is for intOp
50+
// deps[0] is for stringOp, deps[1] is for intOp
51+
inputs := []any{"input1", 42}
52+
deps := []any{Deps1{}, Deps2{}}
53+
defs := []Definition{
54+
stringOp.Def(),
55+
intOp.Def(),
56+
}
57+
58+
// dynamically retrieve and execute operations on different inputs
59+
for i, def := range defs {
60+
retrievedOp, err := registry.Retrieve(def)
61+
if err != nil {
62+
fmt.Println("error retrieving operation:", err)
63+
continue
64+
}
65+
66+
report, err := ExecuteOperation(b, retrievedOp, deps[i], inputs[i])
67+
if err != nil {
68+
fmt.Println("error executing operation:", err)
69+
continue
70+
}
71+
72+
fmt.Println("operation output:", report.Output)
73+
}
74+
75+
// Output:
76+
// operation output: input1
77+
// operation output: 42
78+
}
79+
80+
func TestOperationRegistry_Retrieve(t *testing.T) {
81+
t.Parallel()
82+
83+
op1 := NewOperation(
84+
"test-op-1",
85+
semver.MustParse("1.0.0"),
86+
"Operation 1",
87+
func(e Bundle, deps OpDeps, input string) (string, error) { return input, nil },
88+
)
89+
op2 := NewOperation(
90+
"test-op-2",
91+
semver.MustParse("2.0.0"),
92+
"Operation 2",
93+
func(e Bundle, deps OpDeps, input int) (int, error) { return input * 2, nil },
94+
)
95+
96+
tests := []struct {
97+
name string
98+
operations []*Operation[any, any, any]
99+
lookup Definition
100+
wantErr bool
101+
wantErrMsg string
102+
wantID string
103+
wantVersion string
104+
}{
105+
{
106+
name: "empty registry",
107+
operations: nil,
108+
lookup: Definition{ID: "test-op-1", Version: semver.MustParse("1.0.0")},
109+
wantErr: true,
110+
wantErrMsg: "operation not found in registry",
111+
},
112+
{
113+
name: "retrieval by exact match - first operation",
114+
operations: []*Operation[any, any, any]{op1.AsUntyped(), op2.AsUntyped()},
115+
lookup: Definition{ID: "test-op-1", Version: semver.MustParse("1.0.0")},
116+
wantErr: false,
117+
wantID: "test-op-1",
118+
wantVersion: "1.0.0",
119+
},
120+
{
121+
name: "retrieval by exact match - second operation",
122+
operations: []*Operation[any, any, any]{op1.AsUntyped(), op2.AsUntyped()},
123+
lookup: Definition{ID: "test-op-2", Version: semver.MustParse("2.0.0")},
124+
wantErr: false,
125+
wantID: "test-op-2",
126+
wantVersion: "2.0.0",
127+
},
128+
{
129+
name: "operation not found - non-existent ID",
130+
operations: []*Operation[any, any, any]{op1.AsUntyped(), op2.AsUntyped()},
131+
lookup: Definition{ID: "non-existent", Version: semver.MustParse("1.0.0")},
132+
wantErr: true,
133+
wantErrMsg: "operation not found in registry",
134+
},
135+
{
136+
name: "operation not found - wrong version",
137+
operations: []*Operation[any, any, any]{op1.AsUntyped(), op2.AsUntyped()},
138+
lookup: Definition{ID: "test-op-1", Version: semver.MustParse("3.0.0")},
139+
wantErr: true,
140+
wantErrMsg: "operation not found in registry",
141+
},
142+
}
143+
144+
for _, tt := range tests {
145+
t.Run(tt.name, func(t *testing.T) {
146+
t.Parallel()
147+
148+
registry := NewOperationRegistry(tt.operations...)
149+
retrievedOp, err := registry.Retrieve(tt.lookup)
150+
151+
if tt.wantErr {
152+
require.Error(t, err)
153+
assert.ErrorContains(t, err, tt.wantErrMsg)
154+
} else {
155+
require.NoError(t, err)
156+
assert.Equal(t, tt.wantID, retrievedOp.ID())
157+
assert.Equal(t, tt.wantVersion, retrievedOp.Version())
158+
}
159+
})
160+
}
161+
}

0 commit comments

Comments
 (0)