Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ checks:
download-timeout: 1m
iteration-wait: 5m
duration: 10m
r-level: 2
timeout: 11m
type: smoke
ci-load:
Expand Down
6 changes: 6 additions & 0 deletions pkg/bee/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"net/url"
"strconv"
"strings"

"github.com/ethersphere/bee/v2/pkg/file/redundancy"
)

const (
Expand All @@ -32,6 +34,7 @@ const (
swarmFeedIndexNextHeader = "Swarm-Feed-Index-Next"
swarmIndexDocumentHeader = "Swarm-Index-Document"
swarmErrorDocumentHeader = "Swarm-Error-Document"
redundancyLevelHeader = "Swarm-Redundancy-Level"
)

// Client manages communication with the Bee API.
Expand Down Expand Up @@ -223,6 +226,9 @@ func (c *Client) requestDataGetHeader(ctx context.Context, method, path string,
if opts != nil && opts.Cache != nil {
req.Header.Set(swarmCacheDownloadHeader, strconv.FormatBool(*opts.Cache))
}
if opts != nil && opts.RLevel != redundancy.NONE {
req.Header.Set(redundancyLevelHeader, strconv.Itoa(int(opts.RLevel)))
}
if opts != nil && opts.RedundancyFallbackMode != nil {
req.Header.Set(swarmRedundancyFallbackMode, strconv.FormatBool(*opts.RedundancyFallbackMode))
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/bee/api/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"strconv"

"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/swarm"
)

Expand Down Expand Up @@ -34,6 +35,10 @@ func (b *BytesService) Upload(ctx context.Context, data io.Reader, o UploadOptio
}
h.Add(deferredUploadHeader, strconv.FormatBool(!o.Direct))
h.Add(postageStampBatchHeader, o.BatchID)
if o.RLevel != redundancy.NONE {
h.Add(redundancyLevelHeader, strconv.Itoa(int(o.RLevel)))
}

err := b.client.requestWithHeader(ctx, http.MethodPost, "/"+apiVersion+"/bytes", h, data, &resp)
return resp, err
}
7 changes: 6 additions & 1 deletion pkg/bee/api/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package api

import "github.com/ethersphere/bee/v2/pkg/swarm"
import (
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/swarm"
)

type UploadOptions struct {
Act bool
Expand All @@ -9,6 +12,7 @@ type UploadOptions struct {
BatchID string
Direct bool
ActHistoryAddress swarm.Address
RLevel redundancy.Level

// Dirs
IndexDocument string
Expand All @@ -21,6 +25,7 @@ type DownloadOptions struct {
ActPublicKey *swarm.Address
ActTimestamp *uint64
Cache *bool
RLevel redundancy.Level
RedundancyFallbackMode *bool
OnlyRootChunk *bool
}
5 changes: 3 additions & 2 deletions pkg/check/load/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/bee"
"github.com/ethersphere/beekeeper/pkg/beekeeper"
Expand Down Expand Up @@ -203,7 +204,7 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option

c.logger.WithField("batch_id", batchID).Infof("node %s: using batch", uploader.Name())

address, duration, err = test.Upload(ctx, uploader, txData, batchID)
address, duration, err = test.Upload(ctx, uploader, txData, batchID, redundancy.NONE)
if err != nil {
c.metrics.UploadErrors.WithLabelValues(sizeLabel).Inc()
c.logger.Errorf("upload failed: %v", err)
Expand Down Expand Up @@ -246,7 +247,7 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option

c.metrics.DownloadAttempts.WithLabelValues(sizeLabel).Inc()

rxData, rxDuration, err = test.Download(ctx, downloader, address)
rxData, rxDuration, err = test.Download(ctx, downloader, address, redundancy.NONE)
if err != nil {
c.metrics.DownloadErrors.WithLabelValues(sizeLabel).Inc()
c.logger.Errorf("download failed for size %d: %v", contentSize, err)
Expand Down
9 changes: 7 additions & 2 deletions pkg/check/smoke/smoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"fmt"
"time"

"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/swarm"

"github.com/ethersphere/beekeeper/pkg/beekeeper"
"github.com/ethersphere/beekeeper/pkg/logging"
"github.com/ethersphere/beekeeper/pkg/orchestration"
Expand All @@ -33,6 +35,7 @@ type Options struct {
UploadTimeout time.Duration
DownloadTimeout time.Duration
IterationWait time.Duration
RLevel redundancy.Level
}

// NewDefaultOptions returns new default options
Expand All @@ -51,6 +54,7 @@ func NewDefaultOptions() Options {
UploadTimeout: 60 * time.Minute,
DownloadTimeout: 60 * time.Minute,
IterationWait: 5 * time.Minute,
RLevel: redundancy.NONE,
}
}

Expand Down Expand Up @@ -90,6 +94,7 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option
c.logger.Infof("upload timeout: %s", o.UploadTimeout.String())
c.logger.Infof("download timeout: %s", o.DownloadTimeout.String())
c.logger.Infof("total duration: %s", o.Duration.String())
c.logger.Infof("redundancy level: %d", o.RLevel)

rnd := random.PseudoGenerator(o.RndSeed)

Expand Down Expand Up @@ -177,7 +182,7 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option
txCtx, txCancel = context.WithTimeout(ctx, o.UploadTimeout)

c.metrics.UploadAttempts.WithLabelValues(sizeLabel, uploader.Name()).Inc()
address, txDuration, err = test.Upload(txCtx, uploader, txData, batchID)
address, txDuration, err = test.Upload(txCtx, uploader, txData, batchID, o.RLevel)
if err != nil {
c.metrics.UploadErrors.WithLabelValues(sizeLabel, uploader.Name()).Inc()
c.logger.Errorf("upload failed for size %d: %v", contentSize, err)
Expand Down Expand Up @@ -220,7 +225,7 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option
c.metrics.DownloadAttempts.WithLabelValues(sizeLabel, downloader.Name()).Inc()

rxCtx, rxCancel = context.WithTimeout(ctx, o.DownloadTimeout)
rxData, rxDuration, err = test.Download(rxCtx, downloader, address)
rxData, rxDuration, err = test.Download(rxCtx, downloader, address, o.RLevel)
if err != nil {
c.metrics.DownloadErrors.WithLabelValues(sizeLabel, downloader.Name()).Inc()
c.logger.Errorf("download failed for size %d: %v", contentSize, err)
Expand Down
15 changes: 15 additions & 0 deletions pkg/config/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"reflect"
"time"

beeRedundancy "github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/beekeeper/pkg/beekeeper"
"github.com/ethersphere/beekeeper/pkg/check/act"
"github.com/ethersphere/beekeeper/pkg/check/balances"
Expand Down Expand Up @@ -418,6 +419,7 @@ var Checks = map[string]CheckType{
RxOnErrWait *time.Duration `yaml:"rx-on-err-wait"`
NodesSyncWait *time.Duration `yaml:"nodes-sync-wait"`
Duration *time.Duration `yaml:"duration"`
RLevel *uint8 `yaml:"r-level"`
})

if err := check.Options.Decode(checkOpts); err != nil {
Expand Down Expand Up @@ -745,6 +747,19 @@ func applyCheckConfig(global CheckGlobalConfig, local, opts any) (err error) {
ov.FieldByName(fieldName).Set(fieldValue)
}
}
case "RLevel":
if !lv.Field(i).IsNil() { // set locally
fieldValue := lv.FieldByName(fieldName).Elem()
level := uint8(fieldValue.Uint())
rLevel := beeRedundancy.Level(level)
ft, ok := ot.FieldByName(fieldName)
if ok {
v := reflect.ValueOf(rLevel)
if v.Type().AssignableTo(ft.Type) {
ov.FieldByName(fieldName).Set(v)
}
}
}
Comment on lines +750 to +762
Copy link

Copilot AI Jan 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing validation for the redundancy level value. The configuration accepts any uint8 value (0-255), but only values 0-4 are valid according to the PR description. Values outside this range will silently create invalid redundancy.Level values. Consider adding validation in the applyCheckConfig function to ensure the value is within the valid range of 0-4.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intentionally omitted validation in Beekeeper.
The Bee API validates redundancy levels, so invalid values (e.g., 5+) are rejected with clear errors.
This keeps Beekeeper future-proof: if Bee adds level 5 or higher, Beekeeper supports it without code changes.

default:
if lv.Field(i).IsNil() {
fmt.Printf("field %s not set, using default value\n", fieldName)
Expand Down
19 changes: 15 additions & 4 deletions pkg/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/bee"
"github.com/ethersphere/beekeeper/pkg/bee/api"
Expand All @@ -21,10 +22,10 @@ type test struct {
logger logging.Logger
}

func (t *test) Upload(ctx context.Context, bee *bee.Client, data []byte, batchID string) (swarm.Address, time.Duration, error) {
func (t *test) Upload(ctx context.Context, bee *bee.Client, data []byte, batchID string, rLevel redundancy.Level) (swarm.Address, time.Duration, error) {
t.logger.Infof("node %s: uploading %d bytes, batch id %s", bee.Name(), len(data), batchID)
start := time.Now()
addr, err := bee.UploadBytes(ctx, data, api.UploadOptions{Pin: false, BatchID: batchID, Direct: true})
addr, err := bee.UploadBytes(ctx, data, api.UploadOptions{Pin: false, BatchID: batchID, Direct: true, RLevel: rLevel})
if err != nil {
return swarm.ZeroAddress, 0, fmt.Errorf("upload to node %s: %w", bee.Name(), err)
}
Expand All @@ -35,11 +36,21 @@ func (t *test) Upload(ctx context.Context, bee *bee.Client, data []byte, batchID
return addr, txDuration, nil
}

func (t *test) Download(ctx context.Context, bee *bee.Client, addr swarm.Address) ([]byte, time.Duration, error) {
func (t *test) Download(ctx context.Context, bee *bee.Client, addr swarm.Address, rLevel redundancy.Level) ([]byte, time.Duration, error) {
t.logger.Infof("node %s: downloading address %s", bee.Name(), addr)

start := time.Now()
data, err := bee.DownloadBytes(ctx, addr, nil)

var downloadOpts *api.DownloadOptions
if rLevel != redundancy.NONE {
fallbackMode := true
downloadOpts = &api.DownloadOptions{
RLevel: rLevel,
RedundancyFallbackMode: &fallbackMode,
}
}

data, err := bee.DownloadBytes(ctx, addr, downloadOpts)
if err != nil {
return nil, 0, fmt.Errorf("download from node %s: %w", bee.Name(), err)
}
Expand Down