
Commit d7a92d8

Authored by akrem-chabchoub, gacevicljubisa, and darkobas2
refactor: separate smoke and load checks into distinct packages (#525)
* refactor(load): separate load check in a new package
* refactor(smoke): clean up smoke package
* refactor(load): remove whitespace
* refactor(load): improve logging and restore old comments
* refactor(load, smoke): unify NewMetrics function for smoke and load checks
* refactor(smoke): improve client selection logic for smoke checks by using ShuffledFullNodeClients
* refactor(load): improve client selection logic for load checks by using ShuffledFullNodeClients
* refactor: use ShuffledFullNodeClients (#528)
* fix(client): add NodeGroup option to ClientOptions
* refactor(cluster): remove unused Next method from ClientList
* fix(load): fix NewDefaultOptions
* fix: add whitespace for linter
* check-smoke: add node name label to metrics (#529)
* chore: bump golangci-lint to v2 and golang to v1.25.1 (#508)
* chore: bump golangci-lint to v2
* chore: bump golang to 1.25.1
* chore: gofumpt
* fix: merge from master
* fix: split load and smoke metrics
* fix: improve log msgs for smoke check
* fix: merge from master

Co-authored-by: akrem-chabchoub <chabchoub.akrem@gmail.com>
Co-authored-by: Not Darko <93942788+darkobas2@users.noreply.github.com>
Co-authored-by: Ljubiša Gačević <35105035+gacevicljubisa@users.noreply.github.com>
Co-authored-by: Ljubisa Gacevic <ljubisa.rs@gmail.com>
1 parent bca0c35 commit d7a92d8

17 files changed: +384 −207 lines changed

pkg/bee/client.go

Lines changed: 7 additions & 0 deletions
@@ -29,6 +29,7 @@ type Client struct {
 	swapClient swap.BlockTimeFetcher
 	log        logging.Logger
 	name       string
+	nodeGroup  string
 	apiURL     *url.URL
 	retryCount int
 }
@@ -37,6 +38,7 @@ type Client struct {
 type ClientOptions struct {
 	APIURL     *url.URL
 	Name       string
+	NodeGroup  string
 	Retry      int
 	SwapClient swap.BlockTimeFetcher
 	HTTPClient *http.Client
@@ -54,6 +56,7 @@ func NewClient(opts ClientOptions) (c *Client, err error) {
 		log:        opts.Logger,
 		swapClient: opts.SwapClient,
 		name:       opts.Name,
+		nodeGroup:  opts.NodeGroup,
 		apiURL:     opts.APIURL,
 	}

@@ -90,6 +93,10 @@ func (c *Client) API() *api.Client {
 	return c.api
 }

+func (c *Client) NodeGroup() string {
+	return c.nodeGroup
+}
+
 // Addresses returns node's addresses
 func (c *Client) Addresses(ctx context.Context) (resp Addresses, err error) {
 	a, err := c.api.Node.Addresses(ctx)
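The new NodeGroup accessor is what lets the reworked checks pick uploaders and downloaders per node group. A minimal sketch of the kind of use it enables (the clients slice here is hypothetical, not code from this PR):

	// Bucket an assumed slice of *bee.Client values by their node group.
	byGroup := make(map[string][]*bee.Client)
	for _, c := range clients {
		byGroup[c.NodeGroup()] = append(byGroup[c.NodeGroup()], c)
	}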

pkg/check/balances/balances.go

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@ import (
 	"github.com/ethersphere/beekeeper/pkg/beekeeper"
 	"github.com/ethersphere/beekeeper/pkg/logging"
 	"github.com/ethersphere/beekeeper/pkg/orchestration"
-	test "github.com/ethersphere/beekeeper/pkg/test"
+	"github.com/ethersphere/beekeeper/pkg/test"
 )

 // Options represents check options
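The test alias was redundant: the imported package is already named test, so the plain import is equivalent.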
Lines changed: 101 additions & 91 deletions
@@ -1,4 +1,4 @@
-package smoke
+package load

 import (
 	"bytes"
@@ -15,32 +15,71 @@ import (
 	"github.com/ethersphere/beekeeper/pkg/beekeeper"
 	"github.com/ethersphere/beekeeper/pkg/logging"
 	"github.com/ethersphere/beekeeper/pkg/orchestration"
+	"github.com/ethersphere/beekeeper/pkg/random"
 	"github.com/ethersphere/beekeeper/pkg/scheduler"
+	"github.com/ethersphere/beekeeper/pkg/test"
+	"github.com/prometheus/client_golang/prometheus"
 )

+type Options struct {
+	ContentSize             int64
+	RndSeed                 int64
+	PostageTTL              time.Duration
+	PostageDepth            uint64
+	PostageLabel            string
+	TxOnErrWait             time.Duration
+	RxOnErrWait             time.Duration
+	NodesSyncWait           time.Duration
+	Duration                time.Duration
+	UploaderCount           int
+	UploadGroups            []string
+	DownloaderCount         int
+	DownloadGroups          []string
+	MaxCommittedDepth       uint8
+	CommittedDepthCheckWait time.Duration
+	IterationWait           time.Duration
+}
+
+func NewDefaultOptions() Options {
+	return Options{
+		ContentSize:             5000000,
+		RndSeed:                 time.Now().UnixNano(),
+		PostageTTL:              24 * time.Hour,
+		PostageDepth:            24,
+		PostageLabel:            "test-label",
+		TxOnErrWait:             10 * time.Second,
+		RxOnErrWait:             10 * time.Second,
+		NodesSyncWait:           time.Minute,
+		Duration:                12 * time.Hour,
+		UploaderCount:           1,
+		UploadGroups:            []string{"bee"},
+		DownloaderCount:         0,
+		DownloadGroups:          []string{},
+		MaxCommittedDepth:       2,
+		CommittedDepthCheckWait: 5 * time.Minute,
+		IterationWait:           5 * time.Minute,
+	}
+}
+
 func init() {
 	rand.New(rand.NewSource(time.Now().UnixNano()))
 }

-// compile check whether Check implements interface
-var _ beekeeper.Action = (*LoadCheck)(nil)
+var _ beekeeper.Action = (*Check)(nil)

-// Check instance
-type LoadCheck struct {
+type Check struct {
 	metrics metrics
 	logger  logging.Logger
 }

-// NewCheck returns new check
-func NewLoadCheck(log logging.Logger) beekeeper.Action {
-	return &LoadCheck{
+func NewCheck(log logging.Logger) beekeeper.Action {
+	return &Check{
 		metrics: newMetrics("check_load"),
 		logger:  log,
 	}
 }

-// Run creates file of specified size that is uploaded and downloaded.
-func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts interface{}) error {
+func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts interface{}) error {
 	o, ok := opts.(Options)
 	if !ok {
 		return errors.New("invalid options type")
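NewDefaultOptions gives the load check a baseline configuration that callers override per run. A minimal usage sketch (the overridden values are illustrative, not defaults from this PR):

	opts := load.NewDefaultOptions()
	opts.ContentSize = 1 << 20 // 1 MiB payloads instead of the 5 MB default
	opts.DownloaderCount = 2   // downloads are off by default (DownloaderCount: 0)
	opts.DownloadGroups = []string{"bee"}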
@@ -51,9 +90,9 @@ func (c *LoadCheck) run(ctx context.Context, cluster orchestration.Cluster, opts
 	})
 }

-func (c *LoadCheck) run(ctx context.Context, cluster orchestration.Cluster, o Options) error {
+func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Options) error {
 	if o.UploaderCount == 0 || len(o.UploadGroups) == 0 {
-		return errors.New("no uploaders requested, quiting")
+		return errors.New("no uploaders requested, quitting")
 	}

 	if o.MaxCommittedDepth == 0 {
@@ -68,20 +107,34 @@ func (c *LoadCheck) run(ctx context.Context, cluster orchestration.Cluster, o Op
 	c.logger.Infof("committed depth check wait time: %v", o.CommittedDepthCheckWait)
 	c.logger.Infof("total duration: %s", o.Duration.String())

-	clients, err := cluster.NodesClients(ctx)
+	rnd := random.PseudoGenerator(o.RndSeed)
+	fullNodeClients, err := cluster.ShuffledFullNodeClients(ctx, rnd)
 	if err != nil {
-		return fmt.Errorf("get clients: %w", err)
+		return fmt.Errorf("get shuffled full node clients: %w", err)
 	}

-	test := &test{clients: clients, logger: c.logger}
+	minNodes := min(o.UploaderCount, o.DownloaderCount)
+	if len(fullNodeClients) == 0 || len(fullNodeClients) < minNodes {
+		return fmt.Errorf("load check requires at least %d full nodes, got %d", minNodes, len(fullNodeClients))
+	}

-	uploaders := selectNames(cluster, o.UploadGroups...)
-	downloaders := selectNames(cluster, o.DownloadGroups...)
+	test := test.NewTest(c.logger)
+
+	var downloaders []*bee.Client
+	if o.DownloaderCount > 0 && len(o.DownloadGroups) > 0 {
+		downloaders = fullNodeClients.FilterByNodeGroups(o.DownloadGroups)
+		if len(downloaders) == 0 {
+			return fmt.Errorf("no downloaders found in the specified node groups: %v", o.DownloadGroups)
+		}
+		if len(downloaders) < o.DownloaderCount {
+			return fmt.Errorf("not enough downloaders found in the specified node groups: %v, requested %d, got %d", o.DownloadGroups, o.DownloaderCount, len(downloaders))
+		}
+	}

 	for i := 0; true; i++ {
 		select {
 		case <-ctx.Done():
-			c.logger.Info("we are done")
+			c.logger.Info("context done in iteration")
 			return nil
 		default:
 			c.logger.Infof("starting iteration: #%d bytes (%.2f KB)", contentSize, float64(contentSize)/1024)
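FilterByNodeGroups itself is not part of this diff; judging from the call sites, it plausibly keeps only the clients whose NodeGroup() appears in the given list, preserving the shuffled order. A sketch of that assumed behavior, not the actual pkg/orchestration implementation:

	// Assumed semantics of ClientList.FilterByNodeGroups, for illustration only.
	func (cl ClientList) FilterByNodeGroups(groups []string) ClientList {
		wanted := make(map[string]struct{}, len(groups))
		for _, g := range groups {
			wanted[g] = struct{}{}
		}
		var out ClientList
		for _, c := range cl {
			if _, ok := wanted[c.NodeGroup()]; ok {
				out = append(out, c)
			}
		}
		return out
	}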
@@ -101,9 +154,16 @@ func (c *LoadCheck) run(ctx context.Context, cluster orchestration.Cluster, o Op
 			continue
 		}

-		txNames := pickRandom(o.UploaderCount, uploaders)
-
-		c.logger.Infof("uploader: %s", txNames)
+		uploaders := fullNodeClients
+		if o.UploaderCount > 0 && len(o.UploadGroups) > 0 {
+			uploaders = fullNodeClients.FilterByNodeGroups(o.UploadGroups)
+			if len(uploaders) == 0 {
+				return fmt.Errorf("no uploaders found in the specified node groups: %v", o.UploadGroups)
+			}
+			if len(uploaders) < o.UploaderCount {
+				return fmt.Errorf("not enough uploaders found in the specified node groups: %v, requested %d, got %d", o.UploadGroups, o.UploaderCount, len(uploaders))
+			}
+		}

 		var (
 			upload sync.WaitGroup
@@ -112,39 +172,39 @@ func (c *LoadCheck) run(ctx context.Context, cluster orchestration.Cluster, o Op

 		upload.Add(1)

-		for _, txName := range txNames {
+		for _, uploader := range uploaders[:o.UploaderCount] {
 			go func() {
 				defer once.Do(func() {
 					upload.Done()
 				}) // don't wait for all uploads
 				for retries := 10; txDuration == 0 && retries > 0; retries-- {
 					select {
 					case <-ctx.Done():
-						c.logger.Info("we are done")
+						c.logger.Info("context done in retry")
 						return
 					default:
 					}

-					if !c.checkCommittedDepth(ctx, test.clients[txName], o.MaxCommittedDepth, o.CommittedDepthCheckWait) {
+					if !c.checkCommittedDepth(ctx, uploader, o.MaxCommittedDepth, o.CommittedDepthCheckWait) {
 						return
 					}

 					c.metrics.UploadAttempts.WithLabelValues(sizeLabel).Inc()
 					var duration time.Duration
-					c.logger.Infof("uploading to: %s", txName)
+					c.logger.Infof("uploading to: %s", uploader.Name())

-					batchID, err := clients[txName].GetOrCreateMutableBatch(ctx, o.PostageTTL, o.PostageDepth, o.PostageLabel)
+					batchID, err := uploader.GetOrCreateMutableBatch(ctx, o.PostageTTL, o.PostageDepth, o.PostageLabel)
 					if err != nil {
-						c.logger.Errorf("create new batch: %v", err)
+						c.logger.Errorf("create new batch failed: %v", err)
 						return
 					}

-					c.logger.WithField("batch_id", batchID).Info("using batch")
+					c.logger.WithField("batch_id", batchID).Infof("node %s: using batch", uploader.Name())

-					address, duration, err = test.upload(ctx, txName, txData, batchID)
+					address, duration, err = test.Upload(ctx, uploader, txData, batchID)
 					if err != nil {
 						c.metrics.UploadErrors.WithLabelValues(sizeLabel).Inc()
-						c.logger.Infof("upload failed: %v", err)
+						c.logger.Errorf("upload failed: %v", err)
 						c.logger.Infof("retrying in: %v", o.TxOnErrWait)
 						time.Sleep(o.TxOnErrWait)
 						return
@@ -161,19 +221,12 @@ func (c *LoadCheck) run(ctx context.Context, cluster orchestration.Cluster, o Op
 		}

 		c.logger.Infof("sleeping for: %v seconds", o.NodesSyncWait.Seconds())
-		time.Sleep(o.NodesSyncWait) // Wait for nodes to sync.
-
-		// pick a batch of downloaders
-		rxNames := pickRandom(o.DownloaderCount, downloaders)
-		c.logger.Infof("downloaders: %s", rxNames)
+		time.Sleep(o.NodesSyncWait)

 		var wg sync.WaitGroup

-		for _, rxName := range rxNames {
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-
+		for _, downloader := range downloaders {
+			wg.Go(func() {
 				var (
 					rxDuration time.Duration
 					rxData     []byte
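The wg.Go call that replaces the manual Add/Done pattern is sync.WaitGroup.Go from the Go 1.25 standard library, which is why this PR also bumps the toolchain to 1.25.1. The two forms are equivalent:

	// Before: manual bookkeeping around each goroutine.
	wg.Add(1)
	go func() {
		defer wg.Done()
		work()
	}()

	// After (Go 1.25+): the WaitGroup spawns and tracks the goroutine itself.
	wg.Go(work)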
@@ -189,16 +242,15 @@ func (c *LoadCheck) run(ctx context.Context, cluster orchestration.Cluster, o Op

 				c.metrics.DownloadAttempts.WithLabelValues(sizeLabel).Inc()

-				rxData, rxDuration, err = test.download(ctx, rxName, address)
+				rxData, rxDuration, err = test.Download(ctx, downloader, address)
 				if err != nil {
 					c.metrics.DownloadErrors.WithLabelValues(sizeLabel).Inc()
-					c.logger.Infof("download failed: %v", err)
+					c.logger.Errorf("download failed for size %d: %v", contentSize, err)
 					c.logger.Infof("retrying in: %v", o.RxOnErrWait)
 					time.Sleep(o.RxOnErrWait)
 				}
 			}

-			// download error, skip comprarison below
 			if rxDuration == 0 {
 				return
 			}
@@ -210,10 +262,7 @@ func (c *LoadCheck) run(ctx context.Context, cluster orchestration.Cluster, o Op

 			rxLen, txLen := len(rxData), len(txData)
 			if rxLen != txLen {
-				c.logger.Infof("length mismatch: download length %d; upload length %d", rxLen, txLen)
-				if txLen < rxLen {
-					c.logger.Info("length mismatch: rx length is bigger then tx length")
-				}
+				c.logger.Errorf("length mismatch for size %d: downloaded %d bytes, uploaded %d bytes", contentSize, rxLen, txLen)
 				return
 			}
@@ -227,8 +276,6 @@ func (c *LoadCheck) run(ctx context.Context, cluster orchestration.Cluster, o Op
 				return
 			}

-			// We want to update the metrics when no error has been
-			// encountered in order to avoid counter mismatch.
 			c.metrics.UploadDuration.WithLabelValues(sizeLabel).Observe(txDuration.Seconds())
 			c.metrics.DownloadDuration.WithLabelValues(sizeLabel).Observe(rxDuration.Seconds())
 			if txDuration.Seconds() > 0 {
@@ -239,7 +286,7 @@ func (c *LoadCheck) run(ctx context.Context, cluster orchestration.Cluster, o Op
 				downloadThroughput := float64(contentSize) / rxDuration.Seconds()
 				c.metrics.DownloadThroughput.WithLabelValues(sizeLabel).Set(downloadThroughput)
 			}
-		}()
+		})
 	}

 	wg.Wait()
@@ -248,11 +295,11 @@ func (c *LoadCheck) run(ctx context.Context, cluster orchestration.Cluster, o Op
 	return nil
 }

-func (c *LoadCheck) checkCommittedDepth(ctx context.Context, client *bee.Client, maxDepth uint8, wait time.Duration) bool {
+func (c *Check) checkCommittedDepth(ctx context.Context, client *bee.Client, maxDepth uint8, wait time.Duration) bool {
 	for {
 		statusResp, err := client.Status(ctx)
 		if err != nil {
-			c.logger.Infof("error getting state: %v", err)
+			c.logger.Errorf("error getting state: %v", err)
 			return false
 		}
@@ -270,43 +317,6 @@ func (c *LoadCheck) checkCommittedDepth(ctx context.Context, client *bee.Client,
 	}
 }

-func pickRandom(count int, peers []string) (names []string) {
-	seq := randomIntSeq(count, len(peers))
-	for _, i := range seq {
-		names = append(names, peers[i])
-	}
-
-	return names
-}
-
-func selectNames(c orchestration.Cluster, names ...string) (selected []string) {
-	for _, name := range names {
-		ng, err := c.NodeGroup(name)
-		if err != nil {
-			panic(err)
-		}
-		selected = append(selected, ng.NodesSorted()...)
-	}
-
-	rand.Shuffle(len(selected), func(i, j int) {
-		tmp := selected[i]
-		selected[i] = selected[j]
-		selected[j] = tmp
-	})
-
-	return selected
-}
-
-func randomIntSeq(size, ceiling int) (out []int) {
-	r := make(map[int]struct{}, size)
-
-	for len(r) < size {
-		r[rand.Intn(ceiling)] = struct{}{}
-	}
-
-	for k := range r {
-		out = append(out, k)
-	}
-
-	return out
+func (c *Check) Report() []prometheus.Collector {
+	return c.metrics.Report()
 }
