2 changes: 1 addition & 1 deletion config/config.yaml
@@ -212,7 +212,7 @@ checks:
max-pathname-length: 64
postage-amount: 1000
postage-depth: 17
timeout: 5m
timeout: 30m
type: manifest
networkavailability:
options:
2 changes: 1 addition & 1 deletion config/local.yaml
@@ -204,7 +204,7 @@ checks:
max-pathname-length: 64
postage-amount: 1000
postage-depth: 17
timeout: 5m
timeout: 30m
type: manifest
ci-pingpong:
options:
2 changes: 1 addition & 1 deletion config/public-testnet.yaml
@@ -79,7 +79,7 @@ checks:
max-pathname-length: 64
postage-amount: 140000000
postage-depth: 17
timeout: 5m
timeout: 30m
type: manifest
pt-pss:
options:
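All three configuration files get the same change: the manifest check's timeout is raised from 5m to 30m. For orientation, a rough sketch of how the touched keys nest once indentation is restored, using the values from config/config.yaml; the check's own key is outside these hunks, so a placeholder name is used, and the exact nesting of timeout relative to options is assumed:

checks:
  manifest:                     # placeholder key; the real check name is not shown in the hunks
    options:
      max-pathname-length: 64
      postage-amount: 1000
      postage-depth: 17
    timeout: 30m
    type: manifest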
6 changes: 6 additions & 0 deletions pkg/bee/api/api.go
@@ -34,6 +34,8 @@ const (
swarmSocSignatureHeader = "Swarm-Soc-Signature"
swarmFeedIndexHeader = "Swarm-Feed-Index"
swarmFeedIndexNextHeader = "Swarm-Feed-Index-Next"
swarmIndexDocumentHeader = "Swarm-Index-Document"
swarmErrorDocumentHeader = "Swarm-Error-Document"
)

var userAgent = "beekeeper/" + beekeeper.Version
@@ -340,6 +342,10 @@ type UploadOptions struct {
BatchID string
Direct bool
ActHistoryAddress swarm.Address

// Dirs
IndexDocument string
ErrorDocument string
}

type DownloadOptions struct {
7 changes: 7 additions & 0 deletions pkg/bee/api/dirs.go
@@ -30,6 +30,13 @@ func (s *DirsService) Upload(ctx context.Context, data io.Reader, size int64, o
header.Set("swarm-collection", "True")
header.Set(postageStampBatchHeader, o.BatchID)

if o.IndexDocument != "" {
header.Set(swarmIndexDocumentHeader, o.IndexDocument)
}
if o.ErrorDocument != "" {
header.Set(swarmErrorDocumentHeader, o.ErrorDocument)
}

err = s.client.requestWithHeader(ctx, http.MethodPost, "/"+apiVersion+"/bzz", header, data, &resp)

return
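Taken together with the new UploadOptions fields in api.go, callers of the /bzz collection upload can now ask the node to record an index and an error document for the manifest. A minimal sketch, mirroring the call the manifest check makes below (upClient, ctx, tarFile and batchID are assumed to already exist):

err := upClient.UploadCollection(ctx, &tarFile, api.UploadOptions{
	BatchID:       batchID,
	IndexDocument: "index.html", // sent via swarmIndexDocumentHeader
	ErrorDocument: "error.html", // sent via swarmErrorDocumentHeader
})
if err != nil {
	return fmt.Errorf("upload collection: %w", err)
}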
202 changes: 164 additions & 38 deletions pkg/check/manifest/manifest.go
@@ -5,12 +5,13 @@ import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"math/rand"
"time"

"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/bee"
"github.com/ethersphere/beekeeper/pkg/bee/api"
"github.com/ethersphere/beekeeper/pkg/beekeeper"
@@ -58,23 +59,38 @@ func NewCheck(logger logging.Logger) beekeeper.Action {
}
}

var errManifest = errors.New("manifest data mismatch")

func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts interface{}) (err error) {
o, ok := opts.(Options)
if !ok {
return fmt.Errorf("invalid options type")
}

rnd := random.PseudoGenerator(o.Seed)
clients, err := cluster.ShuffledFullNodeClients(ctx, rnd)
if err != nil {
return fmt.Errorf("node clients shuffle: %w", err)
}

c.logger.Infof("Seed: %d", o.Seed)
if len(clients) < 2 {
return fmt.Errorf("not enough nodes to run manifest check")
}
upClient := clients[0]
downClient := clients[1]

overlays, err := cluster.FlattenOverlays(ctx)
err = c.checkWithoutSubDirs(ctx, rnd, o, upClient, downClient)
if err != nil {
return err
return fmt.Errorf("check without subdirs: %w", err)
}

err = c.checkWithSubDirs(ctx, rnd, o, upClient, downClient)
if err != nil {
return fmt.Errorf("check with subdirs: %w", err)
}

return nil
}

func (c *Check) checkWithoutSubDirs(ctx context.Context, rnd *rand.Rand, o Options, upClient *bee.Client, downClient *bee.Client) error {
files, err := generateFiles(rnd, o.FilesInCollection, o.MaxPathnameLength)
if err != nil {
return err
@@ -86,55 +102,167 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int
}

tarFile := bee.NewBufferFile("", tarReader)
clients, err := cluster.NodesClients(ctx)
batchID, err := upClient.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", upClient.Name(), err)
}
c.logger.Infof("node %s: batch id %s", upClient.Name(), batchID)

if err := upClient.UploadCollection(ctx, &tarFile, api.UploadOptions{BatchID: batchID}); err != nil {
return fmt.Errorf("node %d: %w", 0, err)
}

for _, file := range files {
if err := c.downloadAndVerify(ctx, downClient, tarFile.Address(), &file, bee.File{}); err != nil {
return err
}
}
return nil
}

func (c *Check) checkWithSubDirs(ctx context.Context, rnd *rand.Rand, o Options, upClient *bee.Client, downClient *bee.Client) error {
privKey, err := crypto.GenerateSecp256k1Key()
if err != nil {
return err
}

sortedNodes := cluster.FullNodeNames()
node := sortedNodes[0]
signer := crypto.NewDefaultSigner(privKey)
topic, err := crypto.LegacyKeccak256([]byte("my-website"))
if err != nil {
return err
}

client := clients[node]
batchID, err := upClient.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", upClient.Name(), err)
}
c.logger.Infof("node %s: batch id %s", upClient.Name(), batchID)

batchID, err := client.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
rootFeedRef, err := upClient.CreateRootFeedManifest(ctx, signer, topic, api.UploadOptions{BatchID: batchID})
if err != nil {
return fmt.Errorf("node %s: batch id %w", node, err)
return err
}
c.logger.Infof("node %s: batch id %s", node, batchID)
c.logger.Infof("root feed reference: %s", rootFeedRef.Reference)
time.Sleep(3 * time.Second)

if err := client.UploadCollection(ctx, &tarFile, api.UploadOptions{BatchID: batchID}); err != nil {
return fmt.Errorf("node %d: %w", 0, err)
paths := []string{"index.html", "assets/styles/styles.css", "assets/styles/images/image.png", "error.html"}
files, err := generateFilesWithPaths(rnd, paths, int(o.MaxPathnameLength))
if err != nil {
return err
}

lastNode := sortedNodes[len(sortedNodes)-1]
try := 0
tarReader, err := tarFiles(files)
if err != nil {
return err
}
tarFile := bee.NewBufferFile("", tarReader)
if err := upClient.UploadCollection(ctx, &tarFile, api.UploadOptions{BatchID: batchID, IndexDocument: "index.html"}); err != nil {
return err
}
c.logger.Infof("collection uploaded: %s", tarFile.Address())
time.Sleep(3 * time.Second)

DOWNLOAD:
time.Sleep(5 * time.Second)
try++
if try > 5 {
return errors.New("failed getting manifest files after too many retries")
// push first version of website to the feed
ref, err := upClient.UpdateFeedWithReference(ctx, signer, topic, 0, tarFile.Address(), api.UploadOptions{BatchID: batchID})
if err != nil {
return err
}
c.logger.Infof("feed updated: %s", ref.Reference)

// download root (index.html) from the feed
err = c.downloadAndVerify(ctx, downClient, rootFeedRef.Reference, nil, files[0])
if err != nil {
return err
}

// update website files
files, err = generateFilesWithPaths(rnd, paths, int(o.MaxPathnameLength))
if err != nil {
return err
}

tarReader, err = tarFiles(files)
if err != nil {
return err
}
tarFile = bee.NewBufferFile("", tarReader)
if err := upClient.UploadCollection(ctx, &tarFile, api.UploadOptions{BatchID: batchID, IndexDocument: "index.html"}); err != nil {
return err
}
c.logger.Infof("collection uploaded: %s", tarFile.Address())
time.Sleep(3 * time.Second)

for i, file := range files {
node := clients[lastNode]
// push 2nd version of website to the feed
ref, err = upClient.UpdateFeedWithReference(ctx, signer, topic, 1, tarFile.Address(), api.UploadOptions{BatchID: batchID})
if err != nil {
return err
}
c.logger.Infof("feed updated: %s", ref.Reference)

size, hash, err := node.DownloadManifestFile(ctx, tarFile.Address(), file.Name())
// download updated index.html from the feed
err = c.downloadAndVerify(ctx, downClient, rootFeedRef.Reference, nil, files[0])
if err != nil {
return err
}

// download other paths and compare
for i := 0; i < len(files); i++ {
err = c.downloadAndVerify(ctx, downClient, tarFile.Address(), &files[i], files[0])
if err != nil {
c.logger.Infof("Node %s. Error retrieving file: %v", lastNode, err)
goto DOWNLOAD
return err
}
}
return nil
}

if !bytes.Equal(file.Hash(), hash) {
c.logger.Infof("Node %s. File %d not retrieved successfully. Uploaded size: %d Downloaded size: %d Node: %s File: %s/%s", lastNode, i, file.Size(), size, overlays[lastNode].String(), tarFile.Address().String(), file.Name())
return errManifest
// downloadAndVerify retrieves a file from the given address using the specified client.
// If the file parameter is nil, it downloads the index file in the collection.
// Then it verifies the hash of the downloaded file against the expected hash.
func (c *Check) downloadAndVerify(ctx context.Context, client *bee.Client, address swarm.Address, file *bee.File, indexFile bee.File) error {
expectedHash := indexFile.Hash()
fName := ""
if file != nil {
fName = file.Name()
expectedHash = file.Hash()
}
c.logger.Infof("downloading file: %s/%s", address, fName)

for i := 0; i < 10; i++ {
select {
case <-time.After(5 * time.Second):
_, hash, err := client.DownloadManifestFile(ctx, address, fName)
if err != nil {
c.logger.Infof("node %s: error retrieving file: %s", client.Name(), err.Error())
continue
}

c.logger.Infof("want hash: %s, got hash: %s", hex.EncodeToString(expectedHash), hex.EncodeToString(hash))
if !bytes.Equal(expectedHash, hash) {
c.logger.Infof("node %s: file hash does not match.", client.Name())
continue
}
c.logger.Infof("node %s: file retrieved successfully", client.Name())
return nil
case <-ctx.Done():
return ctx.Err()
}

c.logger.Infof("Node %s. File %d retrieved successfully. Node: %s File: %s/%s", lastNode, i, overlays[lastNode].String(), tarFile.Address().String(), file.Name())
try = 0 // reset the retry counter for the next file
}

return nil
return fmt.Errorf("failed getting manifest file after too many retries")
}

func generateFilesWithPaths(r *rand.Rand, paths []string, maxSize int) ([]bee.File, error) {
files := make([]bee.File, len(paths))
for i, path := range paths {
size := int64(r.Intn(maxSize)) + 1
file := bee.NewRandomFile(r, path, size)
err := file.CalculateHash()
if err != nil {
return nil, err
}
files[i] = file
}
return files, nil
}

func generateFiles(r *rand.Rand, filesCount int, maxPathnameLength int32) ([]bee.File, error) {
Expand Down Expand Up @@ -171,6 +299,8 @@ func tarFiles(files []bee.File) (*bytes.Buffer, error) {
var buf bytes.Buffer
tw := tar.NewWriter(&buf)

defer tw.Close()

for _, file := range files {
// create tar header and write it
hdr := &tar.Header{
Expand All @@ -193,9 +323,5 @@ func tarFiles(files []bee.File) (*bytes.Buffer, error) {
}
}

if err := tw.Close(); err != nil {
return nil, err
}

return &buf, nil
}
1 change: 1 addition & 0 deletions pkg/orchestration/cluster.go
@@ -23,6 +23,7 @@ type Cluster interface {
FlattenSettlements(ctx context.Context) (settlements NodeGroupSettlements, err error)
FlattenTopologies(ctx context.Context) (topologies map[string]bee.Topology, err error)
FullNodeNames() (names []string)
ShuffledFullNodeClients(ctx context.Context, r *rand.Rand) ([]*bee.Client, error)
GlobalReplicationFactor(ctx context.Context, a swarm.Address) (grf int, err error)
LightNodeNames() (names []string)
Name() string
15 changes: 15 additions & 0 deletions pkg/orchestration/k8s/cluster.go
@@ -233,6 +233,21 @@ func (c *Cluster) FullNodeNames() (names []string) {
return
}

// ShuffledFullNodeClients returns the clients of the cluster's full nodes (excluding bootnodes) in shuffled order
func (c *Cluster) ShuffledFullNodeClients(ctx context.Context, r *rand.Rand) ([]*bee.Client, error) {
var res []*bee.Client
for _, node := range c.Nodes() {
cfg := node.Config()
if cfg.FullNode && !cfg.BootnodeMode {
res = append(res, node.Client())
}
}
r.Shuffle(len(res), func(i, j int) {
res[i], res[j] = res[j], res[i]
})
return res, nil
}

// NodesClients returns map of node's clients in the cluster excluding stopped nodes
func (c *Cluster) NodesClients(ctx context.Context) (map[string]*bee.Client, error) {
clients := make(map[string]*bee.Client)
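A usage sketch of the new method, matching how the manifest check selects its uploader and downloader (cluster, ctx and seed are assumed to exist; random.PseudoGenerator is the helper already used by the check):

rnd := random.PseudoGenerator(seed)
clients, err := cluster.ShuffledFullNodeClients(ctx, rnd)
if err != nil {
	return fmt.Errorf("node clients shuffle: %w", err)
}
if len(clients) < 2 {
	return fmt.Errorf("not enough nodes to run manifest check")
}
upClient, downClient := clients[0], clients[1]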