Skip to content

Commit 6618097

Browse files
janoszelig
authored andcommitted
swarm: snapshot load improvement (#18220)
* swarm/network: Hive - do not notify peer if discovery is disabled * p2p/simulations: validate all connections on loading a snapshot * p2p/simulations: track all connections in on snapshot loading * p2p/simulations: add snapshotLoadTimeout variable * p2p/simulations: ignore control events in snapshot load * p2p/simulations: simplify event loop synchronization * p2p/simulations: return already connected error from Load function * p2p/simulations: log warning on snapshot loading disconnection
1 parent de39513 commit 6618097

File tree

3 files changed

+76
-2
lines changed

3 files changed

+76
-2
lines changed

p2p/simulations/network.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"bytes"
2121
"context"
2222
"encoding/json"
23+
"errors"
2324
"fmt"
2425
"sync"
2526
"time"
@@ -705,8 +706,11 @@ func (net *Network) snapshot(addServices []string, removeServices []string) (*Sn
705706
return snap, nil
706707
}
707708

709+
var snapshotLoadTimeout = 120 * time.Second
710+
708711
// Load loads a network snapshot
709712
func (net *Network) Load(snap *Snapshot) error {
713+
// Start nodes.
710714
for _, n := range snap.Nodes {
711715
if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil {
712716
return err
@@ -718,6 +722,69 @@ func (net *Network) Load(snap *Snapshot) error {
718722
return err
719723
}
720724
}
725+
726+
// Prepare connection events counter.
727+
allConnected := make(chan struct{}) // closed when all connections are established
728+
done := make(chan struct{}) // ensures that the event loop goroutine is terminated
729+
defer close(done)
730+
731+
// Subscribe to event channel.
732+
// It needs to be done outside of the event loop goroutine (created below)
733+
// to ensure that the event channel is blocking before connect calls are made.
734+
events := make(chan *Event)
735+
sub := net.Events().Subscribe(events)
736+
defer sub.Unsubscribe()
737+
738+
go func() {
739+
// Expected number of connections.
740+
total := len(snap.Conns)
741+
// Set of all established connections from the snapshot, not other connections.
742+
// Key array element 0 is the connection One field value, and element 1 connection Other field.
743+
connections := make(map[[2]enode.ID]struct{}, total)
744+
745+
for {
746+
select {
747+
case e := <-events:
748+
// Ignore control events as they do not represent
749+
// connect or disconnect (Up) state change.
750+
if e.Control {
751+
continue
752+
}
753+
// Detect only connection events.
754+
if e.Type != EventTypeConn {
755+
continue
756+
}
757+
connection := [2]enode.ID{e.Conn.One, e.Conn.Other}
758+
// Nodes are still not connected or have been disconnected.
759+
if !e.Conn.Up {
760+
// Delete the connection from the set of established connections.
761+
// This will prevent false positive in case disconnections happen.
762+
delete(connections, connection)
763+
log.Warn("load snapshot: unexpected disconnection", "one", e.Conn.One, "other", e.Conn.Other)
764+
continue
765+
}
766+
// Check that the connection is from the snapshot.
767+
for _, conn := range snap.Conns {
768+
if conn.One == e.Conn.One && conn.Other == e.Conn.Other {
769+
// Add the connection to the set of established connections.
770+
connections[connection] = struct{}{}
771+
if len(connections) == total {
772+
// Signal that all nodes are connected.
773+
close(allConnected)
774+
return
775+
}
776+
777+
break
778+
}
779+
}
780+
case <-done:
781+
// Load function returned, terminate this goroutine.
782+
return
783+
}
784+
}
785+
}()
786+
787+
// Start connecting.
721788
for _, conn := range snap.Conns {
722789

723790
if !net.GetNode(conn.One).Up || !net.GetNode(conn.Other).Up {
@@ -729,6 +796,14 @@ func (net *Network) Load(snap *Snapshot) error {
729796
return err
730797
}
731798
}
799+
800+
select {
801+
// Wait until all connections from the snapshot are established.
802+
case <-allConnected:
803+
// Make sure that we do not wait forever.
804+
case <-time.After(snapshotLoadTimeout):
805+
return errors.New("snapshot connections not established")
806+
}
732807
return nil
733808
}
734809

swarm/network/hive.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ func (h *Hive) Run(p *BzzPeer) error {
165165
// otherwise just send depth to new peer
166166
dp.NotifyDepth(depth)
167167
}
168+
NotifyPeer(p.BzzAddr, h.Kademlia)
168169
}
169-
NotifyPeer(p.BzzAddr, h.Kademlia)
170170
defer h.Off(dp)
171171
return dp.Run(dp.HandleMsg)
172172
}

swarm/network/simulation/kademlia_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ func TestWaitTillHealthy(t *testing.T) {
3333
"bzz": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) {
3434
addr := network.NewAddr(ctx.Config.Node())
3535
hp := network.NewHiveParams()
36-
hp.Discovery = false
3736
config := &network.BzzConfig{
3837
OverlayAddr: addr.Over(),
3938
UnderlayAddr: addr.Under(),

0 commit comments

Comments
 (0)