Skip to content

Commit 5ad0417

Browse files
fix: make hive bee 2.6.0 backward compatible (#5329)
Co-authored-by: Ljubisa Gacevic <ljubisa.rs@gmail.com>
1 parent eccd4f1 commit 5ad0417

File tree

7 files changed

+147
-32
lines changed

7 files changed

+147
-32
lines changed

pkg/hive/hive.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ var (
5050
)
5151

5252
type Service struct {
53-
streamer p2p.Streamer
53+
streamer p2p.Bee260CompatibilityStreamer
5454
addressBook addressbook.GetPutter
5555
addPeersHandler func(...swarm.Address)
5656
networkID uint64
@@ -67,7 +67,7 @@ type Service struct {
6767
overlay swarm.Address
6868
}
6969

70-
func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uint64, bootnode bool, allowPrivateCIDRs bool, overlay swarm.Address, logger log.Logger) *Service {
70+
func New(streamer p2p.Bee260CompatibilityStreamer, addressbook addressbook.GetPutter, networkID uint64, bootnode bool, allowPrivateCIDRs bool, overlay swarm.Address, logger log.Logger) *Service {
7171
svc := &Service{
7272
streamer: streamer,
7373
logger: logger.WithName(loggerName).Register(),
@@ -196,6 +196,8 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa
196196
continue
197197
}
198198

199+
advertisableUnderlays = p2p.FilterBee260CompatibleUnderlays(s.streamer.IsBee260(peer), advertisableUnderlays)
200+
199201
peersRequest.Peers = append(peersRequest.Peers, &pb.BzzAddress{
200202
Overlay: addr.Overlay.Bytes(),
201203
Underlay: bzz.SerializeUnderlays(advertisableUnderlays),

pkg/p2p/libp2p/internal/handshake/handshake.go

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func (s *Service) Handshake(ctx context.Context, stream p2p.Stream, peerMultiadd
153153

154154
w, r := protobuf.NewWriterAndReader(stream)
155155

156-
peerMultiaddrs = filterBee260CompatibleUnderlays(o.bee260compatibility, peerMultiaddrs)
156+
peerMultiaddrs = p2p.FilterBee260CompatibleUnderlays(o.bee260compatibility, peerMultiaddrs)
157157

158158
if err := w.WriteMsgWithContext(ctx, &pb.Syn{
159159
ObservedUnderlay: bzz.SerializeUnderlays(peerMultiaddrs),
@@ -208,7 +208,7 @@ func (s *Service) Handshake(ctx context.Context, stream p2p.Stream, peerMultiadd
208208
return a.Equal(b)
209209
})
210210

211-
advertisableUnderlays = filterBee260CompatibleUnderlays(o.bee260compatibility, advertisableUnderlays)
211+
advertisableUnderlays = p2p.FilterBee260CompatibleUnderlays(o.bee260compatibility, advertisableUnderlays)
212212

213213
bzzAddress, err := bzz.NewAddress(s.signer, advertisableUnderlays, s.overlay, s.networkID, s.nonce)
214214
if err != nil {
@@ -306,7 +306,7 @@ func (s *Service) Handle(ctx context.Context, stream p2p.Stream, peerMultiaddrs
306306
return a.Equal(b)
307307
})
308308

309-
advertisableUnderlays = filterBee260CompatibleUnderlays(o.bee260compatibility, advertisableUnderlays)
309+
advertisableUnderlays = p2p.FilterBee260CompatibleUnderlays(o.bee260compatibility, advertisableUnderlays)
310310

311311
bzzAddress, err := bzz.NewAddress(s.signer, advertisableUnderlays, s.overlay, s.networkID, s.nonce)
312312
if err != nil {
@@ -315,7 +315,7 @@ func (s *Service) Handle(ctx context.Context, stream p2p.Stream, peerMultiaddrs
315315

316316
welcomeMessage := s.GetWelcomeMessage()
317317

318-
peerMultiaddrs = filterBee260CompatibleUnderlays(o.bee260compatibility, peerMultiaddrs)
318+
peerMultiaddrs = p2p.FilterBee260CompatibleUnderlays(o.bee260compatibility, peerMultiaddrs)
319319

320320
if err := w.WriteMsgWithContext(ctx, &pb.SynAck{
321321
Syn: &pb.Syn{
@@ -395,18 +395,3 @@ func (s *Service) parseCheckAck(ack *pb.Ack) (*bzz.Address, error) {
395395

396396
return bzzAddress, nil
397397
}
398-
399-
// filterBee260CompatibleUnderlays select a single underlay to pass if
400-
// bee260compatibility is true. Otherwise it passes the unmodified underlays
401-
// slice. This function can be safely removed when bee version 2.6.0 is
402-
// deprecated.
403-
func filterBee260CompatibleUnderlays(bee260compatibility bool, underlays []ma.Multiaddr) []ma.Multiaddr {
404-
if !bee260compatibility {
405-
return underlays
406-
}
407-
underlay := bzz.SelectBestAdvertisedAddress(underlays, nil)
408-
if underlay == nil {
409-
return underlays
410-
}
411-
return []ma.Multiaddr{underlay}
412-
}

pkg/p2p/libp2p/libp2p.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,9 +1475,23 @@ func (s *Service) peerMultiaddrs(ctx context.Context, peerID libp2ppeer.ID) ([]m
14751475
return buildFullMAs(waitPeerAddrs(waitPeersCtx, s.host.Peerstore(), peerID), peerID)
14761476
}
14771477

1478+
// IsBee260 implements p2p.Bee260CompatibilityStreamer interface.
1479+
// It checks if a peer is running Bee version older than 2.7.0.
1480+
func (s *Service) IsBee260(overlay swarm.Address) bool {
1481+
peerID, found := s.peers.peerID(overlay)
1482+
if !found {
1483+
return false
1484+
}
1485+
return s.bee260BackwardCompatibility(peerID)
1486+
}
1487+
14781488
var version270 = *semver.Must(semver.NewVersion("2.7.0"))
14791489

14801490
func (s *Service) bee260BackwardCompatibility(peerID libp2ppeer.ID) bool {
1491+
if compat, found := s.peers.bee260(peerID); found {
1492+
return compat
1493+
}
1494+
14811495
userAgent := s.peerUserAgent(s.ctx, peerID)
14821496
p := strings.SplitN(userAgent, " ", 2)
14831497
if len(p) != 2 {
@@ -1496,6 +1510,7 @@ func (s *Service) bee260BackwardCompatibility(peerID libp2ppeer.ID) bool {
14961510
return false
14971511
}
14981512
result := vCore.LessThan(version270)
1513+
s.peers.setBee260(peerID, result)
14991514
return result
15001515
}
15011516

pkg/p2p/libp2p/peer.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@ import (
1818
)
1919

2020
type peerRegistry struct {
21-
underlays map[string]libp2ppeer.ID // map overlay address to underlay peer id
22-
overlays map[libp2ppeer.ID]swarm.Address // map underlay peer id to overlay address
23-
full map[libp2ppeer.ID]bool // map to track whether a node is full or light node (true=full)
24-
connections map[libp2ppeer.ID]map[network.Conn]struct{} // list of connections for safe removal on Disconnect notification
25-
streams map[libp2ppeer.ID]map[network.Stream]context.CancelFunc
26-
mu sync.RWMutex
21+
underlays map[string]libp2ppeer.ID // map overlay address to underlay peer id
22+
overlays map[libp2ppeer.ID]swarm.Address // map underlay peer id to overlay address
23+
full map[libp2ppeer.ID]bool // map to track whether a node is full or light node (true=full)
24+
bee260Compatibility map[libp2ppeer.ID]bool // map to track bee260 backward compatibility
25+
connections map[libp2ppeer.ID]map[network.Conn]struct{} // list of connections for safe removal on Disconnect notification
26+
streams map[libp2ppeer.ID]map[network.Stream]context.CancelFunc
27+
mu sync.RWMutex
2728

2829
//nolint:misspell
2930
disconnecter disconnecter // peerRegistry notifies libp2p on peer disconnection
@@ -36,11 +37,12 @@ type disconnecter interface {
3637

3738
func newPeerRegistry() *peerRegistry {
3839
return &peerRegistry{
39-
underlays: make(map[string]libp2ppeer.ID),
40-
overlays: make(map[libp2ppeer.ID]swarm.Address),
41-
full: make(map[libp2ppeer.ID]bool),
42-
connections: make(map[libp2ppeer.ID]map[network.Conn]struct{}),
43-
streams: make(map[libp2ppeer.ID]map[network.Stream]context.CancelFunc),
40+
underlays: make(map[string]libp2ppeer.ID),
41+
overlays: make(map[libp2ppeer.ID]swarm.Address),
42+
full: make(map[libp2ppeer.ID]bool),
43+
bee260Compatibility: make(map[libp2ppeer.ID]bool),
44+
connections: make(map[libp2ppeer.ID]map[network.Conn]struct{}),
45+
streams: make(map[libp2ppeer.ID]map[network.Stream]context.CancelFunc),
4446

4547
Notifiee: new(network.NoopNotifiee),
4648
}
@@ -81,6 +83,7 @@ func (r *peerRegistry) Disconnected(_ network.Network, c network.Conn) {
8183
}
8284
delete(r.streams, peerID)
8385
delete(r.full, peerID)
86+
delete(r.bee260Compatibility, peerID)
8487
r.mu.Unlock()
8588
r.disconnecter.disconnected(overlay)
8689

@@ -176,6 +179,19 @@ func (r *peerRegistry) fullnode(peerID libp2ppeer.ID) (bool, bool) {
176179
return full, found
177180
}
178181

182+
func (r *peerRegistry) bee260(peerID libp2ppeer.ID) (compat, found bool) {
183+
r.mu.RLock()
184+
defer r.mu.RUnlock()
185+
compat, found = r.bee260Compatibility[peerID]
186+
return compat, found
187+
}
188+
189+
func (r *peerRegistry) setBee260(peerID libp2ppeer.ID, compat bool) {
190+
r.mu.Lock()
191+
defer r.mu.Unlock()
192+
r.bee260Compatibility[peerID] = compat
193+
}
194+
179195
func (r *peerRegistry) isConnected(peerID libp2ppeer.ID, remoteAddr ma.Multiaddr) (swarm.Address, bool) {
180196
if remoteAddr == nil {
181197
return swarm.ZeroAddress, false
@@ -217,6 +233,7 @@ func (r *peerRegistry) remove(overlay swarm.Address) (found, full bool, peerID l
217233
delete(r.streams, peerID)
218234
full = r.full[peerID]
219235
delete(r.full, peerID)
236+
delete(r.bee260Compatibility, peerID)
220237
r.mu.Unlock()
221238

222239
return found, full, peerID

pkg/p2p/libp2p/version_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,3 +163,72 @@ func TestBee260BackwardCompatibility(t *testing.T) {
163163
})
164164
}
165165
}
166+
167+
func TestBee260Cache(t *testing.T) {
168+
t.Parallel()
169+
170+
ctx, cancel := context.WithCancel(context.Background())
171+
defer cancel()
172+
173+
swarmKey, err := crypto.GenerateSecp256k1Key()
174+
if err != nil {
175+
t.Fatal(err)
176+
}
177+
178+
overlay := swarm.RandAddress(t)
179+
addr := ":0"
180+
networkID := uint64(1)
181+
182+
statestore := mock.NewStateStore()
183+
defer statestore.Close()
184+
185+
s, err := New(ctx, crypto.NewDefaultSigner(swarmKey), networkID, overlay, addr, nil, statestore, nil, log.Noop, nil, Options{})
186+
if err != nil {
187+
t.Fatal(err)
188+
}
189+
defer s.Close()
190+
191+
libp2pPeerID, err := libp2ppeer.Decode("16Uiu2HAm3g4hXfCWTDhPBq3KkqpV3wGkPVgMJY3Jt8gGTYWiTWNZ")
192+
if err != nil {
193+
t.Fatal(err)
194+
}
195+
196+
// 1. Set user agent to 2.6.0 (compat = true)
197+
if err := s.host.Peerstore().Put(libp2pPeerID, "AgentVersion", "bee/2.6.0 go1.22.0 linux/amd64"); err != nil {
198+
t.Fatal(err)
199+
}
200+
201+
// 2. First call should calculate and cache it
202+
if !s.bee260BackwardCompatibility(libp2pPeerID) {
203+
t.Fatal("expected true for 2.6.0")
204+
}
205+
206+
// 3. Verify it's in the cache
207+
compat, found := s.peers.bee260(libp2pPeerID)
208+
if !found {
209+
t.Fatal("expected value to be in cache")
210+
}
211+
if !compat {
212+
t.Fatal("expected cached value to be true")
213+
}
214+
215+
// 4. Change user agent in peerstore to 2.7.0 (compat = false)
216+
// If caching works, bee260BackwardCompatibility should still return true
217+
if err := s.host.Peerstore().Put(libp2pPeerID, "AgentVersion", "bee/2.7.0 go1.23.0 linux/amd64"); err != nil {
218+
t.Fatal(err)
219+
}
220+
221+
if !s.bee260BackwardCompatibility(libp2pPeerID) {
222+
t.Fatal("expected true (cached value) even if peerstore changed")
223+
}
224+
225+
// 5. Clear cache (manually for testing)
226+
s.peers.mu.Lock()
227+
delete(s.peers.bee260Compatibility, libp2pPeerID)
228+
s.peers.mu.Unlock()
229+
230+
// 6. Now it should re-calculate and return false for 2.7.0
231+
if s.bee260BackwardCompatibility(libp2pPeerID) {
232+
t.Fatal("expected false for 2.7.0 after cache clear")
233+
}
234+
}

pkg/p2p/p2p.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,12 @@ type Streamer interface {
141141
NewStream(ctx context.Context, address swarm.Address, h Headers, protocol, version, stream string) (Stream, error)
142142
}
143143

144+
// Bee260CompatibilityStreamer is able to create a new Stream and check if a peer is running Bee 2.6.0.
145+
type Bee260CompatibilityStreamer interface {
146+
NewStream(ctx context.Context, address swarm.Address, h Headers, protocol, version, stream string) (Stream, error)
147+
IsBee260(address swarm.Address) bool
148+
}
149+
144150
type StreamerDisconnecter interface {
145151
Streamer
146152
Disconnecter
@@ -237,3 +243,18 @@ func (e *ChunkDeliveryError) Error() string {
237243
func NewChunkDeliveryError(msg string) error {
238244
return &ChunkDeliveryError{msg: msg}
239245
}
246+
247+
// FilterBee260CompatibleUnderlays select a single underlay to pass if
248+
// bee260compatibility is true. Otherwise it passes the unmodified underlays
249+
// slice. This function can be safely removed when bee version 2.6.0 is
250+
// deprecated.
251+
func FilterBee260CompatibleUnderlays(bee260compatibility bool, underlays []ma.Multiaddr) []ma.Multiaddr {
252+
if !bee260compatibility {
253+
return underlays
254+
}
255+
underlay := bzz.SelectBestAdvertisedAddress(underlays, nil)
256+
if underlay == nil {
257+
return underlays
258+
}
259+
return []ma.Multiaddr{underlay}
260+
}

pkg/p2p/streamtest/streamtest.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,12 @@ func (r *Recorder) WaitRecords(t *testing.T, addr swarm.Address, proto, version,
223223
return recs
224224
}
225225

226+
// IsBee260 implements p2p.Bee260CompatibilityStreamer interface.
227+
// It always returns false.
228+
func (r *Recorder) IsBee260(overlay swarm.Address) bool {
229+
return false
230+
}
231+
226232
type Record struct {
227233
in *record
228234
out *record

0 commit comments

Comments
 (0)