diff --git a/pkg/bee/api/feed.go b/pkg/bee/api/feed.go index 17096c01..0e576771 100644 --- a/pkg/bee/api/feed.go +++ b/pkg/bee/api/feed.go @@ -8,9 +8,7 @@ import ( "io" "net/http" "strconv" - "time" - "github.com/ethersphere/bee/v2/pkg/cac" "github.com/ethersphere/bee/v2/pkg/crypto" "github.com/ethersphere/bee/v2/pkg/soc" "github.com/ethersphere/bee/v2/pkg/swarm" @@ -69,14 +67,8 @@ func (f *FeedService) CreateRootManifest(ctx context.Context, signer crypto.Sign return &response, nil } -// UpdateWithReference updates a feed with a reference -func (f *FeedService) UpdateWithReference(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, addr swarm.Address, o UploadOptions) (*SocResponse, error) { - ts := make([]byte, 8) - binary.BigEndian.PutUint64(ts, uint64(time.Now().Unix())) - ch, err := cac.New(append(append([]byte{}, ts...), addr.Bytes()...)) - if err != nil { - return nil, err - } +// UpdateWithRootChunk updates a feed with a root chunk +func (f *FeedService) UpdateWithRootChunk(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, ch swarm.Chunk, o UploadOptions) (*SocResponse, error) { ownerHex, err := ownerFromSigner(signer) if err != nil { return nil, err diff --git a/pkg/bee/client.go b/pkg/bee/client.go index 25df7e62..1d4045a7 100644 --- a/pkg/bee/client.go +++ b/pkg/bee/client.go @@ -910,14 +910,14 @@ func (c *Client) UploadCollection(ctx context.Context, f *File, o api.UploadOpti func (c *Client) DownloadManifestFile(ctx context.Context, a swarm.Address, path string) (size int64, hash []byte, err error) { r, err := c.api.Dirs.Download(ctx, a, path) if err != nil { - return 0, nil, fmt.Errorf("download manifest file %s: %w", path, err) + return 0, nil, fmt.Errorf("download manifest file `%s`: %w", path, err) } defer r.Close() h := fileHasher() size, err = io.Copy(h, r) if err != nil { - return 0, nil, fmt.Errorf("download manifest file %s: %w", path, err) + return 0, nil, fmt.Errorf("download manifest file `%s`: %w", path, err) } return size, h.Sum(nil), nil @@ -1009,9 +1009,9 @@ func (c *Client) CreateRootFeedManifest(ctx context.Context, signer crypto.Signe return c.api.Feed.CreateRootManifest(ctx, signer, topic, o) } -// UpdateFeedWithReference updates a feed with a reference -func (c *Client) UpdateFeedWithReference(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, addr swarm.Address, o api.UploadOptions) (*api.SocResponse, error) { - return c.api.Feed.UpdateWithReference(ctx, signer, topic, i, addr, o) +// UpdateFeedWithRootChunk updates a feed with a root chunk +func (c *Client) UpdateFeedWithRootChunk(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, ch swarm.Chunk, o api.UploadOptions) (*api.SocResponse, error) { + return c.api.Feed.UpdateWithRootChunk(ctx, signer, topic, i, ch, o) } // FindFeedUpdate finds the latest update for a feed diff --git a/pkg/check/feed/feed.go b/pkg/check/feed/feed.go index ec3f3389..94f2caa4 100644 --- a/pkg/check/feed/feed.go +++ b/pkg/check/feed/feed.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/ethersphere/bee/v2/pkg/cac" "github.com/ethersphere/bee/v2/pkg/crypto" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/ethersphere/beekeeper/pkg/bee" @@ -58,9 +59,18 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int if o.RootRef != "" { c.logger.Infof("running availability check") - return c.checkAvailability(ctx, cluster, o) + if err := c.checkAvailability(ctx, cluster, o); err != nil { + return fmt.Errorf("availability check: %w", err) + } + return nil + } + + c.logger.Infof("running feed check") + if err := c.feedCheck(ctx, cluster, o); err != nil { + return fmt.Errorf("feed check: %w", err) } - return c.feedCheck(ctx, cluster, o) + + return nil } func (c *Check) checkAvailability(ctx context.Context, cluster orchestration.Cluster, o Options) error { @@ -69,55 +79,54 @@ func (c *Check) checkAvailability(ctx context.Context, cluster orchestration.Clu return fmt.Errorf("invalid root ref: %w", err) } - nodeNames := cluster.FullNodeNames() - nodeName := nodeNames[0] - clients, err := cluster.NodesClients(ctx) + clients, err := cluster.ShuffledFullNodeClients(ctx, random.PseudoGenerator(time.Now().UnixNano())) if err != nil { - return err + return fmt.Errorf("node clients: %w", err) + } + + if len(clients) < 1 { + return fmt.Errorf("availability check requires at least 1 full node") } - client := clients[nodeName] - _, _, err = client.DownloadFile(ctx, ref, nil) + _, _, err = clients[0].DownloadFile(ctx, ref, nil) if err != nil { - return err + return fmt.Errorf("download root feed: %w", err) } + return nil } // feedCheck creates a root feed manifest, makes a series of updates to the feed // and verifies that the updates are retrievable via another node. func (c *Check) feedCheck(ctx context.Context, cluster orchestration.Cluster, o Options) error { - rnd := random.PseudoGenerator(time.Now().UnixNano()) - names := cluster.FullNodeNames() - perm := rnd.Perm(len(names)) - - if len(names) < 2 { - return fmt.Errorf("not enough nodes to run feed check") + clients, err := cluster.ShuffledFullNodeClients(ctx, random.PseudoGenerator(time.Now().UnixNano())) + if err != nil { + return fmt.Errorf("node clients: %w", err) } - clients, err := cluster.NodesClients(ctx) - if err != nil { - return err + if len(clients) < 2 { + return fmt.Errorf("feed check requires at least 2 full nodes") } - upClient := clients[names[perm[0]]] - downClient := clients[names[perm[1]]] + + upClient := clients[0] + downClient := clients[1] c.logger.Infof("upload client: %s", upClient.Name()) batchID, err := upClient.GetOrCreateMutableBatch(ctx, o.PostageTTL, o.PostageDepth, o.PostageLabel) if err != nil { - return err + return fmt.Errorf("get or create mutable batch: %w", err) } privKey, err := crypto.GenerateSecp256k1Key() if err != nil { - return err + return fmt.Errorf("generate private key: %w", err) } signer := crypto.NewDefaultSigner(privKey) topic, err := crypto.LegacyKeccak256([]byte("my-topic")) if err != nil { - return err + return fmt.Errorf("topic hash: %w", err) } // create root @@ -125,14 +134,16 @@ func (c *Check) feedCheck(ctx context.Context, cluster orchestration.Cluster, o if err != nil { return err } + c.logger.Infof("node %s: manifest created", upClient.Name()) c.logger.Infof("reference: %s", createManifestRes.Reference) c.logger.Infof("owner: %s", createManifestRes.Owner) c.logger.Infof("topic: %s", createManifestRes.Topic) // make updates - for i := 0; i < o.NUpdates; i++ { + for i := range o.NUpdates { time.Sleep(3 * time.Second) + data := fmt.Sprintf("update-%d", i) fName := fmt.Sprintf("file-%d", i) file := bee.NewBufferFile(fName, bytes.NewBuffer([]byte(data))) @@ -141,13 +152,26 @@ func (c *Check) feedCheck(ctx context.Context, cluster orchestration.Cluster, o Direct: true, }) if err != nil { - return err + return fmt.Errorf("upload file `%s`: %w", fName, err) + } + + // download root chunk of file + rChData, err := upClient.DownloadChunk(ctx, file.Address(), "", nil) + if err != nil { + return fmt.Errorf("download root chunk: %w", err) } - ref := file.Address() - socRes, err := upClient.UpdateFeedWithReference(ctx, signer, topic, uint64(i), ref, api.UploadOptions{BatchID: batchID}) + + // make chunk from byte array rChData + rCh, err := cac.NewWithDataSpan(rChData) + if err != nil { + return fmt.Errorf("create chunk: %w", err) + } + + socRes, err := upClient.UpdateFeedWithRootChunk(ctx, signer, topic, uint64(i), rCh, api.UploadOptions{BatchID: batchID}) if err != nil { - return err + return fmt.Errorf("update feed with root chunk: %w", err) } + c.logger.Infof("node %s: feed updated", upClient.Name()) c.logger.Infof("soc reference: %s", socRes.Reference) c.logger.Infof("wrapped reference: %s", file.Address()) @@ -159,7 +183,7 @@ func (c *Check) feedCheck(ctx context.Context, cluster orchestration.Cluster, o c.logger.Infof("download client: %s", downClient.Name()) update, err := downClient.FindFeedUpdate(ctx, signer, topic, nil) if err != nil { - return err + return fmt.Errorf("find feed update: %w", err) } c.logger.Infof("node %s: feed update found", downClient.Name()) @@ -175,9 +199,11 @@ func (c *Check) feedCheck(ctx context.Context, cluster orchestration.Cluster, o if err != nil { return fmt.Errorf("download root feed: %w", err) } + lastUpdateData := fmt.Sprintf("update-%d", o.NUpdates-1) if string(d) != lastUpdateData { return fmt.Errorf("expected file content to be %s, got %s", lastUpdateData, string(d)) } + return nil } diff --git a/pkg/check/manifest/manifest.go b/pkg/check/manifest/manifest.go index bb787f75..eef9b07a 100644 --- a/pkg/check/manifest/manifest.go +++ b/pkg/check/manifest/manifest.go @@ -10,6 +10,7 @@ import ( "math/rand" "time" + "github.com/ethersphere/bee/v2/pkg/cac" "github.com/ethersphere/bee/v2/pkg/crypto" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/ethersphere/beekeeper/pkg/bee" @@ -93,12 +94,12 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int func (c *Check) checkWithoutSubDirs(ctx context.Context, rnd *rand.Rand, o Options, upClient *bee.Client, downClient *bee.Client) error { files, err := generateFiles(rnd, o.FilesInCollection, o.MaxPathnameLength) if err != nil { - return err + return fmt.Errorf("generate files: %w", err) } tarReader, err := tarFiles(files) if err != nil { - return err + return fmt.Errorf("tar files: %w", err) } tarFile := bee.NewBufferFile("", tarReader) @@ -114,7 +115,7 @@ func (c *Check) checkWithoutSubDirs(ctx context.Context, rnd *rand.Rand, o Optio for _, file := range files { if err := c.downloadAndVerify(ctx, downClient, tarFile.Address(), &file, bee.File{}); err != nil { - return err + return fmt.Errorf("download and verify file: %w", err) } } return nil @@ -123,13 +124,13 @@ func (c *Check) checkWithoutSubDirs(ctx context.Context, rnd *rand.Rand, o Optio func (c *Check) checkWithSubDirs(ctx context.Context, rnd *rand.Rand, o Options, upClient *bee.Client, downClient *bee.Client) error { privKey, err := crypto.GenerateSecp256k1Key() if err != nil { - return err + return fmt.Errorf("generate private key: %w", err) } signer := crypto.NewDefaultSigner(privKey) topic, err := crypto.LegacyKeccak256([]byte("my-website")) if err != nil { - return err + return fmt.Errorf("topic: %w", err) } batchID, err := upClient.GetOrCreateMutableBatch(ctx, o.PostageTTL, o.PostageDepth, o.PostageLabel) @@ -140,78 +141,108 @@ func (c *Check) checkWithSubDirs(ctx context.Context, rnd *rand.Rand, o Options, rootFeedRef, err := upClient.CreateRootFeedManifest(ctx, signer, topic, api.UploadOptions{BatchID: batchID}) if err != nil { - return err + return fmt.Errorf("create root feed manifest: %w", err) } c.logger.Infof("root feed reference: %s", rootFeedRef.Reference) + time.Sleep(3 * time.Second) paths := []string{"index.html", "assets/styles/styles.css", "assets/styles/images/image.png", "error.html"} files, err := generateFilesWithPaths(rnd, paths, int(o.MaxPathnameLength)) if err != nil { - return err + return fmt.Errorf("generate files with paths: %w", err) } tarReader, err := tarFiles(files) if err != nil { - return err + return fmt.Errorf("tar initial files: %w", err) } + tarFile := bee.NewBufferFile("", tarReader) if err := upClient.UploadCollection(ctx, &tarFile, api.UploadOptions{BatchID: batchID, IndexDocument: "index.html"}); err != nil { - return err + return fmt.Errorf("upload initial collection: %w", err) } c.logger.Infof("collection uploaded: %s", tarFile.Address()) + time.Sleep(3 * time.Second) + rChData, err := upClient.DownloadChunk(ctx, tarFile.Address(), "", nil) + if err != nil { + return fmt.Errorf("download chunk: %w", err) + } + + // make chunk from byte array rChData + rCh, err := cac.NewWithDataSpan(rChData) + if err != nil { + return fmt.Errorf("create chunk from data: %w", err) + } + c.logger.Infof("rChData downloaded: chunk data %v bytes", len(rChData)) + // push first version of website to the feed - ref, err := upClient.UpdateFeedWithReference(ctx, signer, topic, 0, tarFile.Address(), api.UploadOptions{BatchID: batchID}) + ref, err := upClient.UpdateFeedWithRootChunk(ctx, signer, topic, 0, rCh, api.UploadOptions{BatchID: batchID}) if err != nil { - return err + return fmt.Errorf("update feed with root chunk: %w", err) } c.logger.Infof("feed updated: %s", ref.Reference) // download root (index.html) from the feed err = c.downloadAndVerify(ctx, downClient, rootFeedRef.Reference, nil, files[0]) if err != nil { - return err + return fmt.Errorf("download and verify initial index document: %w", err) } // update website files files, err = generateFilesWithPaths(rnd, paths, int(o.MaxPathnameLength)) if err != nil { - return err + return fmt.Errorf("generate files with paths: %w", err) } tarReader, err = tarFiles(files) if err != nil { - return err + return fmt.Errorf("tar updated files: %w", err) } + tarFile = bee.NewBufferFile("", tarReader) if err := upClient.UploadCollection(ctx, &tarFile, api.UploadOptions{BatchID: batchID, IndexDocument: "index.html"}); err != nil { return err } c.logger.Infof("collection uploaded: %s", tarFile.Address()) + time.Sleep(3 * time.Second) - // push 2nd version of website to the feed - ref, err = upClient.UpdateFeedWithReference(ctx, signer, topic, 1, tarFile.Address(), api.UploadOptions{BatchID: batchID}) + // Download Root Chunk of the new collection + rChData, err = upClient.DownloadChunk(ctx, tarFile.Address(), "", nil) if err != nil { return err } + + rCh, err = cac.NewWithDataSpan(rChData) + if err != nil { + return fmt.Errorf("create chunk from data: %w", err) + } + c.logger.Infof("feed root chunk downloaded: %d bytes", len(rChData)) + + // push 2nd version of website to the feed + ref, err = upClient.UpdateFeedWithRootChunk(ctx, signer, topic, 1, rCh, api.UploadOptions{BatchID: batchID}) + if err != nil { + return fmt.Errorf("update feed with root chunk: %w", err) + } c.logger.Infof("feed updated: %s", ref.Reference) // download updated index.html from the feed err = c.downloadAndVerify(ctx, downClient, rootFeedRef.Reference, nil, files[0]) if err != nil { - return err + return fmt.Errorf("download and verify updated index document: %w", err) } // download other paths and compare for i := 0; i < len(files); i++ { err = c.downloadAndVerify(ctx, downClient, tarFile.Address(), &files[i], files[0]) if err != nil { - return err + return fmt.Errorf("download and verify file: %w", err) } } + return nil } @@ -248,7 +279,7 @@ func (c *Check) downloadAndVerify(ctx context.Context, client *bee.Client, addre } } - return fmt.Errorf("failed getting manifest file after too many retries") + return fmt.Errorf("failed getting manifest file '%s' after too many retries", fName) } func generateFilesWithPaths(r *rand.Rand, paths []string, maxSize int) ([]bee.File, error) {