Skip to content

Commit 81c573a

Browse files
authored
fix: gsoc via pushsync (#465)
* fix: force gsoc via pushsync * chore: find actual closet client * fix: cond * fix: remove uncessary overlay call * fix: client * fix: remove bin check
1 parent a2803bf commit 81c573a

File tree

4 files changed

+83
-7
lines changed

4 files changed

+83
-7
lines changed

pkg/bee/client.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ import (
77
"errors"
88
"fmt"
99
"io"
10+
"math"
1011
"math/big"
1112
"net/http"
1213
"net/url"
14+
"slices"
1315
"sync"
1416
"time"
1517

@@ -339,6 +341,41 @@ func (c *Client) Ping(ctx context.Context, node swarm.Address) (rtt string, err
339341
return r.RTT, nil
340342
}
341343

344+
// ClosestPeer returns the address of the peer closest to the node in the provided bin.
345+
func (c *Client) ClosestPeer(ctx context.Context, binId uint8, skipList []swarm.Address) (swarm.Address, error) {
346+
t, err := c.Topology(ctx)
347+
if err != nil {
348+
return swarm.ZeroAddress, err
349+
}
350+
351+
bin := t.Bins[fmt.Sprintf("bin_%d", binId)]
352+
if len(bin.ConnectedPeers) == 0 {
353+
return swarm.ZeroAddress, nil
354+
}
355+
356+
overlay, err := c.Overlay(ctx)
357+
if err != nil {
358+
return swarm.ZeroAddress, err
359+
}
360+
361+
closest := swarm.ZeroAddress
362+
minProx := uint8(math.MaxUint8)
363+
for _, peer := range bin.ConnectedPeers {
364+
skip := slices.ContainsFunc(skipList, func(addr swarm.Address) bool {
365+
return addr.Equal(peer.Address)
366+
})
367+
if skip {
368+
continue
369+
}
370+
prox := swarm.Proximity(overlay.Bytes(), peer.Address.Bytes())
371+
if prox < minProx {
372+
minProx = prox
373+
closest = peer.Address
374+
}
375+
}
376+
return closest, nil
377+
}
378+
342379
// PingStreamMsg represents message sent over the PingStream channel
343380
type PingStreamMsg struct {
344381
Node swarm.Address

pkg/check/gsoc/gsoc.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/ethersphere/beekeeper/pkg/beekeeper"
1919
"github.com/ethersphere/beekeeper/pkg/logging"
2020
"github.com/ethersphere/beekeeper/pkg/orchestration"
21+
"github.com/ethersphere/beekeeper/pkg/random"
2122
"github.com/ethersphere/beekeeper/pkg/wslistener"
2223
"golang.org/x/sync/errgroup"
2324
)
@@ -65,18 +66,21 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int
6566
return fmt.Errorf("invalid options type")
6667
}
6768

68-
fullNodeNames := cluster.FullNodeNames()
69-
clients, err := cluster.NodesClients(ctx)
69+
rnd := random.PseudoGenerator(time.Now().UnixNano())
70+
fullNodeClients, err := cluster.ShuffledFullNodeClients(ctx, rnd)
7071
if err != nil {
7172
return err
7273
}
7374

74-
if len(fullNodeNames) < 2 {
75+
if len(fullNodeClients) < 2 {
7576
return fmt.Errorf("gsoc test require at least 2 full nodes")
7677
}
7778

78-
uploadClient := clients[fullNodeNames[0]]
79-
listenClient := clients[fullNodeNames[1]]
79+
uploadClient := fullNodeClients[0]
80+
listenClient, err := cluster.ClosestFullNodeClient(ctx, uploadClient)
81+
if err != nil {
82+
return err
83+
}
8084

8185
batches := make([]string, 2)
8286
for i := 0; i < 2; i++ {
@@ -92,14 +96,14 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int
9296
c.logger.Infof("send messages with different postage batches sequentially...")
9397
err = run(ctx, uploadClient, listenClient, batches, c.logger, false)
9498
if err != nil {
95-
return err
99+
return fmt.Errorf("sequential: %w", err)
96100
}
97101
c.logger.Infof("done")
98102

99103
c.logger.Infof("send messages with different postage batches parallel...")
100104
err = run(ctx, uploadClient, listenClient, batches, c.logger, true)
101105
if err != nil {
102-
return err
106+
return fmt.Errorf("parallel: %w", err)
103107
}
104108
c.logger.Infof("done")
105109

pkg/orchestration/cluster.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type Cluster interface {
3636
ShuffledFullNodeClients(ctx context.Context, r *rand.Rand) ([]*bee.Client, error)
3737
Size() (size int)
3838
Topologies(ctx context.Context) (topologies ClusterTopologies, err error)
39+
ClosestFullNodeClient(ctx context.Context, s *bee.Client) (*bee.Client, error)
3940
}
4041

4142
// ClusterOptions represents Bee cluster options

pkg/orchestration/k8s/cluster.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,3 +443,37 @@ func (c *Cluster) FlattenTopologies(ctx context.Context) (topologies map[string]
443443

444444
return
445445
}
446+
447+
// ClosestFullNodeClient returns the closest full node client to the supplied client.
448+
func (c *Cluster) ClosestFullNodeClient(ctx context.Context, s *bee.Client) (*bee.Client, error) {
449+
addrToNode := make(map[string]orchestration.Node)
450+
for _, n := range c.Nodes() {
451+
res, err := n.Client().Addresses(ctx)
452+
if err != nil {
453+
return nil, err
454+
}
455+
addrToNode[res.Overlay.String()] = n
456+
}
457+
458+
var skipList []swarm.Address
459+
const maxBin = 32
460+
for b := 0; b < maxBin; b++ {
461+
addr, err := s.ClosestPeer(ctx, uint8(b), skipList)
462+
if err != nil {
463+
return nil, err
464+
}
465+
node, ok := addrToNode[addr.String()]
466+
if !ok {
467+
continue
468+
}
469+
cfg := node.Config()
470+
// closet peer is not a full node. Check other peers in the same bin
471+
if !cfg.FullNode || cfg.BootnodeMode {
472+
skipList = append(skipList, addr)
473+
b--
474+
continue
475+
}
476+
return node.Client(), nil
477+
}
478+
return nil, fmt.Errorf("cannot find closest fullnode")
479+
}

0 commit comments

Comments
 (0)