Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions pkg/bee/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"errors"
"fmt"
"io"
"math"
"math/big"
"net/http"
"net/url"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -339,6 +341,41 @@ func (c *Client) Ping(ctx context.Context, node swarm.Address) (rtt string, err
return r.RTT, nil
}

// ClosestPeer returns the address of the peer closest to the node in the provided bin.
func (c *Client) ClosestPeer(ctx context.Context, binId uint8, skipList []swarm.Address) (swarm.Address, error) {
t, err := c.Topology(ctx)
if err != nil {
return swarm.ZeroAddress, err
}

bin := t.Bins[fmt.Sprintf("bin_%d", binId)]
if len(bin.ConnectedPeers) == 0 {
return swarm.ZeroAddress, nil
}

overlay, err := c.Overlay(ctx)
if err != nil {
return swarm.ZeroAddress, err
}

closest := swarm.ZeroAddress
minProx := uint8(math.MaxUint8)
for _, peer := range bin.ConnectedPeers {
skip := slices.ContainsFunc(skipList, func(addr swarm.Address) bool {
return addr.Equal(peer.Address)
})
if skip {
continue
}
prox := swarm.Proximity(overlay.Bytes(), peer.Address.Bytes())
if prox < minProx {
minProx = prox
closest = peer.Address
}
}
return closest, nil
}

// PingStreamMsg represents message sent over the PingStream channel
type PingStreamMsg struct {
Node swarm.Address
Expand Down
18 changes: 11 additions & 7 deletions pkg/check/gsoc/gsoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ethersphere/beekeeper/pkg/beekeeper"
"github.com/ethersphere/beekeeper/pkg/logging"
"github.com/ethersphere/beekeeper/pkg/orchestration"
"github.com/ethersphere/beekeeper/pkg/random"
"github.com/ethersphere/beekeeper/pkg/wslistener"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -65,18 +66,21 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int
return fmt.Errorf("invalid options type")
}

fullNodeNames := cluster.FullNodeNames()
clients, err := cluster.NodesClients(ctx)
rnd := random.PseudoGenerator(time.Now().UnixNano())
fullNodeClients, err := cluster.ShuffledFullNodeClients(ctx, rnd)
if err != nil {
return err
}

if len(fullNodeNames) < 2 {
if len(fullNodeClients) < 2 {
return fmt.Errorf("gsoc test require at least 2 full nodes")
}

uploadClient := clients[fullNodeNames[0]]
listenClient := clients[fullNodeNames[1]]
uploadClient := fullNodeClients[0]
listenClient, err := cluster.ClosestFullNodeClient(ctx, uploadClient)
if err != nil {
return err
}

batches := make([]string, 2)
for i := 0; i < 2; i++ {
Expand All @@ -92,14 +96,14 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int
c.logger.Infof("send messages with different postage batches sequentially...")
err = run(ctx, uploadClient, listenClient, batches, c.logger, false)
if err != nil {
return err
return fmt.Errorf("sequential: %w", err)
}
c.logger.Infof("done")

c.logger.Infof("send messages with different postage batches parallel...")
err = run(ctx, uploadClient, listenClient, batches, c.logger, true)
if err != nil {
return err
return fmt.Errorf("parallel: %w", err)
}
c.logger.Infof("done")

Expand Down
1 change: 1 addition & 0 deletions pkg/orchestration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Cluster interface {
ShuffledFullNodeClients(ctx context.Context, r *rand.Rand) ([]*bee.Client, error)
Size() (size int)
Topologies(ctx context.Context) (topologies ClusterTopologies, err error)
ClosestFullNodeClient(ctx context.Context, s *bee.Client) (*bee.Client, error)
}

// ClusterOptions represents Bee cluster options
Expand Down
34 changes: 34 additions & 0 deletions pkg/orchestration/k8s/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,3 +443,37 @@ func (c *Cluster) FlattenTopologies(ctx context.Context) (topologies map[string]

return
}

// ClosestFullNodeClient returns the closest full node client to the supplied client.
func (c *Cluster) ClosestFullNodeClient(ctx context.Context, s *bee.Client) (*bee.Client, error) {
addrToNode := make(map[string]orchestration.Node)
for _, n := range c.Nodes() {
res, err := n.Client().Addresses(ctx)
if err != nil {
return nil, err
}
addrToNode[res.Overlay.String()] = n
}

var skipList []swarm.Address
const maxBin = 32
for b := 0; b < maxBin; b++ {
addr, err := s.ClosestPeer(ctx, uint8(b), skipList)
if err != nil {
return nil, err
}
node, ok := addrToNode[addr.String()]
if !ok {
continue
}
cfg := node.Config()
// closet peer is not a full node. Check other peers in the same bin
if !cfg.FullNode || cfg.BootnodeMode {
skipList = append(skipList, addr)
b--
continue
}
return node.Client(), nil
}
return nil, fmt.Errorf("cannot find closest fullnode")
}