Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions pkg/bee/api/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (
"io"
"net/http"
"strconv"
"time"

"github.com/ethersphere/bee/v2/pkg/cac"
"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/swarm"
Expand Down Expand Up @@ -69,14 +67,8 @@ func (f *FeedService) CreateRootManifest(ctx context.Context, signer crypto.Sign
return &response, nil
}

// UpdateWithReference updates a feed with a reference
func (f *FeedService) UpdateWithReference(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, addr swarm.Address, o UploadOptions) (*SocResponse, error) {
ts := make([]byte, 8)
binary.BigEndian.PutUint64(ts, uint64(time.Now().Unix()))
ch, err := cac.New(append(append([]byte{}, ts...), addr.Bytes()...))
if err != nil {
return nil, err
}
// UpdateWithRootChunk updates a feed with a root chunk
func (f *FeedService) UpdateWithRootChunk(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, ch swarm.Chunk, o UploadOptions) (*SocResponse, error) {
ownerHex, err := ownerFromSigner(signer)
if err != nil {
return nil, err
Expand Down
10 changes: 5 additions & 5 deletions pkg/bee/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,14 +910,14 @@ func (c *Client) UploadCollection(ctx context.Context, f *File, o api.UploadOpti
func (c *Client) DownloadManifestFile(ctx context.Context, a swarm.Address, path string) (size int64, hash []byte, err error) {
r, err := c.api.Dirs.Download(ctx, a, path)
if err != nil {
return 0, nil, fmt.Errorf("download manifest file %s: %w", path, err)
return 0, nil, fmt.Errorf("download manifest file `%s`: %w", path, err)
}
defer r.Close()

h := fileHasher()
size, err = io.Copy(h, r)
if err != nil {
return 0, nil, fmt.Errorf("download manifest file %s: %w", path, err)
return 0, nil, fmt.Errorf("download manifest file `%s`: %w", path, err)
}

return size, h.Sum(nil), nil
Expand Down Expand Up @@ -1009,9 +1009,9 @@ func (c *Client) CreateRootFeedManifest(ctx context.Context, signer crypto.Signe
return c.api.Feed.CreateRootManifest(ctx, signer, topic, o)
}

// UpdateFeedWithReference updates a feed with a reference
func (c *Client) UpdateFeedWithReference(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, addr swarm.Address, o api.UploadOptions) (*api.SocResponse, error) {
return c.api.Feed.UpdateWithReference(ctx, signer, topic, i, addr, o)
// UpdateFeedWithRootChunk updates a feed with a root chunk
func (c *Client) UpdateFeedWithRootChunk(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, ch swarm.Chunk, o api.UploadOptions) (*api.SocResponse, error) {
return c.api.Feed.UpdateWithRootChunk(ctx, signer, topic, i, ch, o)
}

// FindFeedUpdate finds the latest update for a feed
Expand Down
84 changes: 55 additions & 29 deletions pkg/check/feed/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

"github.com/ethersphere/bee/v2/pkg/cac"
"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/bee"
Expand Down Expand Up @@ -58,9 +59,18 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int

if o.RootRef != "" {
c.logger.Infof("running availability check")
return c.checkAvailability(ctx, cluster, o)
if err := c.checkAvailability(ctx, cluster, o); err != nil {
return fmt.Errorf("availability check: %w", err)
}
return nil
}

c.logger.Infof("running feed check")
if err := c.feedCheck(ctx, cluster, o); err != nil {
return fmt.Errorf("feed check: %w", err)
}
return c.feedCheck(ctx, cluster, o)

return nil
}

func (c *Check) checkAvailability(ctx context.Context, cluster orchestration.Cluster, o Options) error {
Expand All @@ -69,70 +79,71 @@ func (c *Check) checkAvailability(ctx context.Context, cluster orchestration.Clu
return fmt.Errorf("invalid root ref: %w", err)
}

nodeNames := cluster.FullNodeNames()
nodeName := nodeNames[0]
clients, err := cluster.NodesClients(ctx)
clients, err := cluster.ShuffledFullNodeClients(ctx, random.PseudoGenerator(time.Now().UnixNano()))
if err != nil {
return err
return fmt.Errorf("node clients: %w", err)
}

if len(clients) < 1 {
return fmt.Errorf("availability check requires at least 1 full node")
}

client := clients[nodeName]
_, _, err = client.DownloadFile(ctx, ref, nil)
_, _, err = clients[0].DownloadFile(ctx, ref, nil)
if err != nil {
return err
return fmt.Errorf("download root feed: %w", err)
}

return nil
}

// feedCheck creates a root feed manifest, makes a series of updates to the feed
// and verifies that the updates are retrievable via another node.
func (c *Check) feedCheck(ctx context.Context, cluster orchestration.Cluster, o Options) error {
rnd := random.PseudoGenerator(time.Now().UnixNano())
names := cluster.FullNodeNames()
perm := rnd.Perm(len(names))

if len(names) < 2 {
return fmt.Errorf("not enough nodes to run feed check")
clients, err := cluster.ShuffledFullNodeClients(ctx, random.PseudoGenerator(time.Now().UnixNano()))
if err != nil {
return fmt.Errorf("node clients: %w", err)
}

clients, err := cluster.NodesClients(ctx)
if err != nil {
return err
if len(clients) < 2 {
return fmt.Errorf("feed check requires at least 2 full nodes")
}
upClient := clients[names[perm[0]]]
downClient := clients[names[perm[1]]]

upClient := clients[0]
downClient := clients[1]

c.logger.Infof("upload client: %s", upClient.Name())

batchID, err := upClient.GetOrCreateMutableBatch(ctx, o.PostageTTL, o.PostageDepth, o.PostageLabel)
if err != nil {
return err
return fmt.Errorf("get or create mutable batch: %w", err)
}

privKey, err := crypto.GenerateSecp256k1Key()
if err != nil {
return err
return fmt.Errorf("generate private key: %w", err)
}

signer := crypto.NewDefaultSigner(privKey)
topic, err := crypto.LegacyKeccak256([]byte("my-topic"))
if err != nil {
return err
return fmt.Errorf("topic hash: %w", err)
}

// create root
createManifestRes, err := upClient.CreateRootFeedManifest(ctx, signer, topic, api.UploadOptions{BatchID: batchID})
if err != nil {
return err
}

c.logger.Infof("node %s: manifest created", upClient.Name())
c.logger.Infof("reference: %s", createManifestRes.Reference)
c.logger.Infof("owner: %s", createManifestRes.Owner)
c.logger.Infof("topic: %s", createManifestRes.Topic)

// make updates
for i := 0; i < o.NUpdates; i++ {
for i := range o.NUpdates {
time.Sleep(3 * time.Second)

data := fmt.Sprintf("update-%d", i)
fName := fmt.Sprintf("file-%d", i)
file := bee.NewBufferFile(fName, bytes.NewBuffer([]byte(data)))
Expand All @@ -141,13 +152,26 @@ func (c *Check) feedCheck(ctx context.Context, cluster orchestration.Cluster, o
Direct: true,
})
if err != nil {
return err
return fmt.Errorf("upload file `%s`: %w", fName, err)
}

// download root chunk of file
rChData, err := upClient.DownloadChunk(ctx, file.Address(), "", nil)
if err != nil {
return fmt.Errorf("download root chunk: %w", err)
}
ref := file.Address()
socRes, err := upClient.UpdateFeedWithReference(ctx, signer, topic, uint64(i), ref, api.UploadOptions{BatchID: batchID})

// make chunk from byte array rChData
rCh, err := cac.NewWithDataSpan(rChData)
if err != nil {
return fmt.Errorf("create chunk: %w", err)
}

socRes, err := upClient.UpdateFeedWithRootChunk(ctx, signer, topic, uint64(i), rCh, api.UploadOptions{BatchID: batchID})
if err != nil {
return err
return fmt.Errorf("update feed with root chunk: %w", err)
}

c.logger.Infof("node %s: feed updated", upClient.Name())
c.logger.Infof("soc reference: %s", socRes.Reference)
c.logger.Infof("wrapped reference: %s", file.Address())
Expand All @@ -159,7 +183,7 @@ func (c *Check) feedCheck(ctx context.Context, cluster orchestration.Cluster, o
c.logger.Infof("download client: %s", downClient.Name())
update, err := downClient.FindFeedUpdate(ctx, signer, topic, nil)
if err != nil {
return err
return fmt.Errorf("find feed update: %w", err)
}

c.logger.Infof("node %s: feed update found", downClient.Name())
Expand All @@ -175,9 +199,11 @@ func (c *Check) feedCheck(ctx context.Context, cluster orchestration.Cluster, o
if err != nil {
return fmt.Errorf("download root feed: %w", err)
}

lastUpdateData := fmt.Sprintf("update-%d", o.NUpdates-1)
if string(d) != lastUpdateData {
return fmt.Errorf("expected file content to be %s, got %s", lastUpdateData, string(d))
}

return nil
}
Loading