diff --git a/dht.go b/dht.go index 2ee2894eb..1c4006f2c 100644 --- a/dht.go +++ b/dht.go @@ -8,13 +8,14 @@ import ( "sync" "time" + periodicproc "github.com/jbenet/goprocess/periodic" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/routing" - + "github.com/libp2p/go-libp2p-kad-dht/persist" "go.opencensus.io/tag" "golang.org/x/xerrors" @@ -37,6 +38,10 @@ import ( var logger = logging.Logger("dht") +// NumBootstrapQueries defines the number of random dht queries to do to +// collect members of the routing table. +const NumBootstrapQueries = 5 + const BaseConnMgrScore = 5 // IpfsDHT is an implementation of Kademlia with S/Kademlia modifications. @@ -72,8 +77,15 @@ type IpfsDHT struct { autoRefresh bool rtRefreshQueryTimeout time.Duration rtRefreshPeriod time.Duration + bootstrapCfg opts.BootstrapConfig triggerRtRefresh chan chan<- error + seedsProposer persist.SeedsProposer + seederRTSizeTarget int + seederDialTimeout time.Duration + seederConcurrentDials int + totalSeederTimeout time.Duration + maxRecordAge time.Duration // Allows disabling dht subsystems. These should _only_ be set on @@ -94,22 +106,30 @@ var ( // New creates a new DHT with the specified host and options. func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) { - var cfg opts.Options + cfg := &opts.Options{} cfg.BucketSize = KValue if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil { return nil, err } - dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize) - dht.autoRefresh = cfg.RoutingTable.AutoRefresh - dht.rtRefreshPeriod = cfg.RoutingTable.RefreshPeriod - dht.rtRefreshQueryTimeout = cfg.RoutingTable.RefreshQueryTimeout - dht.maxRecordAge = cfg.MaxRecordAge - dht.enableProviders = cfg.EnableProviders - dht.enableValues = cfg.EnableValues + // set snapshotter & fallback peers if not set + snapshotter := cfg.Persistence.Snapshotter + if snapshotter == nil { + s, err := persist.NewDatastoreSnapshotter(cfg.Datastore, persist.DefaultSnapshotNS) + // should never happen + if err != nil { + err = fmt.Errorf("failed to initialize the default datastore backed snapshotter: %w", err) + logger.Error(err) + return nil, err + } + snapshotter = s + } - // register for network notifs. - dht.host.Network().Notify((*netNotifiee)(dht)) + if len(cfg.Persistence.FallbackPeers) == 0 { + cfg.Persistence.FallbackPeers = getDefaultBootstrapPeerIDs() + } + + dht := makeDHT(ctx, h, cfg) dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error { // remove ourselves from network notifs. 
@@ -117,8 +137,28 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er return nil }) + // fetch the last snapshot & try to seed RT + candidates, err := snapshotter.Load() + if err != nil { + logger.Warningf("error while loading snapshot of DHT routing table: %s, cannot seed dht", err) + } else if err := dht.seedRoutingTable(candidates, cfg.Persistence.FallbackPeers); err != nil { + logger.Warningf("error while seeding candidates to the routing table: %s", err) + } + + // schedule periodic snapshots + sproc := periodicproc.Tick(cfg.Persistence.SnapshotInterval, func(proc goprocess.Process) { + logger.Debugf("storing snapshot of DHT routing table") + err := snapshotter.Store(dht.routingTable) + if err != nil { + logger.Warningf("error while storing snapshot of DHT routing table snapshot: %s", err) + } + }) + dht.proc.AddChild(sproc) + + // register for network notifs. + dht.host.Network().Notify((*netNotifiee)(dht)) + dht.proc.AddChild(dht.providers.Process()) - dht.Validator = cfg.Validator if !cfg.Client { for _, p := range cfg.Protocols { @@ -152,9 +192,9 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT return dht } -func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID, bucketSize int) *IpfsDHT { +func makeDHT(ctx context.Context, h host.Host, cfg *opts.Options) *IpfsDHT { self := kb.ConvertPeerID(h.ID()) - rt := kb.NewRoutingTable(bucketSize, self, time.Minute, h.Peerstore()) + rt := kb.NewRoutingTable(cfg.BucketSize, self, time.Minute, h.Peerstore()) cmgr := h.ConnManager() rt.PeerAdded = func(p peer.ID) { @@ -167,22 +207,38 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p } dht := &IpfsDHT{ - datastore: dstore, + datastore: cfg.Datastore, self: h.ID(), peerstore: h.Peerstore(), host: h, strmap: make(map[peer.ID]*messageSender), ctx: ctx, - providers: providers.NewProviderManager(ctx, h.ID(), dstore), + providers: providers.NewProviderManager(ctx, h.ID(), cfg.Datastore), birth: time.Now(), routingTable: rt, - protocols: protocols, - bucketSize: bucketSize, + protocols: cfg.Protocols, + bucketSize: cfg.BucketSize, triggerRtRefresh: make(chan chan<- error), } dht.ctx = dht.newContextWithLocalTags(ctx) + dht.Validator = cfg.Validator + dht.autoRefresh = cfg.RoutingTable.AutoRefresh + dht.rtRefreshPeriod = cfg.RoutingTable.RefreshPeriod + dht.rtRefreshQueryTimeout = cfg.RoutingTable.RefreshQueryTimeout + + dht.maxRecordAge = cfg.MaxRecordAge + dht.enableProviders = cfg.EnableProviders + dht.enableValues = cfg.EnableValues + dht.bootstrapCfg = cfg.BootstrapConfig + + dht.seedsProposer = cfg.Persistence.SeedsProposer + dht.seederDialTimeout = cfg.Persistence.SeederDialTimeout + dht.seederConcurrentDials = cfg.Persistence.SeederConcurrentDials + dht.seederRTSizeTarget = cfg.Persistence.SeederRTSizeTarget + dht.totalSeederTimeout = cfg.Persistence.TotalSeederTimeout + return dht } @@ -193,7 +249,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p writeResp := func(errorChan chan error, err error) { select { case <-proc.Closing(): - case errorChan <- errChan: + case errorChan <- err: } close(errorChan) } @@ -203,7 +259,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p case req := <-dht.rtRecoveryChan: if dht.routingTable.Size() == 0 { logger.Infof("rt recovery proc: received request with reqID=%s, RT is empty. 
initiating recovery", req.id) - // TODO Call Seeder with default bootstrap peers here once #383 is merged + // TODO Call SeedsProposer with default bootstrap peers here once #383 is merged if dht.routingTable.Size() > 0 { logger.Infof("rt recovery proc: successfully recovered RT for reqID=%s, RT size is now %d", req.id, dht.routingTable.Size()) go writeResp(req.errorChan, nil) @@ -223,7 +279,6 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p // putValueToPeer stores the given key/value pair at the peer 'p' func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error { - pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0) pmes.Record = rec rpmes, err := dht.sendRequest(ctx, p, pmes) diff --git a/dht_bootstrap.go b/dht_bootstrap.go index 7078765e0..91d1416d1 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -8,6 +8,7 @@ import ( multierror "github.com/hashicorp/go-multierror" process "github.com/jbenet/goprocess" processctx "github.com/jbenet/goprocess/context" + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/routing" "github.com/multiformats/go-multiaddr" _ "github.com/multiformats/go-multiaddr-dns" @@ -43,6 +44,19 @@ func init() { } } +func getDefaultBootstrapPeerIDs() []peer.ID { + var defaultBootstrapPeerIDs []peer.ID + for i := range DefaultBootstrapPeers { + info, err := peer.AddrInfoFromP2pAddr(DefaultBootstrapPeers[i]) + if err != nil { + logger.Errorf("failed to get peerID for peer with multiaddress %s: error is %s", DefaultBootstrapPeers[i].String(), err) + continue + } + defaultBootstrapPeerIDs = append(defaultBootstrapPeerIDs, info.ID) + } + return defaultBootstrapPeerIDs +} + // Start the refresh worker. func (dht *IpfsDHT) startRefreshing() error { // scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period diff --git a/dht_rt_seeder.go b/dht_rt_seeder.go new file mode 100644 index 000000000..ac805b596 --- /dev/null +++ b/dht_rt_seeder.go @@ -0,0 +1,110 @@ +package dht + +import ( + "context" + "sync" + + "github.com/libp2p/go-libp2p-core/peer" +) + +func (dht *IpfsDHT) seedRoutingTable(candidates, fallbacks []peer.ID) error { + seederCtx, cancel := context.WithTimeout(dht.ctx, dht.totalSeederTimeout) + defer cancel() + + // filter out peers that are either NOT in the peer store OR already in the RT + findEligible := func(peers []peer.ID) []peer.ID { + var eligiblePeers []peer.ID + for _, p := range peers { + + if dht.routingTable.Find(p) == p { + logger.Info("discarding candidate as it is already in the RT: %s", p) + continue + } + + if addrs := dht.host.Peerstore().Addrs(p); len(addrs) == 0 { + logger.Infof("discarding candidate as we no longer have addresses: %s", p) + continue + } + + eligiblePeers = append(eligiblePeers, p) + } + return eligiblePeers + } + + // result of a dial attempt + type result struct { + p peer.ID + err error + } + + // rate-limit dials + semaphore := make(chan struct{}, dht.seederConcurrentDials) + + // attempts to dial to a given peer to verify it's available + dialFn := func(ctx context.Context, p peer.ID, res chan<- result) { + childCtx, cancel := context.WithTimeout(ctx, dht.seederDialTimeout) + defer cancel() + _, err := dht.host.Network().DialPeer(childCtx, p) + select { + case <-ctx.Done(): // caller has already hung up & gone away + case res <- result{p, err}: + } + } + + // ask the proposer to start proposing peers & write them on the peer channel + peersChan := 
dht.seedsProposer.Propose(seederCtx, dht.routingTable, findEligible(candidates), findEligible(fallbacks)) + + resCh := make(chan result) // dial results. + + // start dialing to the peers received on the result channel + go func() { + defer close(resCh) + + var wg sync.WaitGroup + for p := range peersChan { + select { + case <-seederCtx.Done(): + return + default: + // start dialing + semaphore <- struct{}{} + wg.Add(1) + go func(p peer.ID, res chan<- result) { + dialFn(seederCtx, p, res) + <-semaphore + wg.Done() + }(p, resCh) + } + } + wg.Wait() + + }() + +LOOP: + for { + select { + case res, hasMore := <-resCh: + if !hasMore { + logger.Infof("dht rt seeder: finished seeding RT with proposed peer set; RT size is now %d ", + dht.routingTable.Size()) + break LOOP + } + if res.err != nil { + logger.Infof("dht rt seeder: discarded proposed peer due to dial error; peer ID: %s, err: %s", res.p, res.err) + } else { + if _, err := dht.routingTable.Update(res.p); err != nil { + logger.Warningf("dht rt seeder: failed to add proposed peer to routing table; peer ID: %s, err: %s", res.p, err) + } + if dht.routingTable.Size() >= dht.seederRTSizeTarget { + break LOOP + } + } + + case <-seederCtx.Done(): + logger.Info("dht rt seeder: finishing as we have exceeded the seeder timeout") + break LOOP + } + } + + return nil +} diff --git a/dht_rt_seeder_test.go b/dht_rt_seeder_test.go new file mode 100644 index 000000000..a95f57fbb --- /dev/null +++ b/dht_rt_seeder_test.go @@ -0,0 +1,125 @@ +package dht + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/test" + opts "github.com/libp2p/go-libp2p-kad-dht/opts" + "github.com/libp2p/go-libp2p-kad-dht/persist" + kb "github.com/libp2p/go-libp2p-kbucket" + swarmt "github.com/libp2p/go-libp2p-swarm/testing" + bhost "github.com/libp2p/go-libp2p/p2p/host/basic" + "github.com/stretchr/testify/require" +) + +// mockSeedsProposer returns all the peers passed to it +type mockSeedsProposer struct { +} + +func (m *mockSeedsProposer) Propose(ctx context.Context, rt *kb.RoutingTable, candidates []peer.ID, fallback []peer.ID) chan peer.ID { + pChan := make(chan peer.ID) + + go func() { + defer close(pChan) + + for _, p := range append(candidates, fallback...) 
{ + pChan <- p + } + }() + + return pChan + +} + +var _ persist.SeedsProposer = (*mockSeedsProposer)(nil) + +func TestRTSeeder(t *testing.T) { + testCases := map[string]struct { + nTotalCandidates int // snapshotted candidate list + + nCandidatesNotInPeerStore int // candidates which do not exist in the peerstore + + nCandidatesAlreadyInRT int // candidates which already exist in the RT + + nFallbacks int // fallback list + + nCandidatesNotAlive int // candidates that are "not-diallable"/not alive + + seederTarget int // RT size target of the seeder + + expectedNumPeersInRoutingTable int // number of peers we expect in the routing table after seeding is complete + }{ + "Only Candidates": {10, 1, 1, 0, 1, 9, 8}, + "Candidates + Fallbacks": {10, 2, 2, 7, 4, 20, 11}, + "Only Fallbacks": {10, 2, 1, 9, 7, 11, 10}, + "Empty Candidates": {0, 0, 0, 5, 0, 5, 5}, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + for name, testcase := range testCases { + // create host for self + self := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) + + // create candidate hosts & add them to the peer store + candidateHosts := make([]bhost.BasicHost, testcase.nTotalCandidates) + candidatePids := make([]peer.ID, testcase.nTotalCandidates) + + for i := 0; i < testcase.nTotalCandidates; i++ { + h := *bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) + candidateHosts[i] = h + candidatePids[i] = h.ID() + self.Peerstore().AddAddrs(candidatePids[i], h.Addrs(), 5*time.Minute) + } + + // remove candidates from peerstore + for i := 0; i < testcase.nCandidatesNotInPeerStore; i++ { + self.Peerstore().ClearAddrs(candidatePids[i]) + } + + // disconnect the number of peers required + for i := testcase.nCandidatesNotInPeerStore; i < testcase.nCandidatesNotAlive+testcase.nCandidatesNotInPeerStore; i++ { + require.NoError(t, candidateHosts[i].Close()) + } + + // create fallback hosts & add them to the peerstore + fallbackHosts := make([]bhost.BasicHost, testcase.nFallbacks) + fallbackPids := make([]peer.ID, testcase.nFallbacks) + for i := 0; i < testcase.nFallbacks; i++ { + h := *bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) + fallbackHosts[i] = h + fallbackPids[i] = h.ID() + self.Peerstore().AddAddrs(fallbackPids[i], h.Addrs(), 5*time.Minute) + } + + // crete dht instance with the mock seedsproposer + // make sure the default fallback peers are not in the peerstore so rt remains empty + newDht, err := New(ctx, self, opts.SeedsProposer(&mockSeedsProposer{}), opts.FallbackPeers([]peer.ID{test.RandPeerIDFatal(t)}), + opts.SeederRTSizeTarget(testcase.seederTarget)) + require.NoError(t, err) + defer newDht.Close() + require.True(t, newDht.routingTable.Size() == 0) + + // add candidates to RT + for i := testcase.nCandidatesNotInPeerStore + testcase.nCandidatesNotAlive; i < testcase.nCandidatesNotInPeerStore+testcase.nCandidatesAlreadyInRT+testcase.nCandidatesNotAlive; i++ { + _, err := newDht.routingTable.Update(candidatePids[i]) + require.NoError(t, err) + } + + // assert routing table has been seeded with the expected number of peers + require.NoError(t, newDht.seedRoutingTable(candidatePids, fallbackPids)) + require.Equalf(t, testcase.expectedNumPeersInRoutingTable, newDht.routingTable.Size(), "for test case %s, rt should have %d peers, but has %d", + name, testcase.expectedNumPeersInRoutingTable, newDht.routingTable.Size()) + + // cleanup + require.NoError(t, self.Close()) + allHosts := append(fallbackHosts, candidateHosts...) 
+ for _, h := range allHosts { + require.NoError(t, h.Close()) + } + } +} diff --git a/dht_test.go b/dht_test.go index 8458e948c..e94e06121 100644 --- a/dht_test.go +++ b/dht_test.go @@ -12,9 +12,11 @@ import ( "testing" "time" + "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" + "github.com/libp2p/go-libp2p-kad-dht/persist" "github.com/multiformats/go-multistream" "golang.org/x/xerrors" @@ -26,6 +28,7 @@ import ( pb "github.com/libp2p/go-libp2p-kad-dht/pb" "github.com/ipfs/go-cid" + dssync "github.com/ipfs/go-datastore/sync" u "github.com/ipfs/go-ipfs-util" kb "github.com/libp2p/go-libp2p-kbucket" "github.com/libp2p/go-libp2p-record" @@ -398,6 +401,67 @@ func TestSearchValue(t *testing.T) { } } +func TestSnapshottingAndSeeding(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sdstore := dssync.MutexWrap(datastore.NewMapDatastore()) + snapShotter, err := persist.NewDatastoreSnapshotter(sdstore, "rand") + assert.NoError(t, err) + snapShotterOpt := opts.Snapshotter(snapShotter, 500*time.Millisecond) + + // start 4 other dht's we can connect to & add to our RT + dhts := setupDHTS(t, ctx, 4) + defer func() { + for i := 0; i < 4; i++ { + dhts[i].Close() + defer dhts[i].host.Close() + } + }() + + // connect them with each other + connect(t, ctx, dhts[0], dhts[1]) + connect(t, ctx, dhts[1], dhts[2]) + connect(t, ctx, dhts[1], dhts[3]) + + // create dht with snapshotter & connect it to one of the above dht's + selfDht := setupDHT(ctx, t, false, snapShotterOpt) + assert.True(t, selfDht.routingTable.Size() == 0) + defer selfDht.Close() + connect(t, ctx, selfDht, dhts[0]) + + // bootstrap till we are connected to all the other dht's + for selfDht.routingTable.Size() != 4 { + require.NoError(t, <-selfDht.RefreshRoutingTable()) + } + // sanity check + assert.True(t, selfDht.routingTable.Size() == 4) + + // assert snapshot & close dht + time.Sleep(600 * time.Millisecond) // wait for one snapshot + h := selfDht.host + peerIDs, err := snapShotter.Load() + assert.NoError(t, err) + assert.Len(t, peerIDs, 4) + for _, d := range dhts { + assert.Contains(t, peerIDs, d.self) + } + assert.NoError(t, selfDht.Close()) + + // now start a new dht with a seedsProposer & ensure it's routing table gets populated + sp := persist.NewRandomSeedsProposer() + newDht, err := New(ctx, h, snapShotterOpt, opts.SeedsProposer(sp), opts.SeederRTSizeTarget(3)) + assert.NoError(t, err) + defer newDht.Close() + assert.True(t, newDht.routingTable.Size() == 3) + + // once more with a different target + sp = persist.NewRandomSeedsProposer() + newDht, err = New(ctx, h, snapShotterOpt, opts.SeedsProposer(sp), opts.SeederRTSizeTarget(4)) + assert.NoError(t, err) + defer newDht.Close() + assert.True(t, newDht.routingTable.Size() == 4) +} + func TestGetValues(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/go.mod b/go.mod index a2adca197..6c67ae3cd 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/multiformats/go-multiaddr v0.2.0 github.com/multiformats/go-multiaddr-dns v0.2.0 github.com/multiformats/go-multistream v0.1.0 + github.com/pkg/errors v0.8.1 github.com/stretchr/testify v1.4.0 go.opencensus.io v0.22.2 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 diff --git a/go.sum b/go.sum index 02c10381d..59b666448 100644 --- a/go.sum +++ b/go.sum @@ -56,6 +56,8 @@ github.com/golang/protobuf v1.3.0 
h1:kbxbvI4Un1LUWKxufD+BiE6AEExYYgkQLQmLFqA1LFk github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= diff --git a/opts/options.go b/opts/options.go index 70d6db069..2b0f23083 100644 --- a/opts/options.go +++ b/opts/options.go @@ -6,7 +6,9 @@ import ( ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" + "github.com/libp2p/go-libp2p-kad-dht/persist" "github.com/libp2p/go-libp2p-record" ) @@ -19,12 +21,35 @@ var ( DefaultProtocols = []protocol.ID{ProtocolDHT} ) +// BootstrapConfig specifies parameters used for bootstrapping the DHT. +type BootstrapConfig struct { + BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it + Timeout time.Duration // how long to wait for a bootstrap query to run + RoutingTableScanInterval time.Duration // how often to scan the RT for k-buckets that haven't been queried since the given period + SelfQueryInterval time.Duration // how often to query for self +} + +type PersistConfig struct { + Snapshotter persist.Snapshotter + SeedsProposer persist.SeedsProposer + SnapshotInterval time.Duration + FallbackPeers []peer.ID + + SeederDialTimeout time.Duration // grace period for one dial attempt by the seeder + SeederConcurrentDials int // number of peers the seeder will dial simultaneously + SeederRTSizeTarget int // target number of peers we want in the RT before seeder stops + TotalSeederTimeout time.Duration // grace period for the whole seeding proces +} + +var DefaultSnapshotInterval = 5 * time.Minute + // Options is a structure containing all the options that can be used when constructing a DHT. type Options struct { Datastore ds.Batching Validator record.Validator Client bool Protocols []protocol.ID + Persistence *PersistConfig BucketSize int MaxRecordAge time.Duration EnableProviders bool @@ -35,6 +60,7 @@ type Options struct { RefreshPeriod time.Duration AutoRefresh bool } + BootstrapConfig BootstrapConfig } // Apply applies the given options to this Option @@ -66,6 +92,15 @@ var Defaults = func(o *Options) error { o.RoutingTable.AutoRefresh = true o.MaxRecordAge = time.Hour * 36 + o.Persistence = new(PersistConfig) + o.Persistence.SnapshotInterval = DefaultSnapshotInterval + + o.Persistence.SeederDialTimeout = 5 * time.Second + o.Persistence.SeederConcurrentDials = 50 + o.Persistence.SeederRTSizeTarget = 30 + o.Persistence.TotalSeederTimeout = 1 * time.Minute + o.Persistence.SeedsProposer = persist.NewRandomSeedsProposer() + return nil } @@ -91,6 +126,46 @@ func RoutingTableRefreshPeriod(period time.Duration) Option { } } +func SeedsProposer(sp persist.SeedsProposer) Option { + return func(o *Options) error { + o.Persistence.SeedsProposer = sp + return nil + } +} + +// SeederParams are the params to configure the Dht seeder. 
Please take a look at the +// doc for the Persistence config +func SeederParams(peerDialTimeout, totalSeederTimeout time.Duration, nConcurrentDials int) Option { + return func(o *Options) error { + o.Persistence.SeederDialTimeout = peerDialTimeout + o.Persistence.SeederConcurrentDials = nConcurrentDials + o.Persistence.TotalSeederTimeout = totalSeederTimeout + return nil + } +} + +func SeederRTSizeTarget(rtSizeTarget int) Option { + return func(o *Options) error { + o.Persistence.SeederRTSizeTarget = rtSizeTarget + return nil + } +} + +func Snapshotter(snpshttr persist.Snapshotter, interval time.Duration) Option { + return func(o *Options) error { + o.Persistence.Snapshotter = snpshttr + o.Persistence.SnapshotInterval = interval + return nil + } +} + +func FallbackPeers(fallback []peer.ID) Option { + return func(o *Options) error { + o.Persistence.FallbackPeers = fallback + return nil + } +} + // Datastore configures the DHT to use the specified datastore. // // Defaults to an in-memory (temporary) map. diff --git a/pb/dht.pb.go b/pb/dht.pb.go index 5f94efeb4..4810fa2c6 100644 --- a/pb/dht.pb.go +++ b/pb/dht.pb.go @@ -260,44 +260,106 @@ func (m *Message_Peer) GetConnection() Message_ConnectionType { return Message_NOT_CONNECTED } +// Encapsulates a routing table snapshot for persistence. Not to be transmitted over the wire. +type RoutingTableSnapshot struct { + // The peers that were members of the routing table. + Peers [][]byte `protobuf:"bytes,1,rep,name=peers,proto3" json:"peers,omitempty"` + // The timestamp when this snapshot was taken. + Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RoutingTableSnapshot) Reset() { *m = RoutingTableSnapshot{} } +func (m *RoutingTableSnapshot) String() string { return proto.CompactTextString(m) } +func (*RoutingTableSnapshot) ProtoMessage() {} +func (*RoutingTableSnapshot) Descriptor() ([]byte, []int) { + return fileDescriptor_616a434b24c97ff4, []int{1} +} +func (m *RoutingTableSnapshot) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *RoutingTableSnapshot) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_RoutingTableSnapshot.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *RoutingTableSnapshot) XXX_Merge(src proto.Message) { + xxx_messageInfo_RoutingTableSnapshot.Merge(m, src) +} +func (m *RoutingTableSnapshot) XXX_Size() int { + return m.Size() +} +func (m *RoutingTableSnapshot) XXX_DiscardUnknown() { + xxx_messageInfo_RoutingTableSnapshot.DiscardUnknown(m) +} + +var xxx_messageInfo_RoutingTableSnapshot proto.InternalMessageInfo + +func (m *RoutingTableSnapshot) GetPeers() [][]byte { + if m != nil { + return m.Peers + } + return nil +} + +func (m *RoutingTableSnapshot) GetTimestamp() int64 { + if m != nil { + return m.Timestamp + } + return 0 +} + func init() { proto.RegisterEnum("dht.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value) proto.RegisterEnum("dht.pb.Message_ConnectionType", Message_ConnectionType_name, Message_ConnectionType_value) proto.RegisterType((*Message)(nil), "dht.pb.Message") proto.RegisterType((*Message_Peer)(nil), "dht.pb.Message.Peer") + proto.RegisterType((*RoutingTableSnapshot)(nil), "dht.pb.RoutingTableSnapshot") } func init() { 
proto.RegisterFile("dht.proto", fileDescriptor_616a434b24c97ff4) } var fileDescriptor_616a434b24c97ff4 = []byte{ - // 428 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x52, 0xc1, 0x6e, 0x9b, 0x40, - 0x10, 0xed, 0x02, 0x76, 0xe3, 0x01, 0x93, 0xcd, 0x28, 0x07, 0x94, 0x4a, 0x16, 0xf2, 0x89, 0x1e, - 0x02, 0x12, 0x95, 0x7a, 0xe8, 0xa1, 0x92, 0x0b, 0x34, 0xb2, 0x94, 0x62, 0x6b, 0xeb, 0xa4, 0x47, - 0xcb, 0xc0, 0xca, 0x41, 0xa5, 0x5e, 0x04, 0x24, 0x95, 0xbf, 0xb0, 0x3d, 0xf6, 0x13, 0x2a, 0x7f, - 0x49, 0x05, 0x84, 0x16, 0xfb, 0xd0, 0xd3, 0xbe, 0x37, 0xf3, 0xde, 0xce, 0xdb, 0xd1, 0xc2, 0x28, - 0x79, 0xa8, 0xec, 0xbc, 0x10, 0x95, 0xc0, 0x61, 0x03, 0xa3, 0x2b, 0x77, 0x9b, 0x56, 0x0f, 0x8f, - 0x91, 0x1d, 0x8b, 0x6f, 0x4e, 0x96, 0x46, 0xb9, 0x9b, 0x3b, 0x5b, 0x71, 0xdd, 0xa2, 0xeb, 0x82, - 0xc7, 0xa2, 0x48, 0x9c, 0x3c, 0x72, 0x5a, 0xd4, 0x7a, 0xa7, 0x3f, 0x14, 0x78, 0xf9, 0x89, 0x97, - 0xe5, 0x66, 0xcb, 0xd1, 0x01, 0xa5, 0xda, 0xe7, 0xdc, 0x20, 0x26, 0xb1, 0x74, 0xf7, 0x95, 0xdd, - 0x5e, 0x6b, 0x3f, 0xb7, 0xbb, 0x73, 0xb5, 0xcf, 0x39, 0x6b, 0x84, 0x68, 0xc1, 0x79, 0x9c, 0x3d, - 0x96, 0x15, 0x2f, 0x6e, 0xf9, 0x13, 0xcf, 0xd8, 0xe6, 0xbb, 0x01, 0x26, 0xb1, 0x06, 0xec, 0xb4, - 0x8c, 0x14, 0xe4, 0xaf, 0x7c, 0x6f, 0x48, 0x26, 0xb1, 0x34, 0x56, 0x43, 0x7c, 0x0d, 0xc3, 0x36, - 0x88, 0x21, 0x9b, 0xc4, 0x52, 0xdd, 0x0b, 0xbb, 0xcb, 0x15, 0xd9, 0xac, 0x41, 0xec, 0x59, 0x80, - 0x6f, 0x41, 0x8d, 0x33, 0x51, 0xf2, 0x62, 0xc9, 0x79, 0x51, 0x1a, 0x67, 0xa6, 0x6c, 0xa9, 0xee, - 0xe5, 0x69, 0xbc, 0xba, 0xc9, 0xfa, 0x42, 0x7c, 0x07, 0xe3, 0xbc, 0x10, 0x4f, 0x69, 0xd2, 0x39, - 0x47, 0xff, 0x71, 0x1e, 0x4b, 0xaf, 0x32, 0x50, 0x6a, 0x80, 0x3a, 0x48, 0x69, 0xd2, 0x6c, 0x44, - 0x63, 0x52, 0x9a, 0xe0, 0x25, 0x0c, 0x36, 0x49, 0x52, 0x94, 0x86, 0x64, 0xca, 0x96, 0xc6, 0x5a, - 0x82, 0xef, 0x01, 0x62, 0xb1, 0xdb, 0xf1, 0xb8, 0x4a, 0xc5, 0xae, 0x79, 0x90, 0xee, 0x4e, 0x4e, - 0xc7, 0x78, 0x7f, 0x15, 0xcd, 0x0a, 0x7b, 0x8e, 0x69, 0x0a, 0x6a, 0x6f, 0xbb, 0x38, 0x86, 0xd1, - 0xf2, 0x6e, 0xb5, 0xbe, 0x9f, 0xdd, 0xde, 0x05, 0xf4, 0x45, 0x4d, 0x6f, 0x82, 0x8e, 0x12, 0xa4, - 0xa0, 0xcd, 0x7c, 0x7f, 0xbd, 0x64, 0x8b, 0xfb, 0xb9, 0x1f, 0x30, 0x2a, 0xe1, 0x05, 0x8c, 0x6b, - 0x41, 0x57, 0xf9, 0x4c, 0xe5, 0xda, 0xf3, 0x71, 0x1e, 0xfa, 0xeb, 0x70, 0xe1, 0x07, 0x54, 0xc1, - 0x33, 0x50, 0x96, 0xf3, 0xf0, 0x86, 0x0e, 0xa6, 0x5f, 0x40, 0x3f, 0x0e, 0x52, 0xbb, 0xc3, 0xc5, - 0x6a, 0xed, 0x2d, 0xc2, 0x30, 0xf0, 0x56, 0x81, 0xdf, 0x4e, 0xfc, 0x47, 0x09, 0x9e, 0x83, 0xea, - 0xcd, 0xc2, 0x4e, 0x41, 0x25, 0x44, 0xd0, 0xbd, 0x59, 0xd8, 0x73, 0x51, 0xf9, 0x83, 0xf6, 0xf3, - 0x30, 0x21, 0xbf, 0x0e, 0x13, 0xf2, 0xfb, 0x30, 0x21, 0xd1, 0xb0, 0xf9, 0x5e, 0x6f, 0xfe, 0x04, - 0x00, 0x00, 0xff, 0xff, 0xf4, 0x3c, 0x3f, 0x3f, 0xa7, 0x02, 0x00, 0x00, + // 472 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x52, 0xcd, 0x6e, 0x9b, 0x40, + 0x18, 0xec, 0x82, 0xed, 0xc6, 0x9f, 0x7f, 0x42, 0x56, 0x3e, 0xa0, 0xb4, 0xb2, 0x90, 0x4f, 0xf4, + 0x10, 0x2c, 0x51, 0xa9, 0x87, 0x1e, 0x2a, 0xb9, 0x40, 0x23, 0x57, 0x29, 0xb6, 0x36, 0x24, 0x3d, + 0x5a, 0xfc, 0xac, 0x6c, 0x54, 0xcc, 0xae, 0x60, 0x9d, 0xca, 0x4f, 0xd8, 0x1e, 0xfb, 0x08, 0x95, + 0x9f, 0xa4, 0x62, 0x09, 0x8d, 0xe3, 0x43, 0x4e, 0xcc, 0x7c, 0x3b, 0xc3, 0xce, 0x8e, 0x3e, 0xe8, + 0x26, 0x1b, 0x61, 0xf1, 0x82, 0x09, 0x86, 0x3b, 0x12, 0x46, 0x97, 0xf6, 0x3a, 0x15, 0x9b, 0x5d, + 0x64, 0xc5, 0x6c, 0x3b, 0xcd, 0xd2, 0x88, 0xdb, 0x7c, 0xba, 0x66, 0x57, 0x35, 0xba, 0x2a, 0x68, + 0xcc, 0x8a, 0x64, 0xca, 0xa3, 0x69, 
0x8d, 0x6a, 0xef, 0xe4, 0x57, 0x0b, 0x5e, 0x7f, 0xa3, 0x65, + 0x19, 0xae, 0x29, 0x9e, 0x42, 0x4b, 0xec, 0x39, 0xd5, 0x91, 0x81, 0xcc, 0xa1, 0xfd, 0xc6, 0xaa, + 0x7f, 0x6b, 0x3d, 0x1e, 0x37, 0xdf, 0x60, 0xcf, 0x29, 0x91, 0x42, 0x6c, 0xc2, 0x79, 0x9c, 0xed, + 0x4a, 0x41, 0x8b, 0x1b, 0xfa, 0x40, 0x33, 0x12, 0xfe, 0xd4, 0xc1, 0x40, 0x66, 0x9b, 0x9c, 0x8e, + 0xb1, 0x06, 0xea, 0x0f, 0xba, 0xd7, 0x15, 0x03, 0x99, 0x7d, 0x52, 0x41, 0xfc, 0x0e, 0x3a, 0x75, + 0x10, 0x5d, 0x35, 0x90, 0xd9, 0xb3, 0x2f, 0xac, 0x26, 0x57, 0x64, 0x11, 0x89, 0xc8, 0xa3, 0x00, + 0x7f, 0x80, 0x5e, 0x9c, 0xb1, 0x92, 0x16, 0x4b, 0x4a, 0x8b, 0x52, 0x3f, 0x33, 0x54, 0xb3, 0x67, + 0x8f, 0x4e, 0xe3, 0x55, 0x87, 0xe4, 0x58, 0x88, 0x3f, 0xc2, 0x80, 0x17, 0xec, 0x21, 0x4d, 0x1a, + 0x67, 0xf7, 0x05, 0xe7, 0x73, 0xe9, 0x65, 0x06, 0xad, 0x0a, 0xe0, 0x21, 0x28, 0x69, 0x22, 0x1b, + 0xe9, 0x13, 0x25, 0x4d, 0xf0, 0x08, 0xda, 0x61, 0x92, 0x14, 0xa5, 0xae, 0x18, 0xaa, 0xd9, 0x27, + 0x35, 0xc1, 0x9f, 0x00, 0x62, 0x96, 0xe7, 0x34, 0x16, 0x29, 0xcb, 0xe5, 0x83, 0x86, 0xf6, 0xf8, + 0xf4, 0x1a, 0xe7, 0xbf, 0x42, 0x56, 0x78, 0xe4, 0x98, 0xa4, 0xd0, 0x3b, 0x6a, 0x17, 0x0f, 0xa0, + 0xbb, 0xbc, 0x0b, 0x56, 0xf7, 0xb3, 0x9b, 0x3b, 0x4f, 0x7b, 0x55, 0xd1, 0x6b, 0xaf, 0xa1, 0x08, + 0x6b, 0xd0, 0x9f, 0xb9, 0xee, 0x6a, 0x49, 0x16, 0xf7, 0x73, 0xd7, 0x23, 0x9a, 0x82, 0x2f, 0x60, + 0x50, 0x09, 0x9a, 0xc9, 0xad, 0xa6, 0x56, 0x9e, 0x2f, 0x73, 0xdf, 0x5d, 0xf9, 0x0b, 0xd7, 0xd3, + 0x5a, 0xf8, 0x0c, 0x5a, 0xcb, 0xb9, 0x7f, 0xad, 0xb5, 0x27, 0xdf, 0x61, 0xf8, 0x3c, 0x48, 0xe5, + 0xf6, 0x17, 0xc1, 0xca, 0x59, 0xf8, 0xbe, 0xe7, 0x04, 0x9e, 0x5b, 0xdf, 0xf8, 0x44, 0x11, 0x3e, + 0x87, 0x9e, 0x33, 0xf3, 0x1b, 0x85, 0xa6, 0x60, 0x0c, 0x43, 0x67, 0xe6, 0x1f, 0xb9, 0x34, 0x75, + 0xf2, 0x15, 0x46, 0x84, 0xed, 0x44, 0x9a, 0xaf, 0x83, 0x30, 0xca, 0xe8, 0x6d, 0x1e, 0xf2, 0x72, + 0xc3, 0x44, 0xd5, 0x18, 0x97, 0xed, 0xa3, 0xba, 0x31, 0x49, 0xf0, 0x5b, 0xe8, 0x8a, 0x74, 0x4b, + 0x4b, 0x11, 0x6e, 0xb9, 0x5c, 0x0b, 0x95, 0x3c, 0x0d, 0x3e, 0xf7, 0x7f, 0x1f, 0xc6, 0xe8, 0xcf, + 0x61, 0x8c, 0xfe, 0x1e, 0xc6, 0x28, 0xea, 0xc8, 0x55, 0x7d, 0xff, 0x2f, 0x00, 0x00, 0xff, 0xff, + 0x53, 0xb2, 0x24, 0x7a, 0xf3, 0x02, 0x00, 0x00, } func (m *Message) Marshal() (dAtA []byte, err error) { @@ -432,6 +494,47 @@ func (m *Message_Peer) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *RoutingTableSnapshot) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *RoutingTableSnapshot) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *RoutingTableSnapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.Timestamp != 0 { + i = encodeVarintDht(dAtA, i, uint64(m.Timestamp)) + i-- + dAtA[i] = 0x10 + } + if len(m.Peers) > 0 { + for iNdEx := len(m.Peers) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Peers[iNdEx]) + copy(dAtA[i:], m.Peers[iNdEx]) + i = encodeVarintDht(dAtA, i, uint64(len(m.Peers[iNdEx]))) + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + func encodeVarintDht(dAtA []byte, offset int, v uint64) int { offset -= sovDht(v) base := offset @@ -506,6 +609,27 @@ func (m *Message_Peer) Size() (n int) { return n } +func (m *RoutingTableSnapshot) Size() (n int) { + if m == nil { + 
return 0 + } + var l int + _ = l + if len(m.Peers) > 0 { + for _, b := range m.Peers { + l = len(b) + n += 1 + l + sovDht(uint64(l)) + } + } + if m.Timestamp != 0 { + n += 1 + sovDht(uint64(m.Timestamp)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + func sovDht(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -881,6 +1005,111 @@ func (m *Message_Peer) Unmarshal(dAtA []byte) error { } return nil } +func (m *RoutingTableSnapshot) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDht + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: RoutingTableSnapshot: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: RoutingTableSnapshot: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Peers", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDht + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthDht + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthDht + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Peers = append(m.Peers, make([]byte, postIndex-iNdEx)) + copy(m.Peers[len(m.Peers)-1], dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) + } + m.Timestamp = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDht + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Timestamp |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipDht(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDht + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthDht + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipDht(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 @@ -938,9 +1167,6 @@ func skipDht(dAtA []byte) (n int, err error) { return 0, ErrInvalidLengthDht } iNdEx += length - if iNdEx < 0 { - return 0, ErrInvalidLengthDht - } case 3: depth++ case 4: @@ -953,6 +1179,9 @@ func skipDht(dAtA []byte) (n int, err error) { default: return 0, fmt.Errorf("proto: illegal wireType %d", wireType) } + if iNdEx < 0 { + return 0, ErrInvalidLengthDht + } if depth == 0 { return iNdEx, nil } diff --git a/pb/dht.proto b/pb/dht.proto index 4d2d1fd2e..9610f1a52 100644 --- a/pb/dht.proto +++ b/pb/dht.proto @@ -69,3 +69,12 @@ message Message { // GET_VALUE, ADD_PROVIDER, GET_PROVIDERS repeated Peer providerPeers = 9; } + +// Encapsulates a routing table snapshot for persistence. Not to be transmitted over the wire. 
+message RoutingTableSnapshot { + // The peers that were members of the routing table. + repeated bytes peers = 1; + + // The timestamp when this snapshot was taken. + int64 timestamp = 2; +} \ No newline at end of file diff --git a/persist/interfaces.go b/persist/interfaces.go new file mode 100644 index 000000000..5df5344be --- /dev/null +++ b/persist/interfaces.go @@ -0,0 +1,34 @@ +package persist + +import ( + "context" + + peer "github.com/libp2p/go-libp2p-core/peer" + kbucket "github.com/libp2p/go-libp2p-kbucket" + + log "github.com/ipfs/go-log" +) + +var logSnapshot = log.Logger("dht/snapshot") +var logSeedProposer = log.Logger("dht/seeds-proposer") + +// A SeedsProposer proposes a set of eligible peers from a given set of candidates & fallback peers +// for seeding the RT. +type SeedsProposer interface { + // Propose takes an optional set of candidates from a snapshot (or nil if none could be loaded), + // and a set of fallback peers, and it returns a channel of peers that can be + // used to seed the given routing table. + // Returns an empty channel if it has no proposals. + // Note: Seeding a routing table with the eligible peers will work only if the the dht uses a persistent peerstore across restarts. + // This is because we can recover metadata for the candidate peers only if the peerstore was/is persistent. + Propose(ctx context.Context, rt *kbucket.RoutingTable, candidates []peer.ID, fallback []peer.ID) chan peer.ID +} + +// A Snapshotter provides the ability to save and restore a routing table from a persistent medium. +type Snapshotter interface { + // Load recovers a snapshot from storage, and returns candidates to integrate in a fresh routing table. + Load() ([]peer.ID, error) + + // Store persists the current state of the routing table. + Store(rt *kbucket.RoutingTable) error +} diff --git a/persist/seeds_proposer.go b/persist/seeds_proposer.go new file mode 100644 index 000000000..1c554ca2c --- /dev/null +++ b/persist/seeds_proposer.go @@ -0,0 +1,51 @@ +package persist + +import ( + "context" + "math/rand" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-kbucket" +) + +type randomSeedsProposer struct{} + +var _ SeedsProposer = (*randomSeedsProposer)(nil) + +// NewRandomSeedsProposer returns a SeedsProposer that proposes seeds for the routing table +// It proposes random seeds from the supplied candidates & fallback peer set, prioritizing candidates over fallbacks +// The fallback peers are guaranteed to exist in the peerstore. +func NewRandomSeedsProposer() SeedsProposer { + return &randomSeedsProposer{} +} + +func (rs *randomSeedsProposer) Propose(ctx context.Context, rt *kbucket.RoutingTable, candidates []peer.ID, fallback []peer.ID) chan peer.ID { + peerChan := make(chan peer.ID) + + go func() { + defer close(peerChan) + + // return if both candidates and fallback peers are empty + if len(candidates) == 0 && len(fallback) == 0 { + logSeedProposer.Info("not returning any proposals as both candidate & fallback peer set is empty") + return + } + + // copy the candidates & shuffle to randomize seeding + cpy := make([]peer.ID, len(candidates)) + copy(cpy, candidates) + rand.Shuffle(len(cpy), func(i, j int) { + cpy[i], cpy[j] = cpy[j], cpy[i] + }) + + for _, p := range append(cpy, fallback...) 
{ + select { + case <-ctx.Done(): + return + case peerChan <- p: + } + } + }() + + return peerChan +} diff --git a/persist/seeds_proposer_test.go b/persist/seeds_proposer_test.go new file mode 100644 index 000000000..cf738539c --- /dev/null +++ b/persist/seeds_proposer_test.go @@ -0,0 +1,61 @@ +package persist + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/test" + kb "github.com/libp2p/go-libp2p-kbucket" + "github.com/libp2p/go-libp2p-peerstore" + "github.com/stretchr/testify/require" +) + +func TestRandomSeedsProposer(t *testing.T) { + testCases := map[string]struct { + nTotalCandidates int // snapshotted candidate list + nFallbacks int // fallback list + + expectedNumPeersInProposal int // number of proposals we expect + }{ + "No Proposals -> candidate & fallback sets are empty": {0, 0, 0}, + + "Success -> only candidates": {4, 0, 4}, + + "Success -> candidates + fallbacks": {4, 5, 9}, + + "Success -> only fallbacks": {0, 6, 6}, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + for name, testcase := range testCases { + + // create candidate hosts & add them to the peer store + candidatePids := make([]peer.ID, testcase.nTotalCandidates) + for i := 0; i < testcase.nTotalCandidates; i++ { + candidatePids[i] = test.RandPeerIDFatal(t) + } + + fallbackPids := make([]peer.ID, testcase.nFallbacks) + for i := 0; i < testcase.nFallbacks; i++ { + fallbackPids[i] = test.RandPeerIDFatal(t) + } + + // create RT & fill it with required number of peers + rt := kb.NewRoutingTable(50, kb.ConvertPeerID(test.RandPeerIDFatal(t)), time.Hour, peerstore.NewMetrics()) + + // run seeds proposer & assert + rs := NewRandomSeedsProposer() + pChan := rs.Propose(ctx, rt, candidatePids, fallbackPids) + + nPeers := 0 + for _ = range pChan { + nPeers++ + } + require.Equalf(t, testcase.expectedNumPeersInProposal, nPeers, "expected %d peers in proposal, but got only %d for test %s", + testcase.expectedNumPeersInProposal, nPeers, name) + } +} diff --git a/persist/snapshot.go b/persist/snapshot.go new file mode 100644 index 000000000..0b594ddbb --- /dev/null +++ b/persist/snapshot.go @@ -0,0 +1,92 @@ +package persist + +import ( + "errors" + "time" + + ds "github.com/ipfs/go-datastore" + nsds "github.com/ipfs/go-datastore/namespace" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-kad-dht/pb" + kb "github.com/libp2p/go-libp2p-kbucket" +) + +var ( + dsSnapshotKey = ds.NewKey("routing_table") +) + +var DefaultSnapshotNS = "/kad-dht/snapshot" + +type dsSnapshotter struct { + ds.Datastore +} + +var _ Snapshotter = (*dsSnapshotter)(nil) + +// NewDatastoreSnapshotter returns a Snapshotter backed by a datastore, under the specified non-optional namespace. 
+func NewDatastoreSnapshotter(dstore ds.Datastore, namespace string) (Snapshotter, error) {
+	if dstore == nil {
+		return nil, errors.New("datastore is nil when creating a datastore snapshotter")
+	}
+	if namespace == "" {
+		return nil, errors.New("blank namespace when creating a datastore snapshotter")
+	}
+	dstore = nsds.Wrap(dstore, ds.NewKey(namespace))
+	return &dsSnapshotter{dstore}, nil
+}
+
+func (dsp *dsSnapshotter) Load() (result []peer.ID, err error) {
+	val, err := dsp.Get(dsSnapshotKey)
+
+	switch err {
+	case nil:
+	case ds.ErrNotFound:
+		return nil, nil
+	default:
+		return nil, err
+	}
+
+	s := &dht_pb.RoutingTableSnapshot{}
+	if err := s.Unmarshal(val); err != nil {
+		return nil, err
+	}
+
+	var pid peer.ID
+	for _, id := range s.Peers {
+		if err := pid.Unmarshal(id); err != nil {
+			logSnapshot.Warningf("encountered invalid peer ID while restoring routing table snapshot; err: %s", err)
+			continue
+		}
+		result = append(result, pid)
+	}
+	return result, err
+}
+
+func (dsp *dsSnapshotter) Store(rt *kb.RoutingTable) error {
+	var data [][]byte
+	for _, p := range rt.ListPeers() {
+		id, err := p.MarshalBinary()
+		if err != nil {
+			logSnapshot.Warningf("encountered error while adding peer to routing table snapshot; skipping peer: %s, err: %s", p, err)
+			continue
+		}
+		data = append(data, id)
+	}
+
+	snap := dht_pb.RoutingTableSnapshot{
+		Peers:     data,
+		Timestamp: time.Now().Unix(),
+	}
+
+	bytes, err := snap.Marshal()
+	if err != nil {
+		return err
+	}
+
+	if err := dsp.Put(dsSnapshotKey, bytes); err != nil {
+		return err
+	}
+
+	// flush to disk
+	return dsp.Sync(dsSnapshotKey)
+}
diff --git a/persist/snapshot_test.go b/persist/snapshot_test.go
new file mode 100644
index 000000000..3c23688b8
--- /dev/null
+++ b/persist/snapshot_test.go
@@ -0,0 +1,65 @@
+package persist
+
+import (
+	"testing"
+	"time"
+
+	kb "github.com/libp2p/go-libp2p-kbucket"
+	"github.com/libp2p/go-libp2p-peerstore"
+
+	ds "github.com/ipfs/go-datastore"
+	"github.com/libp2p/go-libp2p-core/test"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestNewDatastoreSnapshotter(t *testing.T) {
+	_, err := NewDatastoreSnapshotter(nil, "rand")
+	assert.Error(t, err)
+	_, err = NewDatastoreSnapshotter(ds.NewMapDatastore(), "")
+	assert.Error(t, err)
+	_, err = NewDatastoreSnapshotter(ds.NewMapDatastore(), "rand")
+	assert.NoError(t, err)
+}
+
+func TestStoreAndLoad(t *testing.T) {
+	nBuckets := 5
+	nPeers := 2
+
+	// create a routing table with nBuckets & nPeers in each bucket
+	local := test.RandPeerIDFatal(t)
+	rt := kb.NewRoutingTable(nPeers, kb.ConvertPeerID(local), time.Hour, peerstore.NewMetrics())
+	for i := 0; i < nBuckets; i++ {
+		for j := 0; j < nPeers; j++ {
+			for {
+				if p := test.RandPeerIDFatal(t); kb.CommonPrefixLen(kb.ConvertPeerID(local), kb.ConvertPeerID(p)) == i {
+					rt.Update(p)
+					break
+				}
+			}
+		}
+	}
+
+	assert.Lenf(t, rt.Buckets, nBuckets, "test setup failed: should have %d buckets", nBuckets)
+	assert.Len(t, rt.ListPeers(), nBuckets*nPeers, "test setup failed: should have %d peers", nBuckets*nPeers)
+
+	// create a snapshotter with an in-memory ds
+	snapshotter, err := NewDatastoreSnapshotter(ds.NewMapDatastore(), "test")
+	assert.NoError(t, err)
+
+	// store snapshot
+	assert.NoError(t, snapshotter.Store(rt))
+
+	// load snapshot & verify it is as expected
+	peers, err := snapshotter.Load()
+	assert.NoError(t, err)
+	assert.Lenf(t, peers, nBuckets*nPeers, "should have got %d peers but got only %d", nBuckets*nPeers, len(peers))
+
+	assert.Equal(t, rt.ListPeers(), peers, "peers obtained from the stored snapshot should match the peers in the routing table")
+
+	// Load an empty snapshot
+	snapshotter, err = NewDatastoreSnapshotter(ds.NewMapDatastore(), "test")
+	assert.NoError(t, err)
+	peers, err = snapshotter.Load()
+	assert.NoError(t, err)
+	assert.Empty(t, peers)
+}
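A minimal usage sketch, not part of this changeset, showing how the persistence options added in opts/options.go might be wired together. The newPersistentDHT helper, the in-memory datastore, and the concrete timeouts below are illustrative assumptions; only the option constructors and the persist helpers (NewDatastoreSnapshotter, DefaultSnapshotNS, NewRandomSeedsProposer) come from this diff. Note the caveat in persist/interfaces.go: seeding from snapshot candidates only recovers peers whose addresses survive a restart, i.e. it needs a persistent peerstore.

package dhtutil

import (
	"context"
	"time"

	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	"github.com/libp2p/go-libp2p-core/host"

	dht "github.com/libp2p/go-libp2p-kad-dht"
	opts "github.com/libp2p/go-libp2p-kad-dht/opts"
	"github.com/libp2p/go-libp2p-kad-dht/persist"
)

// newPersistentDHT (hypothetical helper) builds a DHT that snapshots its routing
// table every five minutes and seeds it from the last snapshot on startup,
// falling back to the default bootstrap peers when no candidates are usable.
func newPersistentDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
	// in-memory datastore for illustration; a real deployment would use a
	// persistent datastore so snapshots survive restarts
	dstore := dssync.MutexWrap(ds.NewMapDatastore())

	// store routing table snapshots under the default namespace
	snapshotter, err := persist.NewDatastoreSnapshotter(dstore, persist.DefaultSnapshotNS)
	if err != nil {
		return nil, err
	}

	return dht.New(ctx, h,
		opts.Datastore(dstore),
		opts.Snapshotter(snapshotter, 5*time.Minute),
		opts.SeedsProposer(persist.NewRandomSeedsProposer()),
		// per-dial timeout, total seeding timeout, concurrent dials
		opts.SeederParams(5*time.Second, time.Minute, 50),
		opts.SeederRTSizeTarget(30),
	)
}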