Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ checks:
max-pathname-length: 64
postage-amount: 1000
postage-depth: 17
timeout: 5m
timeout: 30m
type: manifest
networkavailability:
options:
Expand Down
2 changes: 1 addition & 1 deletion config/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ checks:
max-pathname-length: 64
postage-amount: 1000
postage-depth: 17
timeout: 5m
timeout: 30m
type: manifest
ci-pingpong:
options:
Expand Down
2 changes: 1 addition & 1 deletion config/public-testnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ checks:
max-pathname-length: 64
postage-amount: 140000000
postage-depth: 17
timeout: 5m
timeout: 30m
type: manifest
pt-pss:
options:
Expand Down
6 changes: 6 additions & 0 deletions pkg/bee/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
swarmSocSignatureHeader = "Swarm-Soc-Signature"
swarmFeedIndexHeader = "Swarm-Feed-Index"
swarmFeedIndexNextHeader = "Swarm-Feed-Index-Next"
swarmIndexDocumentHeader = "Swarm-Index-Document"
swarmErrorDocumentHeader = "Swarm-Error-Document"
)

var userAgent = "beekeeper/" + beekeeper.Version
Expand Down Expand Up @@ -340,6 +342,10 @@ type UploadOptions struct {
BatchID string
Direct bool
ActHistoryAddress swarm.Address

// Dirs
IndexDocument string
ErrorDocument string
}

type DownloadOptions struct {
Expand Down
7 changes: 7 additions & 0 deletions pkg/bee/api/dirs.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ func (s *DirsService) Upload(ctx context.Context, data io.Reader, size int64, o
header.Set("swarm-collection", "True")
header.Set(postageStampBatchHeader, o.BatchID)

if o.IndexDocument != "" {
header.Set(swarmIndexDocumentHeader, o.IndexDocument)
}
if o.ErrorDocument != "" {
header.Set(swarmErrorDocumentHeader, o.ErrorDocument)
}

err = s.client.requestWithHeader(ctx, http.MethodPost, "/"+apiVersion+"/bzz", header, data, &resp)

return
Expand Down
195 changes: 162 additions & 33 deletions pkg/check/manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"math/rand"
"time"

"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/bee"
"github.com/ethersphere/beekeeper/pkg/bee/api"
"github.com/ethersphere/beekeeper/pkg/beekeeper"
Expand Down Expand Up @@ -67,14 +69,31 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int
}

rnd := random.PseudoGenerator(o.Seed)
clients, err := cluster.ShuffledFullNodeClients(ctx, rnd)
if err != nil {
return fmt.Errorf("node clients shuffle: %w", err)
}

c.logger.Infof("Seed: %d", o.Seed)
if len(clients) < 2 {
return fmt.Errorf("not enough nodes to run feed check")
}
upClient := clients[0]
downClient := clients[1]

overlays, err := cluster.FlattenOverlays(ctx)
err = c.checkWithoutSubDirs(ctx, rnd, o, upClient, downClient)
if err != nil {
return err
return fmt.Errorf("check without subdirs: %w", err)
}

err = c.checkWithSubDirs(ctx, rnd, o, upClient, downClient)
if err != nil {
return fmt.Errorf("check with subdirs: %w", err)
}

return nil
}

func (c *Check) checkWithoutSubDirs(ctx context.Context, rnd *rand.Rand, o Options, upClient *bee.Client, downClient *bee.Client) error {
files, err := generateFiles(rnd, o.FilesInCollection, o.MaxPathnameLength)
if err != nil {
return err
Expand All @@ -86,57 +105,169 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int
}

tarFile := bee.NewBufferFile("", tarReader)
clients, err := cluster.NodesClients(ctx)
batchID, err := upClient.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", upClient.Name(), err)
}
c.logger.Infof("node %s: batch id %s", upClient.Name(), batchID)

if err := upClient.UploadCollection(ctx, &tarFile, api.UploadOptions{BatchID: batchID}); err != nil {
return fmt.Errorf("node %d: %w", 0, err)
}

for _, file := range files {
if err := c.download(downClient, tarFile.Address(), &file, bee.File{}); err != nil {
return err
}
}
return nil
}

func (c *Check) checkWithSubDirs(ctx context.Context, rnd *rand.Rand, o Options, upClient *bee.Client, downClient *bee.Client) error {
privKey, err := crypto.GenerateSecp256k1Key()
if err != nil {
return err
}

sortedNodes := cluster.FullNodeNames()
node := sortedNodes[0]
signer := crypto.NewDefaultSigner(privKey)
topic, err := crypto.LegacyKeccak256([]byte("my-website"))
if err != nil {
return err
}

client := clients[node]
batchID, err := upClient.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", upClient.Name(), err)
}
c.logger.Infof("node %s: batch id %s", upClient.Name(), batchID)

batchID, err := client.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
rootFeedRef, err := upClient.CreateRootFeedManifest(ctx, signer, topic, api.UploadOptions{BatchID: batchID})
if err != nil {
return fmt.Errorf("node %s: batch id %w", node, err)
return err
}
c.logger.Infof("node %s: batch id %s", node, batchID)
c.logger.Infof("root feed reference: %s", rootFeedRef.Reference)
time.Sleep(3 * time.Second)

if err := client.UploadCollection(ctx, &tarFile, api.UploadOptions{BatchID: batchID}); err != nil {
return fmt.Errorf("node %d: %w", 0, err)
paths := []string{"index.html", "assets/styles/styles.css", "assets/styles/images/image.png", "error.html"}
files, err := generateFilesWithPaths(rnd, paths, int(o.MaxPathnameLength))
if err != nil {
return err
}

tarReader, err := tarFiles(files)
if err != nil {
return err
}
tarFile := bee.NewBufferFile("", tarReader)
if err := upClient.UploadCollection(ctx, &tarFile, api.UploadOptions{BatchID: batchID, IndexDocument: "index.html"}); err != nil {
return err
}
c.logger.Infof("collection uploaded: %s", tarFile.Address())
time.Sleep(3 * time.Second)

// push first version of website to the feed
ref, err := upClient.UpdateFeedWithReference(ctx, signer, topic, 0, tarFile.Address(), api.UploadOptions{BatchID: batchID})
if err != nil {
return err
}
c.logger.Infof("feed updated: %s", ref.Reference)

// download root (index.html) from the feed
err = c.download(downClient, rootFeedRef.Reference, nil, files[0])
if err != nil {
return err
}

// update website files
files, err = generateFilesWithPaths(rnd, paths, int(o.MaxPathnameLength))
if err != nil {
return err
}

lastNode := sortedNodes[len(sortedNodes)-1]
try := 0
tarReader, err = tarFiles(files)
if err != nil {
return err
}
tarFile = bee.NewBufferFile("", tarReader)
if err := upClient.UploadCollection(ctx, &tarFile, api.UploadOptions{BatchID: batchID, IndexDocument: "index.html"}); err != nil {
return err
}
time.Sleep(3 * time.Second)

DOWNLOAD:
time.Sleep(5 * time.Second)
try++
if try > 5 {
return errors.New("failed getting manifest files after too many retries")
// push 2nd version of website to the feed
ref, err = upClient.UpdateFeedWithReference(ctx, signer, topic, 1, tarFile.Address(), api.UploadOptions{BatchID: batchID})
if err != nil {
return err
}
c.logger.Infof("feed updated: %s", ref.Reference)

for i, file := range files {
node := clients[lastNode]
// download updated index.html from the feed
err = c.download(downClient, rootFeedRef.Reference, nil, files[0])
if err != nil {
return err
}

size, hash, err := node.DownloadManifestFile(ctx, tarFile.Address(), file.Name())
// download other paths and compare
for i := 0; i < len(files); i++ {
err = c.download(downClient, tarFile.Address(), &files[i], files[0])
if err != nil {
c.logger.Infof("Node %s. Error retrieving file: %v", lastNode, err)
goto DOWNLOAD
return err
}
}
return nil
}

if !bytes.Equal(file.Hash(), hash) {
c.logger.Infof("Node %s. File %d not retrieved successfully. Uploaded size: %d Downloaded size: %d Node: %s File: %s/%s", lastNode, i, file.Size(), size, overlays[lastNode].String(), tarFile.Address().String(), file.Name())
return errManifest
// download retrieves a file from the given address using the specified client
// and verifies its content hash. If file is nil, the collection's index
// document is fetched (empty path) and compared against indexFile; otherwise
// the named file is fetched and compared against its own pre-computed hash.
// Failed attempts are retried up to 5 times with a 5s pause between tries.
//
// NOTE(review): uses context.Background() instead of a caller-supplied ctx,
// so it cannot be cancelled — consider threading ctx through.
func (c *Check) download(client *bee.Client, address swarm.Address, file *bee.File, indexFile bee.File) error {
	fName := ""
	if file != nil {
		fName = file.Name()
	}
	c.logger.Infof("downloading file: %s/%s", address, fName)

	var hash []byte
	for i := 0; i < 5; i++ {
		_, h, err := client.DownloadManifestFile(context.Background(), address, fName)
		if err == nil {
			// Only keep the hash from a successful attempt; assigning it
			// unconditionally could let a partial result from a failed
			// final try slip past the nil check below.
			hash = h
			break
		}
		c.logger.Infof("node %s. Error retrieving file: %s", client.Name(), err.Error())
		time.Sleep(5 * time.Second)
	}
	if hash == nil {
		return errors.New("failed getting manifest files after too many retries")
	}

	// When downloading the feed root (file == nil) the expected content is
	// the collection's index document.
	expectedHash := indexFile.Hash()
	if file != nil {
		expectedHash = file.Hash()
	}

	if !bytes.Equal(expectedHash, hash) {
		c.logger.Infof("node %s: file hash does not match", client.Name())
		return errManifest
	}

	c.logger.Infof("node %s: file retrieved successfully", client.Name())
	return nil
}

// generateFilesWithPaths builds one random file for each entry in paths.
// Every file gets a random size in [1, maxSize] drawn from r, and its
// content hash is pre-computed so callers can verify downloads against it.
func generateFilesWithPaths(r *rand.Rand, paths []string, maxSize int) ([]bee.File, error) {
	out := make([]bee.File, 0, len(paths))
	for _, p := range paths {
		f := bee.NewRandomFile(r, p, int64(r.Intn(maxSize))+1)
		if err := f.CalculateHash(); err != nil {
			return nil, err
		}
		out = append(out, f)
	}
	return out, nil
}

func generateFiles(r *rand.Rand, filesCount int, maxPathnameLength int32) ([]bee.File, error) {
files := make([]bee.File, filesCount)

Expand Down Expand Up @@ -171,6 +302,8 @@ func tarFiles(files []bee.File) (*bytes.Buffer, error) {
var buf bytes.Buffer
tw := tar.NewWriter(&buf)

defer tw.Close()

for _, file := range files {
// create tar header and write it
hdr := &tar.Header{
Expand All @@ -193,9 +326,5 @@ func tarFiles(files []bee.File) (*bytes.Buffer, error) {
}
}

if err := tw.Close(); err != nil {
return nil, err
}

return &buf, nil
}
1 change: 1 addition & 0 deletions pkg/orchestration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Cluster interface {
FlattenSettlements(ctx context.Context) (settlements NodeGroupSettlements, err error)
FlattenTopologies(ctx context.Context) (topologies map[string]bee.Topology, err error)
FullNodeNames() (names []string)
ShuffledFullNodeClients(ctx context.Context, r *rand.Rand) ([]*bee.Client, error)
GlobalReplicationFactor(ctx context.Context, a swarm.Address) (grf int, err error)
LightNodeNames() (names []string)
Name() string
Expand Down
16 changes: 16 additions & 0 deletions pkg/orchestration/k8s/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,22 @@ func (c *Cluster) FullNodeNames() (names []string) {
return
}

// ShuffledFullNodeClients returns the clients of the cluster's running nodes
// shuffled with the caller-supplied random source, so a seeded *rand.Rand
// yields a reproducible ordering (e.g. for picking uploader/downloader pairs).
//
// NOTE(review): despite the name, this delegates to NodesClients, which
// returns all non-stopped nodes — confirm it is restricted to full nodes.
func (c *Cluster) ShuffledFullNodeClients(ctx context.Context, r *rand.Rand) ([]*bee.Client, error) {
	cls, err := c.NodesClients(ctx)
	if err != nil {
		return nil, err
	}
	// Pre-size to avoid repeated growth; map iteration order is already
	// random, but the explicit shuffle makes the order seed-controlled.
	res := make([]*bee.Client, 0, len(cls))
	for _, cl := range cls {
		res = append(res, cl)
	}
	r.Shuffle(len(res), func(i, j int) {
		res[i], res[j] = res[j], res[i]
	})
	return res, nil
}

// NodesClients returns map of node's clients in the cluster excluding stopped nodes
func (c *Cluster) NodesClients(ctx context.Context) (map[string]*bee.Client, error) {
clients := make(map[string]*bee.Client)
Expand Down
Loading