-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbuilder.go
More file actions
140 lines (121 loc) · 4.77 KB
/
builder.go
File metadata and controls
140 lines (121 loc) · 4.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package flyt
import (
"context"
"time"
)
// NodeBuilder provides a fluent interface for creating and configuring nodes.
// It implements the Node interface (by forwarding to the embedded CustomNode)
// while also exposing chainable With* methods that each return the builder.
// This allows both configuration styles:
//
//   - Traditional: flyt.NewNode(WithExecFunc(...), WithMaxRetries(3))
//   - Builder: flyt.NewNode().WithExecFunc(...).WithMaxRetries(3)
type NodeBuilder struct {
	// CustomNode is the node being configured. The builder's methods read and
	// write its fields and delegate the node lifecycle calls to it.
	*CustomNode
}
// Prep satisfies Node.Prep. The builder adds no logic of its own: the call is
// forwarded unchanged to the embedded CustomNode and its results are returned
// as-is.
func (b *NodeBuilder) Prep(ctx context.Context, shared *SharedStore) (any, error) {
	inner := b.CustomNode
	return inner.Prep(ctx, shared)
}
// Exec satisfies Node.Exec by handing prepResult to the embedded CustomNode's
// Exec and returning whatever that call produces.
func (b *NodeBuilder) Exec(ctx context.Context, prepResult any) (any, error) {
	inner := b.CustomNode
	return inner.Exec(ctx, prepResult)
}
// Post satisfies Node.Post. All arguments are passed straight through to the
// embedded CustomNode's Post, and its Action and error are returned untouched.
func (b *NodeBuilder) Post(ctx context.Context, shared *SharedStore, prepResult, execResult any) (Action, error) {
	inner := b.CustomNode
	return inner.Post(ctx, shared, prepResult, execResult)
}
// ExecFallback satisfies FallbackNode.ExecFallback. The prep result and the
// error are forwarded to the embedded CustomNode's ExecFallback, whose return
// values become this method's return values.
func (b *NodeBuilder) ExecFallback(prepResult any, err error) (any, error) {
	inner := b.CustomNode
	return inner.ExecFallback(prepResult, err)
}
// GetMaxRetries satisfies RetryableNode.GetMaxRetries by returning the value
// reported by the embedded CustomNode's GetMaxRetries.
func (b *NodeBuilder) GetMaxRetries() int {
	inner := b.CustomNode
	return inner.GetMaxRetries()
}
// GetWait satisfies RetryableNode.GetWait by returning the duration reported
// by the embedded CustomNode's GetWait.
func (b *NodeBuilder) GetWait() time.Duration {
	inner := b.CustomNode
	return inner.GetWait()
}
// WithMaxRetries sets the maximum number of retries for the node's Exec phase.
// It builds the package-level WithMaxRetries option and applies it to the
// BaseNode reached through the embedded CustomNode.
// Returns the same builder so calls can be chained.
func (b *NodeBuilder) WithMaxRetries(retries int) *NodeBuilder {
	applyOption := WithMaxRetries(retries)
	applyOption(b.CustomNode.BaseNode)
	return b
}
// WithWait sets the wait duration between retries.
// It builds the package-level WithWait option and applies it to the BaseNode
// reached through the embedded CustomNode.
// Returns the same builder so calls can be chained.
func (b *NodeBuilder) WithWait(wait time.Duration) *NodeBuilder {
	applyOption := WithWait(wait)
	applyOption(b.CustomNode.BaseNode)
	return b
}
// WithPrepFunc installs fn as the node's Prep implementation. fn works with
// Result values; it is stored as given in the prepFunc field reached through
// the embedded CustomNode.
// Returns the same builder so calls can be chained.
func (b *NodeBuilder) WithPrepFunc(fn func(context.Context, *SharedStore) (Result, error)) *NodeBuilder {
	b.CustomNode.prepFunc = fn
	return b
}
// WithExecFunc installs fn as the node's Exec implementation. fn works with
// Result values; it is stored as given in the execFunc field reached through
// the embedded CustomNode.
// Returns the same builder so calls can be chained.
func (b *NodeBuilder) WithExecFunc(fn func(context.Context, Result) (Result, error)) *NodeBuilder {
	b.CustomNode.execFunc = fn
	return b
}
// WithPostFunc installs fn as the node's Post implementation. fn works with
// Result values; it is stored as given in the postFunc field reached through
// the embedded CustomNode.
// Returns the same builder so calls can be chained.
func (b *NodeBuilder) WithPostFunc(fn func(context.Context, *SharedStore, Result, Result) (Action, error)) *NodeBuilder {
	b.CustomNode.postFunc = fn
	return b
}
// WithExecFallbackFunc installs fn as the node's ExecFallback implementation
// by storing it in the execFallbackFunc field reached through the embedded
// CustomNode.
// Returns the same builder so calls can be chained.
func (b *NodeBuilder) WithExecFallbackFunc(fn func(any, error) (any, error)) *NodeBuilder {
	b.CustomNode.execFallbackFunc = fn
	return b
}
// WithPrepFuncAny installs a Prep implementation written against plain any
// values. fn is wrapped in an adapter with the Result-based signature: on
// success fn's value is wrapped with NewResult, and on failure the adapter
// returns a zero Result together with fn's error.
// Returns the same builder so calls can be chained.
func (b *NodeBuilder) WithPrepFuncAny(fn func(context.Context, *SharedStore) (any, error)) *NodeBuilder {
	adapter := func(ctx context.Context, store *SharedStore) (Result, error) {
		raw, err := fn(ctx, store)
		if err == nil {
			return NewResult(raw), nil
		}
		return Result{}, err
	}
	b.CustomNode.prepFunc = adapter
	return b
}
// WithExecFuncAny installs an Exec implementation written against plain any
// values. fn is wrapped in an adapter with the Result-based signature: the
// incoming Result is unwrapped with Value() before fn is called, fn's value
// is wrapped with NewResult on success, and on failure the adapter returns a
// zero Result together with fn's error.
// Returns the same builder so calls can be chained.
func (b *NodeBuilder) WithExecFuncAny(fn func(context.Context, any) (any, error)) *NodeBuilder {
	adapter := func(ctx context.Context, prep Result) (Result, error) {
		raw, err := fn(ctx, prep.Value())
		if err == nil {
			return NewResult(raw), nil
		}
		return Result{}, err
	}
	b.CustomNode.execFunc = adapter
	return b
}
// WithPostFuncAny installs a Post implementation written against plain any
// values. fn is wrapped in an adapter with the Result-based signature that
// unwraps the prep and exec Results with Value() (prep first, then exec) and
// returns fn's Action and error unchanged.
// Returns the same builder so calls can be chained.
func (b *NodeBuilder) WithPostFuncAny(fn func(context.Context, *SharedStore, any, any) (Action, error)) *NodeBuilder {
	adapter := func(ctx context.Context, store *SharedStore, prep, exec Result) (Action, error) {
		prepVal := prep.Value()
		execVal := exec.Value()
		return fn(ctx, store, prepVal, execVal)
	}
	b.CustomNode.postFunc = adapter
	return b
}
// WithBatchConcurrency sets the concurrency level for batch processing.
// n is stored as given, without validation, in the batchConcurrency field
// reached through the embedded CustomNode.
// Returns the same builder so calls can be chained.
func (b *NodeBuilder) WithBatchConcurrency(n int) *NodeBuilder {
	b.CustomNode.batchConcurrency = n
	return b
}
// WithBatchErrorHandling sets the error handling strategy for batch
// processing. The strategy is recorded as "continue" when continueOnError is
// true and as "stop" otherwise.
// Returns the same builder so calls can be chained.
func (b *NodeBuilder) WithBatchErrorHandling(continueOnError bool) *NodeBuilder {
	switch {
	case continueOnError:
		b.CustomNode.batchErrorHandling = "continue"
	default:
		b.CustomNode.batchErrorHandling = "stop"
	}
	return b
}