-
Notifications
You must be signed in to change notification settings - Fork 46
Expand file tree
/
Copy pathnode_set.go
More file actions
239 lines (217 loc) · 8.93 KB
/
node_set.go
File metadata and controls
239 lines (217 loc) · 8.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
package simple_node_set
import (
"context"
"fmt"
"os"
"slices"
"strings"
"sync"
"time"
"golang.org/x/sync/errgroup"
"github.com/smartcontractkit/chainlink-testing-framework/framework/pods"
"github.com/smartcontractkit/chainlink-testing-framework/framework"
"github.com/smartcontractkit/chainlink-testing-framework/framework/components/blockchain"
"github.com/smartcontractkit/chainlink-testing-framework/framework/components/clnode"
"github.com/smartcontractkit/chainlink-testing-framework/framework/components/postgres"
)
const (
DefaultHTTPPortStaticRangeStart = 10000
DefaultP2PStaticRangeStart = 12000
DefaultOCR2P2PStaticRangeStart = 14000
)
// Input is a node set configuration input
type Input struct {
Name string `toml:"name" validate:"required" comment:"Node set name, ex.:'don-1', Docker containers will be prefixed with this name so tests can distinguish one DON from another"`
Nodes int `toml:"nodes" validate:"required" comment:"Number of nodes in node set"`
HTTPPortRangeStart int `toml:"http_port_range_start" comment:"HTTP ports range starting with port X and increasing by 1"`
P2PPortRangeStart int `toml:"p2p_port_range_start" comment:"P2P ports range starting with port X and increasing by 1"`
OCR2P2PRangeStart int `toml:"ocr2_p2p_port_range_start" comment:"OCR2/SharedPeering ports range starting with port X and increasing by 1"`
DlvPortRangeStart int `toml:"dlv_port_range_start" comment:"Delve debugger ports range starting with port X and increasing by 1"`
OverrideMode string `toml:"override_mode" validate:"required,oneof=all each" comment:"Override mode, applicable only to 'localcre'. Changes how config overrides to TOML nodes apply"`
DbInput *postgres.Input `toml:"db" validate:"required" comment:"Shared node set data base input for PostgreSQL"`
NodeSpecs []*clnode.Input `toml:"node_specs" validate:"required" comment:"Chainlink node TOML configurations"`
NoDNS bool `toml:"no_dns" comment:"Turn DNS on, helpful to isolate container from the internet"`
Out *Output `toml:"out" comment:"Nodeset config output"`
}
// Output is a node set configuration output, used for caching or external components
type Output struct {
// UseCache Whether to respect caching or not, if cache = true component won't be deployed again
UseCache bool `toml:"use_cache" comment:"Whether to respect caching or not, if cache = true component won't be deployed again"`
// DBOut Nodeset shared database output (PostgreSQL)
DBOut *postgres.Output `toml:"db_out" comment:"Nodeset shared database output (PostgreSQL)"`
// CLNodes Chainlink node config outputs
CLNodes []*clnode.Output `toml:"cl_nodes" comment:"Chainlink node config outputs"`
}
// NewSharedDBNodeSet create a new node set with a shared database instance
// all the nodes have their own isolated database
func NewSharedDBNodeSet(in *Input, bcOut *blockchain.Output) (*Output, error) {
return NewSharedDBNodeSetWithContext(context.Background(), in, bcOut)
}
// NewSharedDBNodeSetWithContext create a new node set with a shared database instance
// all the nodes have their own isolated database
func NewSharedDBNodeSetWithContext(ctx context.Context, in *Input, bcOut *blockchain.Output) (*Output, error) {
if in.Out != nil && in.Out.UseCache {
return in.Out, nil
}
var (
out *Output
err error
)
defer func() {
printURLs(out)
in.Out = out
}()
if len(in.NodeSpecs) != in.Nodes && in.OverrideMode == "each" {
return nil, fmt.Errorf("amount of 'nodes' must be equal to specs provided in override_mode='each'")
}
out, err = sharedDBSetup(ctx, in, bcOut)
if err != nil {
return nil, err
}
return out, nil
}
func NodeNamePrefix(nodeSetName string) string {
return nodeSetName + "-" + "node"
}
func printURLs(out *Output) {
if out == nil {
return
}
httpURLs, _, pgURLs := make([]string, 0), make([]string, 0), make([]string, 0)
for _, n := range out.CLNodes {
httpURLs = append(httpURLs, n.Node.ExternalURL)
pgURLs = append(pgURLs, n.PostgreSQL.Url)
}
framework.L.Info().Any("UI", httpURLs).Send()
framework.L.Debug().Any("DB", pgURLs).Send()
}
func sharedDBSetup(ctx context.Context, in *Input, bcOut *blockchain.Output) (*Output, error) {
in.DbInput.Name = fmt.Sprintf("%s-%s", in.Name, "ns-postgresql")
in.DbInput.VolumeName = in.Name
// create database for each node
in.DbInput.Databases = in.Nodes
dbOut, err := postgres.NewWithContext(ctx, in.DbInput)
if err != nil {
return nil, err
}
nodeOuts := make([]*clnode.Output, 0)
envImage := os.Getenv("CTF_CHAINLINK_IMAGE")
// to make it easier for chaos testing we use static ports
// there is no need to check them in advance since testcontainers-go returns a nice error
var (
httpPortRangeStart = DefaultHTTPPortStaticRangeStart
p2pPortRangeStart = DefaultP2PStaticRangeStart
ocr2PortRangeStart = DefaultOCR2P2PStaticRangeStart
dlvPortStart = clnode.DefaultDebuggerPort
)
if in.HTTPPortRangeStart != 0 {
httpPortRangeStart = in.HTTPPortRangeStart
}
if in.P2PPortRangeStart != 0 {
p2pPortRangeStart = in.P2PPortRangeStart
}
if in.OCR2P2PRangeStart != 0 {
ocr2PortRangeStart = in.OCR2P2PRangeStart
} else {
// Keep OCR2 host ports aligned with each node set's HTTP range to avoid
// collisions when multiple node sets run on the same host.
ocr2PortRangeStart = httpPortRangeStart + (DefaultOCR2P2PStaticRangeStart - DefaultHTTPPortStaticRangeStart)
}
if in.DlvPortRangeStart != 0 {
dlvPortStart = in.DlvPortRangeStart
}
eg := &errgroup.Group{}
mu := &sync.Mutex{}
for i := 0; i < in.Nodes; i++ {
overrideIdx := i
if in.OverrideMode == "all" {
if len(in.NodeSpecs[overrideIdx].Node.CustomPorts) > 0 {
return nil, fmt.Errorf("custom_ports can be used only with override_mode = 'each'")
}
}
eg.Go(func() error {
var net string
var err error
if bcOut != nil {
net, err = clnode.NewNetworkCfgOneNetworkAllNodes(bcOut)
if err != nil {
return err
}
}
if in.NodeSpecs[overrideIdx].Node.TestConfigOverrides != "" {
net = in.NodeSpecs[overrideIdx].Node.TestConfigOverrides
}
nodeWithNodeSetPrefixName := NodeNamePrefix(in.Name) + fmt.Sprint(i)
nodeSpec := &clnode.Input{
NoDNS: in.NoDNS,
DbInput: in.DbInput,
Node: &clnode.NodeInput{
HTTPPort: httpPortRangeStart + i,
P2PPort: p2pPortRangeStart + i,
OCR2P2PPort: ocr2PortRangeStart + i,
DebuggerPort: dlvPortStart + i,
CustomPorts: in.NodeSpecs[overrideIdx].Node.CustomPorts,
Image: in.NodeSpecs[overrideIdx].Node.Image,
Name: nodeWithNodeSetPrefixName,
PullImage: in.NodeSpecs[overrideIdx].Node.PullImage,
DockerFilePath: in.NodeSpecs[overrideIdx].Node.DockerFilePath,
DockerContext: in.NodeSpecs[overrideIdx].Node.DockerContext,
DockerBuildArgs: in.NodeSpecs[overrideIdx].Node.DockerBuildArgs,
CapabilitiesBinaryPaths: in.NodeSpecs[overrideIdx].Node.CapabilitiesBinaryPaths,
CapabilityContainerDir: in.NodeSpecs[overrideIdx].Node.CapabilityContainerDir,
TestConfigOverrides: net,
UserConfigOverrides: in.NodeSpecs[overrideIdx].Node.UserConfigOverrides,
TestSecretsOverrides: in.NodeSpecs[overrideIdx].Node.TestSecretsOverrides,
UserSecretsOverrides: in.NodeSpecs[overrideIdx].Node.UserSecretsOverrides,
ContainerResources: in.NodeSpecs[overrideIdx].Node.ContainerResources,
EnvVars: in.NodeSpecs[overrideIdx].Node.EnvVars,
},
}
if envImage != "" {
nodeSpec.Node.Image = envImage
// unset docker build context and file path to avoid conflicts, image provided via env var takes precedence
nodeSpec.Node.DockerContext = ""
nodeSpec.Node.DockerFilePath = ""
}
dbURLHost := strings.Replace(dbOut.Url, "/chainlink?sslmode=disable", fmt.Sprintf("/db_%d?sslmode=disable", i), -1)
dbURL := strings.Replace(dbOut.InternalURL, "/chainlink?sslmode=disable", fmt.Sprintf("/db_%d?sslmode=disable", i), -1)
dbSpec := &postgres.Output{
Url: dbURLHost,
InternalURL: dbURL,
}
o, err := clnode.NewNodeWithContext(ctx, nodeSpec, dbSpec)
if err != nil {
return err
}
mu.Lock()
nodeOuts = append(nodeOuts, o)
mu.Unlock()
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, err
}
sortNodeOutsByHostPort(nodeOuts)
// wait for all K8s services at once
if pods.K8sEnabled() {
if err := pods.WaitReady(ctx, 3*time.Minute); err != nil {
return nil, err
}
}
return &Output{
UseCache: true,
DBOut: dbOut,
CLNodes: nodeOuts,
}, nil
}
func sortNodeOutsByHostPort(nodes []*clnode.Output) {
slices.SortFunc(nodes, func(a, b *clnode.Output) int {
aa := strings.Split(a.Node.ExternalURL, ":")
bb := strings.Split(b.Node.ExternalURL, ":")
if aa[len(aa)-1] < bb[len(bb)-1] {
return -1
}
return 1
})
}