From 9d30fd58c2a6b4517df8a733c00cd2a0d46201d0 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Tue, 9 Sep 2025 22:08:30 +0200 Subject: [PATCH 01/12] provider: ResettableKeyStore --- dual/provider/options.go | 4 +- dual/provider/provider.go | 2 +- provider/datastore/keystore.go | 230 ++++++++++++++++++++++------ provider/datastore/keystore_test.go | 4 +- provider/options.go | 4 +- provider/provider.go | 2 +- 6 files changed, 193 insertions(+), 53 deletions(-) diff --git a/dual/provider/options.go b/dual/provider/options.go index f643ded12..a7ee5f41d 100644 --- a/dual/provider/options.go +++ b/dual/provider/options.go @@ -19,7 +19,7 @@ const ( ) type config struct { - keyStore *datastore.KeyStore + keyStore datastore.KeyStore reprovideInterval [2]time.Duration // [0] = LAN, [1] = WAN maxReprovideDelay [2]time.Duration @@ -87,7 +87,7 @@ var DefaultConfig = func(cfg *config) error { return nil } -func WithKeyStore(keyStore *datastore.KeyStore) Option { +func WithKeyStore(keyStore datastore.KeyStore) Option { return func(cfg *config) error { if keyStore == nil { return errors.New("provider config: keyStore cannot be nil") diff --git a/dual/provider/provider.go b/dual/provider/provider.go index 19ad184f7..6099b2c7f 100644 --- a/dual/provider/provider.go +++ b/dual/provider/provider.go @@ -19,7 +19,7 @@ type SweepingProvider struct { dht *dual.DHT LAN *provider.SweepingProvider WAN *provider.SweepingProvider - keyStore *datastore.KeyStore + keyStore datastore.KeyStore } // New creates a new SweepingProvider that manages provides and reprovides for diff --git a/provider/datastore/keystore.go b/provider/datastore/keystore.go index b2ceeb2a1..6d4760c86 100644 --- a/provider/datastore/keystore.go +++ b/provider/datastore/keystore.go @@ -3,9 +3,11 @@ package datastore import ( "context" "encoding/base64" + "errors" "fmt" "strings" "sync" + "sync/atomic" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" @@ -18,8 +20,19 @@ import ( "github.com/probe-lab/go-libdht/kad/key/bitstr" ) -// KeyStore indexes multihashes by their kademlia identifier. -type KeyStore struct { +// KeyStore provides thread-safe storage and retrieval of multihashes, indexed +// by their kademlia 256-bit identifier. +type KeyStore interface { + Put(context.Context, ...mh.Multihash) ([]mh.Multihash, error) + Get(context.Context, bitstr.Key) ([]mh.Multihash, error) + ContainsPrefix(context.Context, bitstr.Key) (bool, error) + Delete(context.Context, ...mh.Multihash) error + Empty(context.Context) error + Size(context.Context) (int, error) +} + +// keyStore indexes multihashes by their kademlia identifier. +type keyStore struct { lk sync.Mutex ds ds.Batching @@ -91,7 +104,7 @@ func WithBatchSize(size int) KeyStoreOption { } // NewKeyStore creates a new KeyStore backed by the provided datastore. -func NewKeyStore(d ds.Batching, opts ...KeyStoreOption) (*KeyStore, error) { +func NewKeyStore(d ds.Batching, opts ...KeyStoreOption) (KeyStore, error) { var cfg keyStoreCfg opts = append([]KeyStoreOption{KeyStoreDefaultCfg}, opts...) for i, o := range opts { @@ -99,7 +112,7 @@ func NewKeyStore(d ds.Batching, opts ...KeyStoreOption) (*KeyStore, error) { return nil, fmt.Errorf("KeyStore option %d failed: %w", i, err) } } - return &KeyStore{ + return &keyStore{ ds: d, base: ds.NewKey(cfg.base), prefixBits: cfg.prefixBits, @@ -107,34 +120,6 @@ func NewKeyStore(d ds.Batching, opts ...KeyStoreOption) (*KeyStore, error) { }, nil } -// ResetCids purges the KeyStore and repopulates it with the provided cids' -// multihashes. 
-func (s *KeyStore) ResetCids(ctx context.Context, keysChan <-chan cid.Cid) error { - s.lk.Lock() - defer s.lk.Unlock() - - err := s.emptyLocked(ctx) - if err != nil { - return fmt.Errorf("KeyStore empty failed during reset: %w", err) - } - keys := make([]mh.Multihash, 0, s.batchSize) - for c := range keysChan { - keys = append(keys, c.Hash()) - if len(keys) == cap(keys) { - _, err = s.putLocked(ctx, keys...) - if err != nil { - return fmt.Errorf("KeyStore put failed during reset: %w", err) - } - keys = keys[:0] - } - } - _, err = s.putLocked(ctx, keys...) - if err != nil { - return fmt.Errorf("KeyStore put failed during reset: %w", err) - } - return nil -} - // dsKey returns the datastore key for the provided binary key. // // The function creates a hierarchical datastore key by expanding bits into @@ -179,7 +164,7 @@ func dsKey[K kad.Key[K]](k K, prefixBits int, base ds.Key) ds.Key { // "base/bit0/bit1/.../bitN/base64url_suffix" // // Returns the reconstructed 256-bit key or an error if base64URL decoding fails. -func (s *KeyStore) decodeKey(dsk string) (bit256.Key, error) { +func (s *keyStore) decodeKey(dsk string) (bit256.Key, error) { dsk = dsk[len(s.base.String()):] // remove leading prefix bs := make([]byte, 32) // Extract individual bits from odd positions (skip '/' separators) @@ -204,7 +189,7 @@ type pair struct { // putLocked stores the provided keys while assuming s.lk is already held, and // returns the keys that weren't present already in the keystore. -func (s *KeyStore) putLocked(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) { +func (s *keyStore) putLocked(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) { seen := make(map[bit256.Key]struct{}, len(keys)) toPut := make([]pair, 0, len(keys)) @@ -252,7 +237,7 @@ func (s *KeyStore) putLocked(ctx context.Context, keys ...mh.Multihash) ([]mh.Mu // Put stores the provided keys in the underlying datastore, grouping them by // the first prefixLen bits. It returns only the keys that were not previously // persisted in the datastore (i.e., newly added keys). -func (s *KeyStore) Put(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) { +func (s *keyStore) Put(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) { if len(keys) == 0 { return nil, nil } @@ -264,7 +249,7 @@ func (s *KeyStore) Put(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihas // Get returns all keys whose bit256 representation matches the provided // prefix. -func (s *KeyStore) Get(ctx context.Context, prefix bitstr.Key) ([]mh.Multihash, error) { +func (s *keyStore) Get(ctx context.Context, prefix bitstr.Key) ([]mh.Multihash, error) { s.lk.Lock() defer s.lk.Unlock() @@ -300,7 +285,7 @@ func (s *KeyStore) Get(ctx context.Context, prefix bitstr.Key) ([]mh.Multihash, // ContainsPrefix reports whether the KeyStore currently holds at least one // multihash whose kademlia identifier (bit256.Key) starts with the provided // bit-prefix. -func (s *KeyStore) ContainsPrefix(ctx context.Context, prefix bitstr.Key) (bool, error) { +func (s *keyStore) ContainsPrefix(ctx context.Context, prefix bitstr.Key) (bool, error) { s.lk.Lock() defer s.lk.Unlock() @@ -337,8 +322,8 @@ func (s *KeyStore) ContainsPrefix(ctx context.Context, prefix bitstr.Key) (bool, // emptyLocked deletes all entries under the datastore prefix, assuming s.lk is // already held. 
-func (s *KeyStore) emptyLocked(ctx context.Context) error { - res, err := s.ds.Query(ctx, query.Query{Prefix: s.base.String(), KeysOnly: true}) +func emptyLocked(ctx context.Context, d ds.Batching, base ds.Key, batchSize int) error { + res, err := d.Query(ctx, query.Query{Prefix: base.String(), KeysOnly: true}) if err != nil { return err } @@ -348,7 +333,7 @@ func (s *KeyStore) emptyLocked(ctx context.Context) error { if len(keys) == 0 { return nil } - b, err := s.ds.Batch(ctx) + b, err := d.Batch(ctx) if err != nil { return nil } @@ -359,7 +344,7 @@ func (s *KeyStore) emptyLocked(ctx context.Context) error { } return b.Commit(ctx) } - keys := make([]ds.Key, 0, s.batchSize) + keys := make([]ds.Key, 0, batchSize) for r := range res.Next() { if r.Error != nil { return r.Error @@ -376,15 +361,15 @@ func (s *KeyStore) emptyLocked(ctx context.Context) error { } // Empty deletes all entries under the datastore prefix. -func (s *KeyStore) Empty(ctx context.Context) error { +func (s *keyStore) Empty(ctx context.Context) error { s.lk.Lock() defer s.lk.Unlock() - return s.emptyLocked(ctx) + return emptyLocked(ctx, s.ds, s.base, s.batchSize) } // Delete removes the given keys from datastore. -func (s *KeyStore) Delete(ctx context.Context, keys ...mh.Multihash) error { +func (s *keyStore) Delete(ctx context.Context, keys ...mh.Multihash) error { if len(keys) == 0 { return nil } @@ -409,7 +394,7 @@ func (s *KeyStore) Delete(ctx context.Context, keys ...mh.Multihash) error { // // The size is obtained by iterating over all keys in the underlying // datastore, so it may be expensive for large stores. -func (s *KeyStore) Size(ctx context.Context) (size int, err error) { +func (s *keyStore) Size(ctx context.Context) (size int, err error) { s.lk.Lock() defer s.lk.Unlock() @@ -429,3 +414,158 @@ func (s *KeyStore) Size(ctx context.Context) (size int, err error) { } return } + +var ErrResetInProgress = errors.New("reset already in progress") + +const ( + ready uint32 = iota + inProgress + cleanup +) + +// ResettableKeyStore is a KeyStore implementation that supports atomic reset +// operations. It maintains two alternate bases in the underlying datastore and +// can swap between them to provide atomic replacement of all stored keys. +// +// The reset operation allows replacing all stored multihashes with a new set +// without interrupting concurrent read/write operations. During a reset, new +// writes are duplicated to both the current and alternate storage bases to +// maintain consistency. +type ResettableKeyStore struct { + keyStore + + altBase ds.Key + // resetInProgress tracks the state of the reset operation: + // - 0: no reset in progress, ready for new reset + // - 1: reset in progress, writing to alternate base + // - 2: post reset cleanup + resetState atomic.Uint32 +} + +var _ KeyStore = (*ResettableKeyStore)(nil) + +// NewResettableKeyStore creates a new ResettableKeyStore backed by the +// provided datastore. It automatically adds "/0" and "/1" suffixes to the +// configured base prefix to create two alternate storage locations for atomic +// reset operations. +func NewResettableKeyStore(d ds.Batching, opts ...KeyStoreOption) (*ResettableKeyStore, error) { + var cfg keyStoreCfg + opts = append([]KeyStoreOption{KeyStoreDefaultCfg}, opts...) 
+ for i, o := range opts { + if err := o(&cfg); err != nil { + return nil, fmt.Errorf("KeyStore option %d failed: %w", i, err) + } + } + return &ResettableKeyStore{ + keyStore: keyStore{ + ds: d, + base: ds.NewKey(cfg.base).ChildString("0"), + prefixBits: cfg.prefixBits, + batchSize: cfg.batchSize, + }, + altBase: ds.NewKey(cfg.base).ChildString("1"), + }, nil +} + +func batchPut(ctx context.Context, d ds.Batching, base ds.Key, prefixBits int, keys []mh.Multihash) error { + b, err := d.Batch(ctx) + if err != nil { + return err + } + for _, h := range keys { + dsk := dsKey(keyspace.MhToBit256(h), prefixBits, base) + if err := b.Put(ctx, dsk, h); err != nil { + return err + } + } + return b.Commit(ctx) +} + +// ResetCids atomically replaces all stored keys with the CIDs received from +// keysChan. The operation is thread-safe and non-blocking for concurrent reads +// and writes. +// +// During the reset: +// - New keys from keysChan are written to an alternate storage location +// - Concurrent Put operations are duplicated to both current and alternate +// locations +// - Once all keys are processed, storage locations are atomically swapped +// - The old storage location is cleaned up +// +// Returns ErrResetInProgress if another reset operation is already running. +// The operation can be cancelled via context, which will clean up partial +// state. +func (s *ResettableKeyStore) ResetCids(ctx context.Context, keysChan <-chan cid.Cid) error { + if !s.resetState.CompareAndSwap(ready, inProgress) { + return ErrResetInProgress + } + defer s.resetState.Store(ready) + + // Make sure the alternate base is empty + if err := emptyLocked(ctx, s.ds, s.altBase, s.batchSize); err != nil { + return fmt.Errorf("ResettableKeyStore empty failed during reset: %w", err) + } + + // Read all the keys from the channel and put them in batch at the alternate + // base + keys := make([]mh.Multihash, 0, s.batchSize) +loop: + for { + select { + case <-ctx.Done(): + // Context cancelled, abort reset and clean up alternate base + s.resetState.Store(cleanup) + if err := emptyLocked(ctx, s.ds, s.altBase, s.batchSize); err != nil { + return fmt.Errorf("ResettableKeyStore empty failed during reset: %w", err) + } + return nil + case c, ok := <-keysChan: + if !ok { + break loop + } + keys = append(keys, c.Hash()) + if len(keys) == cap(keys) { + if err := batchPut(ctx, s.ds, s.altBase, s.prefixBits, keys); err != nil { + return fmt.Errorf("ResettableKeyStore put failed during reset: %w", err) + } + keys = keys[:0] + } + } + } + // Put final batch + if err := batchPut(ctx, s.ds, s.altBase, s.prefixBits, keys); err != nil { + return fmt.Errorf("ResettableKeyStore put failed during reset: %w", err) + } + + // Swap the bases. The base in use is now the one to which we just wrote. + s.lk.Lock() + oldBase := s.base + s.base = s.altBase + s.resetState.Store(cleanup) + s.lk.Unlock() + s.altBase = oldBase + + // Clean up the old base + if err := emptyLocked(ctx, s.ds, oldBase, s.batchSize); err != nil { + return fmt.Errorf("ResettableKeyStore empty failed during reset: %w", err) + } + + return nil +} + +// Put stores the provided keys in the underlying datastore, with special +// handling during reset operations. When a reset is in progress, keys are +// written to both the current storage location and the alternate location +// being prepared for the reset. +// +// This ensures that keys written during a reset operation remain available +// after the reset completes. Returns only the keys that were not previously +// persisted. 
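The duplicate-write behaviour described above is easiest to see end to end. A minimal usage sketch of the reset path, mirroring the TestKeyStoreReset setup in this patch (newHashes and the map datastore are illustrative; error handling elided):

    ctx := context.Background()
    store, _ := datastore.NewResettableKeyStore(ds.NewMapDatastore())

    cidChan := make(chan cid.Cid, len(newHashes))
    for _, h := range newHashes { // newHashes: illustrative []mh.Multihash
        cidChan <- cid.NewCidV1(cid.Raw, h)
    }
    close(cidChan)

    // Concurrent Put calls made while ResetCids runs are written to both
    // bases, so they remain present after the swap.
    if err := store.ResetCids(ctx, cidChan); err != nil {
        // handle error
    }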
+func (s *ResettableKeyStore) Put(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) { + if s.resetState.Load() == inProgress { + // Reset is in progress, write to alternate base in addition to current + // base. + batchPut(ctx, s.ds, s.altBase, s.prefixBits, keys) + } + return s.keyStore.Put(ctx, keys...) +} diff --git a/provider/datastore/keystore_test.go b/provider/datastore/keystore_test.go index 8276d55ab..b9f505ebe 100644 --- a/provider/datastore/keystore_test.go +++ b/provider/datastore/keystore_test.go @@ -151,7 +151,7 @@ func TestKeyStoreReset(t *testing.T) { ds := ds.NewMapDatastore() defer ds.Close() - store, err := NewKeyStore(ds) + store, err := NewResettableKeyStore(ds) if err != nil { t.Fatal(err) } @@ -289,7 +289,7 @@ func TestKeyStoreSize(t *testing.T) { } func TestDsKey(t *testing.T) { - s := KeyStore{ + s := keyStore{ base: ds.NewKey("/base/prefix"), prefixBits: 8, } diff --git a/provider/options.go b/provider/options.go index 891fbf1a1..f9ac39adb 100644 --- a/provider/options.go +++ b/provider/options.go @@ -41,7 +41,7 @@ type config struct { peerid peer.ID router KadClosestPeersRouter - keyStore *datastore.KeyStore + keyStore datastore.KeyStore msgSender pb.MessageSender selfAddrs func() []ma.Multiaddr @@ -272,7 +272,7 @@ func WithMaxProvideConnsPerWorker(n int) Option { // WithKeyStore defines the KeyStore used to keep track of the keys that need // to be reprovided. -func WithKeyStore(keyStore *datastore.KeyStore) Option { +func WithKeyStore(keyStore datastore.KeyStore) Option { return func(cfg *config) error { if keyStore == nil { return errors.New("reprovider config: multihash store cannot be nil") diff --git a/provider/provider.go b/provider/provider.go index 27d0fd200..0f5ff4641 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -117,7 +117,7 @@ type SweepingProvider struct { connectivity *connectivity.ConnectivityChecker - keyStore *datastore.KeyStore + keyStore datastore.KeyStore replicationFactor int From 172e8571435a249d1ac366dd9ed328ba91fa0ef7 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Wed, 10 Sep 2025 16:06:13 +0200 Subject: [PATCH 02/12] keystore: remove mutex --- provider/datastore/keystore.go | 448 ++++++++++-------- provider/datastore/keystore_test.go | 261 +++++----- provider/datastore/resettable_keystore.go | 291 ++++++++++++ .../datastore/resettable_keystore_test.go | 72 +++ provider/provider.go | 1 + provider/provider_test.go | 1 + 6 files changed, 733 insertions(+), 341 deletions(-) create mode 100644 provider/datastore/resettable_keystore.go create mode 100644 provider/datastore/resettable_keystore_test.go diff --git a/provider/datastore/keystore.go b/provider/datastore/keystore.go index 6d4760c86..46b4b2799 100644 --- a/provider/datastore/keystore.go +++ b/provider/datastore/keystore.go @@ -6,10 +6,7 @@ import ( "errors" "fmt" "strings" - "sync" - "sync/atomic" - "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" query "github.com/ipfs/go-datastore/query" "github.com/libp2p/go-libp2p-kad-dht/provider/internal/keyspace" @@ -20,6 +17,8 @@ import ( "github.com/probe-lab/go-libdht/kad/key/bitstr" ) +var ErrKeyStoreClosed = errors.New("keystore is closed") + // KeyStore provides thread-safe storage and retrieval of multihashes, indexed // by their kademlia 256-bit identifier. 
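With Close now part of the interface, a compact sketch of the expected call pattern for consumers (setup mirrors the tests; mhs is an illustrative []mh.Multihash and error handling is elided):

    ctx := context.Background()
    store, err := datastore.NewKeyStore(ds.NewMapDatastore())
    if err != nil {
        // handle error
    }
    defer store.Close()

    added, _ := store.Put(ctx, mhs...)                     // only previously-unseen keys come back
    ok, _ := store.ContainsPrefix(ctx, bitstr.Key("0000")) // any key under this 4-bit prefix?
    n, _ := store.Size(ctx)
    _, _, _ = added, ok, n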
type KeyStore interface { @@ -29,16 +28,50 @@ type KeyStore interface { Delete(context.Context, ...mh.Multihash) error Empty(context.Context) error Size(context.Context) (int, error) + Close() error +} + +// operation types for the worker goroutine +type opType uint8 + +const ( + opPut opType = iota + opGet + opContainsPrefix + opDelete + opEmpty + opSize + lastOp +) + +// operation request sent to worker goroutine +type operation struct { + op opType + ctx context.Context + keys []mh.Multihash + prefix bitstr.Key + response chan<- operationResponse +} + +// response from worker goroutine +type operationResponse struct { + multihashes []mh.Multihash + found bool + size int + err error } // keyStore indexes multihashes by their kademlia identifier. type keyStore struct { - lk sync.Mutex - ds ds.Batching base ds.Key prefixBits int batchSize int + + // worker goroutine communication + requests chan operation + close chan struct{} + done chan struct{} } type keyStoreCfg struct { @@ -112,12 +145,21 @@ func NewKeyStore(d ds.Batching, opts ...KeyStoreOption) (KeyStore, error) { return nil, fmt.Errorf("KeyStore option %d failed: %w", i, err) } } - return &keyStore{ + + ks := &keyStore{ ds: d, base: ds.NewKey(cfg.base), prefixBits: cfg.prefixBits, batchSize: cfg.batchSize, - }, nil + requests: make(chan operation), + close: make(chan struct{}), + done: make(chan struct{}), + } + + // start worker goroutine + go ks.worker() + + return ks, nil } // dsKey returns the datastore key for the provided binary key. @@ -187,9 +229,50 @@ type pair struct { h mh.Multihash } -// putLocked stores the provided keys while assuming s.lk is already held, and +// worker processes operations sequentially in a single goroutine +func (s *keyStore) worker() { + defer close(s.done) + + for { + select { + case <-s.close: + return + case op := <-s.requests: + switch op.op { + case opPut: + newKeys, err := s.put(op.ctx, op.keys) + op.response <- operationResponse{multihashes: newKeys, err: err} + + case opGet: + keys, err := s.get(op.ctx, op.prefix) + op.response <- operationResponse{multihashes: keys, err: err} + + case opContainsPrefix: + found, err := s.containsPrefix(op.ctx, op.prefix) + op.response <- operationResponse{found: found, err: err} + + case opDelete: + err := s.delete(op.ctx, op.keys) + op.response <- operationResponse{err: err} + + case opEmpty: + err := empty(op.ctx, s.ds, s.base, s.batchSize) + op.response <- operationResponse{err: err} + + case opSize: + size, err := s.size(op.ctx) + op.response <- operationResponse{size: size, err: err} + + default: + op.response <- operationResponse{err: fmt.Errorf("unknown operation %d", op.op)} + } + } + } +} + +// put stores the provided keys while assuming s.lk is already held, and // returns the keys that weren't present already in the keystore. -func (s *keyStore) putLocked(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) { +func (s *keyStore) put(ctx context.Context, keys []mh.Multihash) ([]mh.Multihash, error) { seen := make(map[bit256.Key]struct{}, len(keys)) toPut := make([]pair, 0, len(keys)) @@ -234,25 +317,9 @@ func (s *keyStore) putLocked(ctx context.Context, keys ...mh.Multihash) ([]mh.Mu return newKeys, nil } -// Put stores the provided keys in the underlying datastore, grouping them by -// the first prefixLen bits. It returns only the keys that were not previously -// persisted in the datastore (i.e., newly added keys). 
-func (s *keyStore) Put(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) { - if len(keys) == 0 { - return nil, nil - } - s.lk.Lock() - defer s.lk.Unlock() - - return s.putLocked(ctx, keys...) -} - -// Get returns all keys whose bit256 representation matches the provided +// get returns all keys whose bit256 representation matches the provided // prefix. -func (s *keyStore) Get(ctx context.Context, prefix bitstr.Key) ([]mh.Multihash, error) { - s.lk.Lock() - defer s.lk.Unlock() - +func (s *keyStore) get(ctx context.Context, prefix bitstr.Key) ([]mh.Multihash, error) { dsk := dsKey(prefix, s.prefixBits, s.base).String() res, err := s.ds.Query(ctx, query.Query{Prefix: dsk}) if err != nil { @@ -282,13 +349,10 @@ func (s *keyStore) Get(ctx context.Context, prefix bitstr.Key) ([]mh.Multihash, return out, nil } -// ContainsPrefix reports whether the KeyStore currently holds at least one +// containsPrefix reports whether the KeyStore currently holds at least one // multihash whose kademlia identifier (bit256.Key) starts with the provided // bit-prefix. -func (s *keyStore) ContainsPrefix(ctx context.Context, prefix bitstr.Key) (bool, error) { - s.lk.Lock() - defer s.lk.Unlock() - +func (s *keyStore) containsPrefix(ctx context.Context, prefix bitstr.Key) (bool, error) { dsk := dsKey(prefix, s.prefixBits, s.base).String() longPrefix := prefix.BitLen() > s.prefixBits q := query.Query{Prefix: dsk, KeysOnly: true} @@ -320,9 +384,9 @@ func (s *keyStore) ContainsPrefix(ctx context.Context, prefix bitstr.Key) (bool, return false, nil } -// emptyLocked deletes all entries under the datastore prefix, assuming s.lk is +// empty deletes all entries under the datastore prefix, assuming s.lk is // already held. -func emptyLocked(ctx context.Context, d ds.Batching, base ds.Key, batchSize int) error { +func empty(ctx context.Context, d ds.Batching, base ds.Key, batchSize int) error { res, err := d.Query(ctx, query.Query{Prefix: base.String(), KeysOnly: true}) if err != nil { return err @@ -360,22 +424,8 @@ func emptyLocked(ctx context.Context, d ds.Batching, base ds.Key, batchSize int) return delBatch(keys) // delete remaining keys } -// Empty deletes all entries under the datastore prefix. -func (s *keyStore) Empty(ctx context.Context) error { - s.lk.Lock() - defer s.lk.Unlock() - - return emptyLocked(ctx, s.ds, s.base, s.batchSize) -} - -// Delete removes the given keys from datastore. -func (s *keyStore) Delete(ctx context.Context, keys ...mh.Multihash) error { - if len(keys) == 0 { - return nil - } - s.lk.Lock() - defer s.lk.Unlock() - +// delete removes the given keys from datastore. +func (s *keyStore) delete(ctx context.Context, keys []mh.Multihash) error { b, err := s.ds.Batch(ctx) if err != nil { return err @@ -390,14 +440,8 @@ func (s *keyStore) Delete(ctx context.Context, keys ...mh.Multihash) error { return b.Commit(ctx) } -// Size returns the number of keys currently stored in the KeyStore. -// -// The size is obtained by iterating over all keys in the underlying -// datastore, so it may be expensive for large stores. -func (s *keyStore) Size(ctx context.Context) (size int, err error) { - s.lk.Lock() - defer s.lk.Unlock() - +// size returns the number of keys currently stored in the KeyStore. 
+func (s *keyStore) size(ctx context.Context) (size int, err error) { q := query.Query{Prefix: s.base.String(), KeysOnly: true} res, err := s.ds.Query(ctx, q) if err != nil { @@ -415,157 +459,171 @@ func (s *keyStore) Size(ctx context.Context) (size int, err error) { return } -var ErrResetInProgress = errors.New("reset already in progress") - -const ( - ready uint32 = iota - inProgress - cleanup -) - -// ResettableKeyStore is a KeyStore implementation that supports atomic reset -// operations. It maintains two alternate bases in the underlying datastore and -// can swap between them to provide atomic replacement of all stored keys. -// -// The reset operation allows replacing all stored multihashes with a new set -// without interrupting concurrent read/write operations. During a reset, new -// writes are duplicated to both the current and alternate storage bases to -// maintain consistency. -type ResettableKeyStore struct { - keyStore - - altBase ds.Key - // resetInProgress tracks the state of the reset operation: - // - 0: no reset in progress, ready for new reset - // - 1: reset in progress, writing to alternate base - // - 2: post reset cleanup - resetState atomic.Uint32 -} - -var _ KeyStore = (*ResettableKeyStore)(nil) - -// NewResettableKeyStore creates a new ResettableKeyStore backed by the -// provided datastore. It automatically adds "/0" and "/1" suffixes to the -// configured base prefix to create two alternate storage locations for atomic -// reset operations. -func NewResettableKeyStore(d ds.Batching, opts ...KeyStoreOption) (*ResettableKeyStore, error) { - var cfg keyStoreCfg - opts = append([]KeyStoreOption{KeyStoreDefaultCfg}, opts...) - for i, o := range opts { - if err := o(&cfg); err != nil { - return nil, fmt.Errorf("KeyStore option %d failed: %w", i, err) - } +// Put stores the provided keys in the underlying datastore, grouping them by +// the first prefixLen bits. It returns only the keys that were not previously +// persisted in the datastore (i.e., newly added keys). +func (s *keyStore) Put(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) { + if len(keys) == 0 { + return nil, nil + } + response := make(chan operationResponse, 1) + select { + case s.requests <- operation{ + op: opPut, + ctx: ctx, + keys: keys, + response: response, + }: + case <-ctx.Done(): + return nil, ctx.Err() + case <-s.close: + return nil, ErrKeyStoreClosed + } + + select { + case resp := <-response: + return resp.multihashes, resp.err + case <-ctx.Done(): + return nil, ctx.Err() } - return &ResettableKeyStore{ - keyStore: keyStore{ - ds: d, - base: ds.NewKey(cfg.base).ChildString("0"), - prefixBits: cfg.prefixBits, - batchSize: cfg.batchSize, - }, - altBase: ds.NewKey(cfg.base).ChildString("1"), - }, nil } -func batchPut(ctx context.Context, d ds.Batching, base ds.Key, prefixBits int, keys []mh.Multihash) error { - b, err := d.Batch(ctx) - if err != nil { - return err - } - for _, h := range keys { - dsk := dsKey(keyspace.MhToBit256(h), prefixBits, base) - if err := b.Put(ctx, dsk, h); err != nil { - return err - } +// Get returns all keys whose bit256 representation matches the provided +// prefix. 
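As the tests do, callers typically derive the Get prefix from a multihash's kademlia identifier; a short sketch (h is an arbitrary mh.Multihash, ctx and store as in the earlier sketch):

    k := keyspace.MhToBit256(h)                // 256-bit kademlia identifier of h
    prefix := bitstr.Key(key.BitString(k)[:6]) // keep only the first 6 bits
    matches, err := store.Get(ctx, prefix)     // all stored multihashes under that prefix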
+func (s *keyStore) Get(ctx context.Context, prefix bitstr.Key) ([]mh.Multihash, error) { + response := make(chan operationResponse, 1) + select { + case s.requests <- operation{ + op: opGet, + ctx: ctx, + prefix: prefix, + response: response, + }: + case <-ctx.Done(): + return nil, ctx.Err() + case <-s.close: + return nil, ErrKeyStoreClosed + } + + select { + case resp := <-response: + return resp.multihashes, resp.err + case <-ctx.Done(): + return nil, ctx.Err() } - return b.Commit(ctx) } -// ResetCids atomically replaces all stored keys with the CIDs received from -// keysChan. The operation is thread-safe and non-blocking for concurrent reads -// and writes. -// -// During the reset: -// - New keys from keysChan are written to an alternate storage location -// - Concurrent Put operations are duplicated to both current and alternate -// locations -// - Once all keys are processed, storage locations are atomically swapped -// - The old storage location is cleaned up -// -// Returns ErrResetInProgress if another reset operation is already running. -// The operation can be cancelled via context, which will clean up partial -// state. -func (s *ResettableKeyStore) ResetCids(ctx context.Context, keysChan <-chan cid.Cid) error { - if !s.resetState.CompareAndSwap(ready, inProgress) { - return ErrResetInProgress +// ContainsPrefix reports whether the KeyStore currently holds at least one +// multihash whose kademlia identifier (bit256.Key) starts with the provided +// bit-prefix. +func (s *keyStore) ContainsPrefix(ctx context.Context, prefix bitstr.Key) (bool, error) { + response := make(chan operationResponse, 1) + select { + case s.requests <- operation{ + op: opContainsPrefix, + ctx: ctx, + prefix: prefix, + response: response, + }: + case <-ctx.Done(): + return false, ctx.Err() + case <-s.close: + return false, ErrKeyStoreClosed + } + + select { + case resp := <-response: + return resp.found, resp.err + case <-ctx.Done(): + return false, ctx.Err() } - defer s.resetState.Store(ready) +} - // Make sure the alternate base is empty - if err := emptyLocked(ctx, s.ds, s.altBase, s.batchSize); err != nil { - return fmt.Errorf("ResettableKeyStore empty failed during reset: %w", err) +// Empty deletes all entries under the datastore prefix. +func (s *keyStore) Empty(ctx context.Context) error { + response := make(chan operationResponse, 1) + select { + case s.requests <- operation{ + op: opEmpty, + ctx: ctx, + response: response, + }: + case <-ctx.Done(): + return ctx.Err() + case <-s.close: + return ErrKeyStoreClosed + } + + select { + case resp := <-response: + return resp.err + case <-ctx.Done(): + return ctx.Err() } +} - // Read all the keys from the channel and put them in batch at the alternate - // base - keys := make([]mh.Multihash, 0, s.batchSize) -loop: - for { - select { - case <-ctx.Done(): - // Context cancelled, abort reset and clean up alternate base - s.resetState.Store(cleanup) - if err := emptyLocked(ctx, s.ds, s.altBase, s.batchSize); err != nil { - return fmt.Errorf("ResettableKeyStore empty failed during reset: %w", err) - } - return nil - case c, ok := <-keysChan: - if !ok { - break loop - } - keys = append(keys, c.Hash()) - if len(keys) == cap(keys) { - if err := batchPut(ctx, s.ds, s.altBase, s.prefixBits, keys); err != nil { - return fmt.Errorf("ResettableKeyStore put failed during reset: %w", err) - } - keys = keys[:0] - } - } +// Delete removes the given keys from datastore. 
+func (s *keyStore) Delete(ctx context.Context, keys ...mh.Multihash) error { + if len(keys) == 0 { + return nil } - // Put final batch - if err := batchPut(ctx, s.ds, s.altBase, s.prefixBits, keys); err != nil { - return fmt.Errorf("ResettableKeyStore put failed during reset: %w", err) + response := make(chan operationResponse, 1) + select { + case s.requests <- operation{ + op: opDelete, + ctx: ctx, + keys: keys, + response: response, + }: + case <-ctx.Done(): + return ctx.Err() + case <-s.close: + return ErrKeyStoreClosed + } + + select { + case resp := <-response: + return resp.err + case <-ctx.Done(): + return ctx.Err() } +} - // Swap the bases. The base in use is now the one to which we just wrote. - s.lk.Lock() - oldBase := s.base - s.base = s.altBase - s.resetState.Store(cleanup) - s.lk.Unlock() - s.altBase = oldBase - - // Clean up the old base - if err := emptyLocked(ctx, s.ds, oldBase, s.batchSize); err != nil { - return fmt.Errorf("ResettableKeyStore empty failed during reset: %w", err) +// Size returns the number of keys currently stored in the KeyStore. +// +// The size is obtained by iterating over all keys in the underlying +// datastore, so it may be expensive for large stores. +func (s *keyStore) Size(ctx context.Context) (int, error) { + response := make(chan operationResponse, 1) + select { + case s.requests <- operation{ + op: opSize, + ctx: ctx, + response: response, + }: + case <-ctx.Done(): + return 0, ctx.Err() + case <-s.close: + return 0, ErrKeyStoreClosed + } + + select { + case resp := <-response: + return resp.size, resp.err + case <-ctx.Done(): + return 0, ctx.Err() } - - return nil } -// Put stores the provided keys in the underlying datastore, with special -// handling during reset operations. When a reset is in progress, keys are -// written to both the current storage location and the alternate location -// being prepared for the reset. -// -// This ensures that keys written during a reset operation remain available -// after the reset completes. Returns only the keys that were not previously -// persisted. -func (s *ResettableKeyStore) Put(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) { - if s.resetState.Load() == inProgress { - // Reset is in progress, write to alternate base in addition to current - // base. - batchPut(ctx, s.ds, s.altBase, s.prefixBits, keys) - } - return s.keyStore.Put(ctx, keys...) +// Close shuts down the worker goroutine and releases resources. 
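One note on the implementation that follows: the select/default check assumes Close is only ever called from a single goroutine; two concurrent callers could both take the default branch and panic on the second close. If concurrent Close is possible, a sync.Once variant avoids this. Sketch below, where closeOnce would be an extra field on keyStore (an assumption, not something this change adds):

    func (s *keyStore) Close() error {
        s.closeOnce.Do(func() {
            close(s.close)
            <-s.done // wait for the worker goroutine to exit
        })
        return nil
    }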
+func (s *keyStore) Close() error { + select { + case <-s.close: + // Already closed + return nil + default: + close(s.close) + <-s.done + } + return nil } diff --git a/provider/datastore/keystore_test.go b/provider/datastore/keystore_test.go index b9f505ebe..66d787e0b 100644 --- a/provider/datastore/keystore_test.go +++ b/provider/datastore/keystore_test.go @@ -6,50 +6,55 @@ import ( "strings" "testing" - "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" - "github.com/ipfs/go-test/random" "github.com/libp2p/go-libp2p-kad-dht/provider/internal/keyspace" mh "github.com/multiformats/go-multihash" - "github.com/stretchr/testify/require" "github.com/probe-lab/go-libdht/kad/key" "github.com/probe-lab/go-libdht/kad/key/bit256" "github.com/probe-lab/go-libdht/kad/key/bitstr" + + "github.com/ipfs/go-test/random" + "github.com/stretchr/testify/require" ) -func TestKeyStoreStoreAndGet(t *testing.T) { - ds := ds.NewMapDatastore() - defer ds.Close() - store, err := NewKeyStore(ds) - if err != nil { - t.Fatal(err) - } +func TestKeyStorePutAndGet(t *testing.T) { + t.Run("KeyStore", func(t *testing.T) { + ds := ds.NewMapDatastore() + defer ds.Close() + store, err := NewKeyStore(ds) + require.NoError(t, err) + defer store.Close() + + testKeyStorePutAndGetImpl(t, store) + }) + + t.Run("ResettableKeyStore", func(t *testing.T) { + ds := ds.NewMapDatastore() + defer ds.Close() + store, err := NewResettableKeyStore(ds) + require.NoError(t, err) + defer store.Close() + testKeyStorePutAndGetImpl(t, store) + }) +} + +func testKeyStorePutAndGetImpl(t *testing.T, store KeyStore) { mhs := make([]mh.Multihash, 6) for i := range mhs { h, err := mh.Sum([]byte{byte(i)}, mh.SHA2_256, -1) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) mhs[i] = h } added, err := store.Put(context.Background(), mhs...) - if err != nil { - t.Fatal(err) - } - if len(added) != len(mhs) { - t.Fatalf("expected %d new hashes, got %d", len(mhs), len(added)) - } + require.NoError(t, err) + require.Len(t, added, len(mhs)) added, err = store.Put(context.Background(), mhs...) 
- if err != nil { - t.Fatal(err) - } - if len(added) != 0 { - t.Fatalf("expected no new hashes on second put, got %d", len(added)) - } + require.NoError(t, err) + require.Empty(t, added) for _, h := range mhs { prefix := bitstr.Key(key.BitString(keyspace.MhToBit256(h))[:6]) @@ -64,19 +69,13 @@ func TestKeyStoreStoreAndGet(t *testing.T) { break } } - if !found { - t.Fatalf("expected to find multihash %v for prefix %s", h, prefix) - } + require.True(t, found, "expected to find multihash %v for prefix %s", h, prefix) } p := bitstr.Key(key.BitString(keyspace.MhToBit256(mhs[0]))[:3]) res, err := store.Get(context.Background(), p) - if err != nil { - t.Fatal(err) - } - if len(res) == 0 { - t.Fatalf("expected results for prefix %s", p) - } + require.NoError(t, err) + require.NotEmpty(t, res, "expected results for prefix %s", p) longPrefix := bitstr.Key(key.BitString(keyspace.MhToBit256(mhs[0]))[:15]) res, err = store.Get(context.Background(), longPrefix) @@ -85,9 +84,7 @@ func TestKeyStoreStoreAndGet(t *testing.T) { } for _, h := range res { bs := bitstr.Key(key.BitString(keyspace.MhToBit256(h))) - if bs[:15] != longPrefix { - t.Fatalf("returned hash does not match long prefix") - } + require.True(t, keyspace.IsPrefix(longPrefix, bs), "returned hash does not match long prefix") } } @@ -104,11 +101,29 @@ func genMultihashesMatchingPrefix(prefix bitstr.Key, n int) []mh.Multihash { } func TestKeyStoreContainsPrefix(t *testing.T) { + t.Run("KeyStore", func(t *testing.T) { + ds := ds.NewMapDatastore() + defer ds.Close() + store, err := NewKeyStore(ds) + require.NoError(t, err) + defer store.Close() + + testKeyStoreContainsPrefixImpl(t, store) + }) + + t.Run("ResettableKeyStore", func(t *testing.T) { + ds := ds.NewMapDatastore() + defer ds.Close() + store, err := NewResettableKeyStore(ds) + require.NoError(t, err) + defer store.Close() + + testKeyStoreContainsPrefixImpl(t, store) + }) +} + +func testKeyStoreContainsPrefixImpl(t *testing.T, store KeyStore) { ctx := context.Background() - ds := ds.NewMapDatastore() - defer ds.Close() - store, err := NewKeyStore(ds) - require.NoError(t, err) ok, err := store.ContainsPrefix(ctx, bitstr.Key("0000")) require.NoError(t, err) @@ -147,145 +162,99 @@ func TestKeyStoreContainsPrefix(t *testing.T) { require.False(t, ok) } -func TestKeyStoreReset(t *testing.T) { - ds := ds.NewMapDatastore() - defer ds.Close() - - store, err := NewResettableKeyStore(ds) - if err != nil { - t.Fatal(err) - } - - first := make([]mh.Multihash, 2) - for i := range first { - h, err := mh.Sum([]byte{byte(i)}, mh.SHA2_256, -1) - if err != nil { - t.Fatal(err) - } - first[i] = h - } - if _, err := store.Put(context.Background(), first...); err != nil { - t.Fatal(err) - } +func TestKeyStoreDelete(t *testing.T) { + t.Run("KeyStore", func(t *testing.T) { + ds := ds.NewMapDatastore() + defer ds.Close() + store, err := NewKeyStore(ds) + require.NoError(t, err) + defer store.Close() - secondChan := make(chan cid.Cid, 2) - second := make([]mh.Multihash, 2) - for i := range 2 { - h, err := mh.Sum([]byte{byte(i + 10)}, mh.SHA2_256, -1) - if err != nil { - t.Fatal(err) - } - second[i] = h - secondChan <- cid.NewCidV1(cid.Raw, h) - } - close(secondChan) + testKeyStoreDeleteImpl(t, store) + }) - err = store.ResetCids(context.Background(), secondChan) - if err != nil { - t.Fatal(err) - } - - // old hashes should not be present - for _, h := range first { - prefix := bitstr.Key(key.BitString(keyspace.MhToBit256(h))[:6]) - got, err := store.Get(context.Background(), prefix) - if err != nil { - t.Fatal(err) 
- } - for _, m := range got { - if string(m) == string(h) { - t.Fatalf("expected old hash %v to be removed", h) - } - } - } + t.Run("ResettableKeyStore", func(t *testing.T) { + ds := ds.NewMapDatastore() + defer ds.Close() + store, err := NewResettableKeyStore(ds) + require.NoError(t, err) + defer store.Close() - // new hashes should be retrievable - for _, h := range second { - prefix := bitstr.Key(key.BitString(keyspace.MhToBit256(h))[:6]) - got, err := store.Get(context.Background(), prefix) - if err != nil { - t.Fatal(err) - } - found := false - for _, m := range got { - if string(m) == string(h) { - found = true - break - } - } - if !found { - t.Fatalf("expected hash %v after reset", h) - } - } + testKeyStoreDeleteImpl(t, store) + }) } -func TestKeyStoreDelete(t *testing.T) { - ds := ds.NewMapDatastore() - defer ds.Close() - - store, err := NewKeyStore(ds) - if err != nil { - t.Fatal(err) - } - +func testKeyStoreDeleteImpl(t *testing.T, store KeyStore) { mhs := random.Multihashes(3) for i := range mhs { h, err := mh.Sum([]byte{byte(i)}, mh.SHA2_256, -1) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) mhs[i] = h } - if _, err := store.Put(context.Background(), mhs...); err != nil { - t.Fatal(err) - } + _, err := store.Put(context.Background(), mhs...) + require.NoError(t, err) delPrefix := bitstr.Key(key.BitString(keyspace.MhToBit256(mhs[0]))[:6]) - if err := store.Delete(context.Background(), mhs[0]); err != nil { - t.Fatal(err) - } + err = store.Delete(context.Background(), mhs[0]) + require.NoError(t, err) res, err := store.Get(context.Background(), delPrefix) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) for _, h := range res { - if string(h) == string(mhs[0]) { - t.Fatalf("expected no hashes for prefix after delete") - } + require.NotEqual(t, string(h), string(mhs[0]), "expected deleted hash to be gone") } // other hashes should still be retrievable otherPrefix := bitstr.Key(key.BitString(keyspace.MhToBit256(mhs[1]))[:6]) res, err = store.Get(context.Background(), otherPrefix) - if err != nil { - t.Fatal(err) - } - if len(res) == 0 { - t.Fatalf("expected remaining hashes for other prefix") - } + require.NoError(t, err) + require.NotEmpty(t, res, "expected remaining hashes for other prefix") } func TestKeyStoreSize(t *testing.T) { + t.Run("KeyStore", func(t *testing.T) { + ds := ds.NewMapDatastore() + defer ds.Close() + store, err := NewKeyStore(ds) + require.NoError(t, err) + defer store.Close() + + testKeyStoreSizeImpl(t, store) + }) + + t.Run("ResettableKeyStore", func(t *testing.T) { + ds := ds.NewMapDatastore() + defer ds.Close() + store, err := NewResettableKeyStore(ds) + require.NoError(t, err) + defer store.Close() + + testKeyStoreSizeImpl(t, store) + }) +} + +func testKeyStoreSizeImpl(t *testing.T, store KeyStore) { ctx := context.Background() - ds := ds.NewMapDatastore() - defer ds.Close() - store, err := NewKeyStore(ds) - require.NoError(t, err) mhs0 := random.Multihashes(128) - store.Put(ctx, mhs0...) + _, err := store.Put(ctx, mhs0...) + require.NoError(t, err) size, err := store.Size(ctx) require.NoError(t, err) require.Equal(t, len(mhs0), size) - mhs1 := random.Multihashes(102400) - store.Put(ctx, mhs1...) + nKeys := 1 << 12 + batches := 1 << 6 + for range batches { + mhs1 := random.Multihashes(nKeys / batches) + _, err = store.Put(ctx, mhs1...) 
+ require.NoError(t, err) + } size, err = store.Size(ctx) require.NoError(t, err) - require.Equal(t, len(mhs0)+len(mhs1), size) + require.Equal(t, len(mhs0)+nKeys, size) } func TestDsKey(t *testing.T) { diff --git a/provider/datastore/resettable_keystore.go b/provider/datastore/resettable_keystore.go new file mode 100644 index 000000000..c1226d732 --- /dev/null +++ b/provider/datastore/resettable_keystore.go @@ -0,0 +1,291 @@ +package datastore + +import ( + "context" + "errors" + "fmt" + + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + "github.com/libp2p/go-libp2p-kad-dht/provider/internal/keyspace" + mh "github.com/multiformats/go-multihash" +) + +var ErrResetInProgress = errors.New("reset already in progress") + +const ( + opStart opType = iota + lastOp + 1 + opCleanup + opAltPut +) + +type resetOp struct { + op opType + success bool + response chan<- error +} + +// ResettableKeyStore is a KeyStore implementation that supports atomic reset +// operations. It maintains two alternate bases in the underlying datastore and +// can swap between them to provide atomic replacement of all stored keys. +// +// The reset operation allows replacing all stored multihashes with a new set +// without interrupting concurrent read/write operations. During a reset, new +// writes are duplicated to both the current and alternate storage bases to +// maintain consistency. +type ResettableKeyStore struct { + keyStore + + altBase ds.Key + resetInProgress bool + resetSync chan []mh.Multihash // passes keys from worker to reset go routine + resetOps chan resetOp // reset operations that must be run in main go routine +} + +var _ KeyStore = (*ResettableKeyStore)(nil) + +// NewResettableKeyStore creates a new ResettableKeyStore backed by the +// provided datastore. It automatically adds "/0" and "/1" suffixes to the +// configured base prefix to create two alternate storage locations for atomic +// reset operations. +func NewResettableKeyStore(d ds.Batching, opts ...KeyStoreOption) (*ResettableKeyStore, error) { + var cfg keyStoreCfg + opts = append([]KeyStoreOption{KeyStoreDefaultCfg}, opts...) 
+ for i, o := range opts { + if err := o(&cfg); err != nil { + return nil, fmt.Errorf("KeyStore option %d failed: %w", i, err) + } + } + + rks := &ResettableKeyStore{ + keyStore: keyStore{ + ds: d, + base: ds.NewKey(cfg.base).ChildString("0"), + prefixBits: cfg.prefixBits, + batchSize: cfg.batchSize, + requests: make(chan operation), + close: make(chan struct{}), + done: make(chan struct{}), + }, + altBase: ds.NewKey(cfg.base).ChildString("1"), + resetOps: make(chan resetOp), + resetSync: make(chan []mh.Multihash, 128), // buffered to avoid blocking + } + + // start worker goroutine + go rks.worker() + + return rks, nil +} + +// worker processes operations sequentially in a single goroutine for ResettableKeyStore +func (s *ResettableKeyStore) worker() { + defer close(s.done) + + for { + select { + case <-s.close: + return + case op := <-s.requests: + switch op.op { + case opPut: + newKeys, err := s.put(op.ctx, op.keys) + op.response <- operationResponse{multihashes: newKeys, err: err} + + case opGet: + keys, err := s.get(op.ctx, op.prefix) + op.response <- operationResponse{multihashes: keys, err: err} + + case opContainsPrefix: + found, err := s.containsPrefix(op.ctx, op.prefix) + op.response <- operationResponse{found: found, err: err} + + case opDelete: + err := s.delete(op.ctx, op.keys) + op.response <- operationResponse{err: err} + + case opEmpty: + err := empty(op.ctx, s.ds, s.base, s.batchSize) + op.response <- operationResponse{err: err} + + case opSize: + size, err := s.size(op.ctx) + op.response <- operationResponse{size: size, err: err} + + case opAltPut: + err := s.altPut(op.ctx, op.keys) + op.response <- operationResponse{err: err} + + } + case op := <-s.resetOps: + s.handleResetOp(op) + } + } +} + +// resettablePutLocked handles put operations for ResettableKeyStore, with special +// handling during reset operations. +func (s *ResettableKeyStore) put(ctx context.Context, keys []mh.Multihash) ([]mh.Multihash, error) { + if s.resetInProgress { + // Reset is in progress, write to alternate base in addition to current base + s.resetSync <- keys + } + return s.keyStore.put(ctx, keys) +} + +// altPut writes the given multihashes to the alternate base in the datastore. +func (s *ResettableKeyStore) altPut(ctx context.Context, keys []mh.Multihash) error { + b, err := s.ds.Batch(ctx) + if err != nil { + return err + } + for _, h := range keys { + dsk := dsKey(keyspace.MhToBit256(h), s.prefixBits, s.altBase) + if err := b.Put(ctx, dsk, h); err != nil { + return err + } + } + return b.Commit(ctx) +} + +// handleResetOp processes reset operations that need to happen synchronously. +func (s *ResettableKeyStore) handleResetOp(op resetOp) { + if op.op == opStart { + if s.resetInProgress { + op.response <- ErrResetInProgress + return + } + if err := empty(context.Background(), s.ds, s.altBase, s.batchSize); err != nil { + op.response <- err + return + } + s.resetInProgress = true + op.response <- nil + return + } + + // Cleanup operation + if op.success { + // Swap the keystore prefix bases. + oldBase := s.base + s.base = s.altBase + s.altBase = oldBase + } + // Drain resetSync +drain: + for { + select { + case <-s.resetSync: + default: + break drain + } + } + // Empty the unused base prefix + s.resetInProgress = false + op.response <- empty(context.Background(), s.ds, s.altBase, s.batchSize) +} + +// ResetCids atomically replaces all stored keys with the CIDs received from +// keysChan. The operation is thread-safe and non-blocking for concurrent reads +// and writes. 
+// +// During the reset: +// - New keys from keysChan are written to an alternate storage location +// - Concurrent Put operations are duplicated to both current and alternate +// locations +// - Once all keys are processed, storage locations are atomically swapped +// - The old storage location is cleaned up +// +// Returns ErrResetInProgress if another reset operation is already running. +// The operation can be cancelled via context, which will clean up partial +// state. +func (s *ResettableKeyStore) ResetCids(ctx context.Context, keysChan <-chan cid.Cid) error { + if keysChan == nil { + return nil + } + + opsChan := make(chan error) + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.done: + return ErrKeyStoreClosed + case s.resetOps <- resetOp{op: opStart, response: opsChan}: + select { + case err := <-opsChan: + if err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + } + } + + var success bool + + defer func() { + // Cleanup before returning on success and failure + select { + case s.resetOps <- resetOp{op: opCleanup, success: success, response: opsChan}: + <-opsChan + case <-s.done: + // Safe not to go through the worker since we are done, and we need to + // cleanup + empty(context.Background(), s.ds, s.altBase, s.batchSize) + } + }() + + rsp := make(chan operationResponse) + batchPut := func(ctx context.Context, keys []mh.Multihash) error { + select { + case <-s.done: + return ErrKeyStoreClosed + case <-ctx.Done(): + return ctx.Err() + case s.requests <- operation{op: opAltPut, ctx: ctx, keys: keys, response: rsp}: + return (<-rsp).err + } + } + + keys := make([]mh.Multihash, 0) + + processNewKeys := func(newKeys ...mh.Multihash) error { + keys = append(keys, newKeys...) + if len(keys) >= s.batchSize { + if err := batchPut(ctx, keys); err != nil { + return err + } + keys = keys[:0] + } + return nil + } + + // Read all the keys from the channel and put them in batch at the alternate base +loop: + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.done: + return ErrKeyStoreClosed + case mhs := <-s.resetSync: + if err := processNewKeys(mhs...); err != nil { + return err + } + case c, ok := <-keysChan: + if !ok { + break loop + } + if err := processNewKeys(c.Hash()); err != nil { + return err + } + } + } + // Put final batch + if err := batchPut(ctx, keys); err != nil { + return err + } + success = true + + return nil +} diff --git a/provider/datastore/resettable_keystore_test.go b/provider/datastore/resettable_keystore_test.go new file mode 100644 index 000000000..4a1f4a9f8 --- /dev/null +++ b/provider/datastore/resettable_keystore_test.go @@ -0,0 +1,72 @@ +package datastore + +import ( + "context" + "testing" + + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + "github.com/libp2p/go-libp2p-kad-dht/provider/internal/keyspace" + mh "github.com/multiformats/go-multihash" + + "github.com/probe-lab/go-libdht/kad/key" + "github.com/probe-lab/go-libdht/kad/key/bitstr" + + "github.com/stretchr/testify/require" +) + +func TestKeyStoreReset(t *testing.T) { + ds := ds.NewMapDatastore() + defer ds.Close() + + store, err := NewResettableKeyStore(ds) + require.NoError(t, err) + defer store.Close() + + first := make([]mh.Multihash, 2) + for i := range first { + h, err := mh.Sum([]byte{byte(i)}, mh.SHA2_256, -1) + require.NoError(t, err) + first[i] = h + } + _, err = store.Put(context.Background(), first...) 
+ require.NoError(t, err) + + secondChan := make(chan cid.Cid, 2) + second := make([]mh.Multihash, 2) + for i := range 2 { + h, err := mh.Sum([]byte{byte(i + 10)}, mh.SHA2_256, -1) + require.NoError(t, err) + second[i] = h + secondChan <- cid.NewCidV1(cid.Raw, h) + } + close(secondChan) + + err = store.ResetCids(context.Background(), secondChan) + require.NoError(t, err) + + // old hashes should not be present + for _, h := range first { + prefix := bitstr.Key(key.BitString(keyspace.MhToBit256(h))[:6]) + got, err := store.Get(context.Background(), prefix) + require.NoError(t, err) + for _, m := range got { + require.NotEqual(t, string(m), string(h), "expected old hash %v to be removed", h) + } + } + + // new hashes should be retrievable + for _, h := range second { + prefix := bitstr.Key(key.BitString(keyspace.MhToBit256(h))[:6]) + got, err := store.Get(context.Background(), prefix) + require.NoError(t, err) + found := false + for _, m := range got { + if string(m) == string(h) { + found = true + break + } + } + require.True(t, found, "expected hash %v after reset", h) + } +} diff --git a/provider/provider.go b/provider/provider.go index 0f5ff4641..c26247d7c 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -174,6 +174,7 @@ func New(opts ...Option) (*SweepingProvider, error) { return nil, err } } + cleanupFuncs = append(cleanupFuncs, cfg.keyStore.Close) if err := cfg.validate(); err != nil { cleanup(cleanupFuncs) return nil, err diff --git a/provider/provider_test.go b/provider/provider_test.go index f9a906aed..01e12d29b 100644 --- a/provider/provider_test.go +++ b/provider/provider_test.go @@ -1248,6 +1248,7 @@ func TestRefreshSchedule(t *testing.T) { require.NoError(t, err) prov := SweepingProvider{ + ctx: ctx, keyStore: keyStore, reprovideInterval: time.Hour, From 50883867df825e310bb03a1dd326b2caface58eb Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Thu, 11 Sep 2025 10:42:31 +0200 Subject: [PATCH 03/12] use datastore namespace --- provider/datastore/keystore.go | 28 +++++----- provider/datastore/keystore_test.go | 9 ++- provider/datastore/resettable_keystore.go | 67 +++++++++++++++-------- 3 files changed, 60 insertions(+), 44 deletions(-) diff --git a/provider/datastore/keystore.go b/provider/datastore/keystore.go index 46b4b2799..5399f8f11 100644 --- a/provider/datastore/keystore.go +++ b/provider/datastore/keystore.go @@ -8,7 +8,8 @@ import ( "strings" ds "github.com/ipfs/go-datastore" - query "github.com/ipfs/go-datastore/query" + "github.com/ipfs/go-datastore/namespace" + "github.com/ipfs/go-datastore/query" "github.com/libp2p/go-libp2p-kad-dht/provider/internal/keyspace" mh "github.com/multiformats/go-multihash" @@ -64,7 +65,6 @@ type operationResponse struct { // keyStore indexes multihashes by their kademlia identifier. type keyStore struct { ds ds.Batching - base ds.Key prefixBits int batchSize int @@ -147,8 +147,7 @@ func NewKeyStore(d ds.Batching, opts ...KeyStoreOption) (KeyStore, error) { } ks := &keyStore{ - ds: d, - base: ds.NewKey(cfg.base), + ds: namespace.Wrap(d, ds.NewKey(cfg.base)), prefixBits: cfg.prefixBits, batchSize: cfg.batchSize, requests: make(chan operation), @@ -183,7 +182,7 @@ func NewKeyStore(d ds.Batching, opts ...KeyStoreOption) (KeyStore, error) { // If the prefix is longer than `prefixBits`, only the first `prefixBits` bits // are used, allowing the returned key to serve as a query prefix for the // datastore. 
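With the base now applied by the namespace wrapper instead of by dsKey, the produced keys contain only the bit components and the suffix. For prefixBits = 8 the layout is

    /b0/b1/b2/b3/b4/b5/b6/b7/<base64url of the remaining 31 bytes>

with one path component per prefix bit (b0..b7 are the first eight bits of the 256-bit identifier), which matches what TestDsKey checks further down: the zero key is expected to start with "/0/0/0/0/0/0/0/0/".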
-func dsKey[K kad.Key[K]](k K, prefixBits int, base ds.Key) ds.Key { +func dsKey[K kad.Key[K]](k K, prefixBits int) ds.Key { b := strings.Builder{} l := k.BitLen() for i := range min(prefixBits, l) { @@ -193,7 +192,7 @@ func dsKey[K kad.Key[K]](k K, prefixBits int, base ds.Key) ds.Key { if l == keyspace.KeyLen { b.WriteString(base64.URLEncoding.EncodeToString(keyspace.KeyToBytes(k)[prefixBits/8:])) } - return base.ChildString(b.String()) + return ds.NewKey(b.String()) } // decodeKey reconstructs a 256-bit binary key from a hierarchical datastore key string. @@ -207,7 +206,6 @@ func dsKey[K kad.Key[K]](k K, prefixBits int, base ds.Key) ds.Key { // // Returns the reconstructed 256-bit key or an error if base64URL decoding fails. func (s *keyStore) decodeKey(dsk string) (bit256.Key, error) { - dsk = dsk[len(s.base.String()):] // remove leading prefix bs := make([]byte, 32) // Extract individual bits from odd positions (skip '/' separators) for i := range s.prefixBits { @@ -256,7 +254,7 @@ func (s *keyStore) worker() { op.response <- operationResponse{err: err} case opEmpty: - err := empty(op.ctx, s.ds, s.base, s.batchSize) + err := empty(op.ctx, s.ds, s.batchSize) op.response <- operationResponse{err: err} case opSize: @@ -282,7 +280,7 @@ func (s *keyStore) put(ctx context.Context, keys []mh.Multihash) ([]mh.Multihash continue } seen[k] = struct{}{} - dsk := dsKey(k, s.prefixBits, s.base) + dsk := dsKey(k, s.prefixBits) ok, err := s.ds.Has(ctx, dsk) if err != nil { return nil, err @@ -320,7 +318,7 @@ func (s *keyStore) put(ctx context.Context, keys []mh.Multihash) ([]mh.Multihash // get returns all keys whose bit256 representation matches the provided // prefix. func (s *keyStore) get(ctx context.Context, prefix bitstr.Key) ([]mh.Multihash, error) { - dsk := dsKey(prefix, s.prefixBits, s.base).String() + dsk := dsKey(prefix, s.prefixBits).String() res, err := s.ds.Query(ctx, query.Query{Prefix: dsk}) if err != nil { return nil, err @@ -353,7 +351,7 @@ func (s *keyStore) get(ctx context.Context, prefix bitstr.Key) ([]mh.Multihash, // multihash whose kademlia identifier (bit256.Key) starts with the provided // bit-prefix. func (s *keyStore) containsPrefix(ctx context.Context, prefix bitstr.Key) (bool, error) { - dsk := dsKey(prefix, s.prefixBits, s.base).String() + dsk := dsKey(prefix, s.prefixBits).String() longPrefix := prefix.BitLen() > s.prefixBits q := query.Query{Prefix: dsk, KeysOnly: true} if !longPrefix { @@ -386,8 +384,8 @@ func (s *keyStore) containsPrefix(ctx context.Context, prefix bitstr.Key) (bool, // empty deletes all entries under the datastore prefix, assuming s.lk is // already held. -func empty(ctx context.Context, d ds.Batching, base ds.Key, batchSize int) error { - res, err := d.Query(ctx, query.Query{Prefix: base.String(), KeysOnly: true}) +func empty(ctx context.Context, d ds.Batching, batchSize int) error { + res, err := d.Query(ctx, query.Query{KeysOnly: true}) if err != nil { return err } @@ -431,7 +429,7 @@ func (s *keyStore) delete(ctx context.Context, keys []mh.Multihash) error { return err } for _, h := range keys { - dsk := dsKey(keyspace.MhToBit256(h), s.prefixBits, s.base) + dsk := dsKey(keyspace.MhToBit256(h), s.prefixBits) err := b.Delete(ctx, dsk) if err != nil { return err @@ -442,7 +440,7 @@ func (s *keyStore) delete(ctx context.Context, keys []mh.Multihash) error { // size returns the number of keys currently stored in the KeyStore. 
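The namespace wrapping introduced in this commit is what makes dropping the explicit base safe: every key written through the wrapped store lands under the configured prefix of the parent datastore, and queries against the wrapped store see only that subtree. A small sketch of the assumed go-datastore behaviour (the paths are illustrative):

    parent := ds.NewMapDatastore()
    wrapped := namespace.Wrap(parent, ds.NewKey("/provider/keystore/0"))

    _ = wrapped.Put(ctx, ds.NewKey("/1/0/abc"), []byte("v"))
    ok, _ := parent.Has(ctx, ds.NewKey("/provider/keystore/0/1/0/abc")) // ok == true
    v, _ := wrapped.Get(ctx, ds.NewKey("/1/0/abc"))                     // same entry, relative key
    _, _ = ok, v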
func (s *keyStore) size(ctx context.Context) (size int, err error) { - q := query.Query{Prefix: s.base.String(), KeysOnly: true} + q := query.Query{KeysOnly: true} res, err := s.ds.Query(ctx, q) if err != nil { return diff --git a/provider/datastore/keystore_test.go b/provider/datastore/keystore_test.go index 66d787e0b..8a4d64906 100644 --- a/provider/datastore/keystore_test.go +++ b/provider/datastore/keystore_test.go @@ -259,13 +259,12 @@ func testKeyStoreSizeImpl(t *testing.T, store KeyStore) { func TestDsKey(t *testing.T) { s := keyStore{ - base: ds.NewKey("/base/prefix"), prefixBits: 8, } k := bit256.ZeroKey() - dsk := dsKey(k, s.prefixBits, s.base) - expectedPrefix := "/base/prefix/0/0/0/0/0/0/0/0/" + dsk := dsKey(k, s.prefixBits) + expectedPrefix := "/0/0/0/0/0/0/0/0/" require.Equal(t, expectedPrefix, dsk.String()[:len(expectedPrefix)]) s.prefixBits = 16 @@ -276,8 +275,8 @@ func TestDsKey(t *testing.T) { require.NoError(t, err) k := bit256.NewKey(b[:]) - sdk := dsKey(k, s.prefixBits, s.base) - require.Equal(t, strings.Count(s.base.String(), "/")+s.prefixBits+1, strings.Count(sdk.String(), "/")) + sdk := dsKey(k, s.prefixBits) + require.Equal(t, s.prefixBits+1, strings.Count(sdk.String(), "/")) decoded, err := s.decodeKey(sdk.String()) require.NoError(t, err) require.Equal(t, k, decoded) diff --git a/provider/datastore/resettable_keystore.go b/provider/datastore/resettable_keystore.go index c1226d732..8cbf2848e 100644 --- a/provider/datastore/resettable_keystore.go +++ b/provider/datastore/resettable_keystore.go @@ -7,6 +7,7 @@ import ( "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" "github.com/libp2p/go-libp2p-kad-dht/provider/internal/keyspace" mh "github.com/multiformats/go-multihash" ) @@ -26,17 +27,35 @@ type resetOp struct { } // ResettableKeyStore is a KeyStore implementation that supports atomic reset -// operations. It maintains two alternate bases in the underlying datastore and -// can swap between them to provide atomic replacement of all stored keys. +// operations using a dual-datastore architecture. It maintains two separate +// datastores (primary and alternate) where only one is active at any time, +// enabling atomic replacement of all stored keys without interrupting +// concurrent operations. // -// The reset operation allows replacing all stored multihashes with a new set -// without interrupting concurrent read/write operations. During a reset, new -// writes are duplicated to both the current and alternate storage bases to -// maintain consistency. +// Architecture: +// - Primary datastore: Currently active storage for all read/write operations +// - Alternate datastore: Standby storage used during reset operations +// - The datastores use "/0" and "/1" namespace suffixes and can be swapped +// +// Reset Operation Flow: +// 1. New keys from reset are written to the alternate (inactive) datastore +// 2. Concurrent Put operations are automatically duplicated to both datastores +// to maintain consistency during the transition +// 3. Once all reset keys are written, the datastores are atomically swapped +// 4. 
The old datastore (now alternate) is cleaned up +// +// Thread Safety: +// - All operations are processed sequentially by a single worker goroutine +// - Reset operations are non-blocking for concurrent reads and writes +// - Only one reset operation can be active at a time +// +// The reset operation allows complete replacement of stored multihashes +// without data loss or service interruption, making it suitable for +// scenarios requiring periodic full dataset updates. type ResettableKeyStore struct { keyStore - altBase ds.Key + altDs ds.Batching resetInProgress bool resetSync chan []mh.Multihash // passes keys from worker to reset go routine resetOps chan resetOp // reset operations that must be run in main go routine @@ -59,15 +78,14 @@ func NewResettableKeyStore(d ds.Batching, opts ...KeyStoreOption) (*ResettableKe rks := &ResettableKeyStore{ keyStore: keyStore{ - ds: d, - base: ds.NewKey(cfg.base).ChildString("0"), + ds: namespace.Wrap(d, ds.NewKey(cfg.base+"/0")), prefixBits: cfg.prefixBits, batchSize: cfg.batchSize, requests: make(chan operation), close: make(chan struct{}), done: make(chan struct{}), }, - altBase: ds.NewKey(cfg.base).ChildString("1"), + altDs: namespace.Wrap(d, ds.NewKey(cfg.base+"/1")), resetOps: make(chan resetOp), resetSync: make(chan []mh.Multihash, 128), // buffered to avoid blocking } @@ -105,7 +123,7 @@ func (s *ResettableKeyStore) worker() { op.response <- operationResponse{err: err} case opEmpty: - err := empty(op.ctx, s.ds, s.base, s.batchSize) + err := empty(op.ctx, s.ds, s.batchSize) op.response <- operationResponse{err: err} case opSize: @@ -127,20 +145,21 @@ func (s *ResettableKeyStore) worker() { // handling during reset operations. func (s *ResettableKeyStore) put(ctx context.Context, keys []mh.Multihash) ([]mh.Multihash, error) { if s.resetInProgress { - // Reset is in progress, write to alternate base in addition to current base + // Reset is in progress, write to alternate datastore in addition to + // current datastore s.resetSync <- keys } return s.keyStore.put(ctx, keys) } -// altPut writes the given multihashes to the alternate base in the datastore. +// altPut writes the given multihashes to the alternate datastore. func (s *ResettableKeyStore) altPut(ctx context.Context, keys []mh.Multihash) error { - b, err := s.ds.Batch(ctx) + b, err := s.altDs.Batch(ctx) if err != nil { return err } for _, h := range keys { - dsk := dsKey(keyspace.MhToBit256(h), s.prefixBits, s.altBase) + dsk := dsKey(keyspace.MhToBit256(h), s.prefixBits) if err := b.Put(ctx, dsk, h); err != nil { return err } @@ -155,7 +174,7 @@ func (s *ResettableKeyStore) handleResetOp(op resetOp) { op.response <- ErrResetInProgress return } - if err := empty(context.Background(), s.ds, s.altBase, s.batchSize); err != nil { + if err := empty(context.Background(), s.altDs, s.batchSize); err != nil { op.response <- err return } @@ -166,10 +185,10 @@ func (s *ResettableKeyStore) handleResetOp(op resetOp) { // Cleanup operation if op.success { - // Swap the keystore prefix bases. - oldBase := s.base - s.base = s.altBase - s.altBase = oldBase + // Swap the active datastore. + oldDs := s.ds + s.ds = s.altDs + s.altDs = oldDs } // Drain resetSync drain: @@ -180,9 +199,9 @@ drain: break drain } } - // Empty the unused base prefix + // Empty the unused datastore. 
s.resetInProgress = false - op.response <- empty(context.Background(), s.ds, s.altBase, s.batchSize) + op.response <- empty(context.Background(), s.altDs, s.batchSize) } // ResetCids atomically replaces all stored keys with the CIDs received from @@ -231,7 +250,7 @@ func (s *ResettableKeyStore) ResetCids(ctx context.Context, keysChan <-chan cid. case <-s.done: // Safe not to go through the worker since we are done, and we need to // cleanup - empty(context.Background(), s.ds, s.altBase, s.batchSize) + empty(context.Background(), s.altDs, s.batchSize) } }() @@ -260,7 +279,7 @@ func (s *ResettableKeyStore) ResetCids(ctx context.Context, keysChan <-chan cid. return nil } - // Read all the keys from the channel and put them in batch at the alternate base + // Read all the keys from the channel and write them to the altDs loop: for { select { From d154d71e8f5457cdf7743bd1d2f7a7c3e018b61b Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Thu, 11 Sep 2025 18:14:57 +0200 Subject: [PATCH 04/12] don't sync to write to altDs --- provider/datastore/resettable_keystore.go | 64 +++++------------------ 1 file changed, 12 insertions(+), 52 deletions(-) diff --git a/provider/datastore/resettable_keystore.go b/provider/datastore/resettable_keystore.go index 8cbf2848e..aec69ff42 100644 --- a/provider/datastore/resettable_keystore.go +++ b/provider/datastore/resettable_keystore.go @@ -15,9 +15,8 @@ import ( var ErrResetInProgress = errors.New("reset already in progress") const ( - opStart opType = iota + lastOp + 1 + opStart opType = iota opCleanup - opAltPut ) type resetOp struct { @@ -57,8 +56,7 @@ type ResettableKeyStore struct { altDs ds.Batching resetInProgress bool - resetSync chan []mh.Multihash // passes keys from worker to reset go routine - resetOps chan resetOp // reset operations that must be run in main go routine + resetOps chan resetOp // reset operations that must be run in main go routine } var _ KeyStore = (*ResettableKeyStore)(nil) @@ -85,9 +83,8 @@ func NewResettableKeyStore(d ds.Batching, opts ...KeyStoreOption) (*ResettableKe close: make(chan struct{}), done: make(chan struct{}), }, - altDs: namespace.Wrap(d, ds.NewKey(cfg.base+"/1")), - resetOps: make(chan resetOp), - resetSync: make(chan []mh.Multihash, 128), // buffered to avoid blocking + altDs: namespace.Wrap(d, ds.NewKey(cfg.base+"/1")), + resetOps: make(chan resetOp), } // start worker goroutine @@ -129,11 +126,6 @@ func (s *ResettableKeyStore) worker() { case opSize: size, err := s.size(op.ctx) op.response <- operationResponse{size: size, err: err} - - case opAltPut: - err := s.altPut(op.ctx, op.keys) - op.response <- operationResponse{err: err} - } case op := <-s.resetOps: s.handleResetOp(op) @@ -147,7 +139,7 @@ func (s *ResettableKeyStore) put(ctx context.Context, keys []mh.Multihash) ([]mh if s.resetInProgress { // Reset is in progress, write to alternate datastore in addition to // current datastore - s.resetSync <- keys + s.altPut(ctx, keys) } return s.keyStore.put(ctx, keys) } @@ -190,15 +182,6 @@ func (s *ResettableKeyStore) handleResetOp(op resetOp) { s.ds = s.altDs s.altDs = oldDs } - // Drain resetSync -drain: - for { - select { - case <-s.resetSync: - default: - break drain - } - } // Empty the unused datastore. s.resetInProgress = false op.response <- empty(context.Background(), s.altDs, s.batchSize) @@ -254,31 +237,8 @@ func (s *ResettableKeyStore) ResetCids(ctx context.Context, keysChan <-chan cid. 
} }() - rsp := make(chan operationResponse) - batchPut := func(ctx context.Context, keys []mh.Multihash) error { - select { - case <-s.done: - return ErrKeyStoreClosed - case <-ctx.Done(): - return ctx.Err() - case s.requests <- operation{op: opAltPut, ctx: ctx, keys: keys, response: rsp}: - return (<-rsp).err - } - } - keys := make([]mh.Multihash, 0) - processNewKeys := func(newKeys ...mh.Multihash) error { - keys = append(keys, newKeys...) - if len(keys) >= s.batchSize { - if err := batchPut(ctx, keys); err != nil { - return err - } - keys = keys[:0] - } - return nil - } - // Read all the keys from the channel and write them to the altDs loop: for { @@ -287,21 +247,21 @@ loop: return ctx.Err() case <-s.done: return ErrKeyStoreClosed - case mhs := <-s.resetSync: - if err := processNewKeys(mhs...); err != nil { - return err - } case c, ok := <-keysChan: if !ok { break loop } - if err := processNewKeys(c.Hash()); err != nil { - return err + keys = append(keys, c.Hash()) + if len(keys) >= s.batchSize { + if err := s.altPut(ctx, keys); err != nil { + return err + } + keys = keys[:0] } } } // Put final batch - if err := batchPut(ctx, keys); err != nil { + if err := s.altPut(ctx, keys); err != nil { return err } success = true From 38a707a3a88d1b94b9cd1764561d82e84c19faec Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Fri, 12 Sep 2025 23:35:28 +0200 Subject: [PATCH 05/12] simplify put --- provider/datastore/keystore.go | 36 +++++++++------------------------- 1 file changed, 9 insertions(+), 27 deletions(-) diff --git a/provider/datastore/keystore.go b/provider/datastore/keystore.go index 5399f8f11..db4048b96 100644 --- a/provider/datastore/keystore.go +++ b/provider/datastore/keystore.go @@ -222,11 +222,6 @@ func (s *keyStore) decodeKey(dsk string) (bit256.Key, error) { return bit256.NewKey(bs), nil } -type pair struct { - k ds.Key - h mh.Multihash -} - // worker processes operations sequentially in a single goroutine func (s *keyStore) worker() { defer close(s.done) @@ -272,7 +267,11 @@ func (s *keyStore) worker() { // returns the keys that weren't present already in the keystore. 
func (s *keyStore) put(ctx context.Context, keys []mh.Multihash) ([]mh.Multihash, error) { seen := make(map[bit256.Key]struct{}, len(keys)) - toPut := make([]pair, 0, len(keys)) + b, err := s.ds.Batch(ctx) + if err != nil { + return nil, err + } + newKeys := make([]mh.Multihash, 0, len(keys)) for _, h := range keys { k := keyspace.MhToBit256(h) @@ -286,32 +285,15 @@ func (s *keyStore) put(ctx context.Context, keys []mh.Multihash) ([]mh.Multihash return nil, err } if !ok { - toPut = append(toPut, pair{k: dsk, h: h}) - } - } - clear(seen) - if len(toPut) == 0 { - // Nothing to do - return nil, nil - } - - b, err := s.ds.Batch(ctx) - if err != nil { - return nil, err - } - for _, p := range toPut { - if err := b.Put(ctx, p.k, p.h); err != nil { - return nil, err + if err := b.Put(ctx, dsk, h); err != nil { + return nil, err + } + newKeys = append(newKeys, h) } } if err := b.Commit(ctx); err != nil { return nil, err } - - newKeys := make([]mh.Multihash, len(toPut)) - for i, p := range toPut { - newKeys[i] = p.h - } return newKeys, nil } From 2f182c7fca1df7ef71f1391503cf49a75f0d1684 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Fri, 12 Sep 2025 23:57:59 +0200 Subject: [PATCH 06/12] deduplicate operation execution code --- provider/datastore/keystore.go | 141 ++++++++------------------------- 1 file changed, 31 insertions(+), 110 deletions(-) diff --git a/provider/datastore/keystore.go b/provider/datastore/keystore.go index db4048b96..d737f6c75 100644 --- a/provider/datastore/keystore.go +++ b/provider/datastore/keystore.go @@ -439,107 +439,63 @@ func (s *keyStore) size(ctx context.Context) (size int, err error) { return } -// Put stores the provided keys in the underlying datastore, grouping them by -// the first prefixLen bits. It returns only the keys that were not previously -// persisted in the datastore (i.e., newly added keys). -func (s *keyStore) Put(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) { - if len(keys) == 0 { - return nil, nil - } +// executeOperation sends an operation request to the worker goroutine and +// waits for the response. It handles the communication protocol and returns +// the results based on the operation type. +func (s *keyStore) executeOperation(op opType, ctx context.Context, keys []mh.Multihash, prefix bitstr.Key) ([]mh.Multihash, int, bool, error) { response := make(chan operationResponse, 1) select { case s.requests <- operation{ - op: opPut, + op: op, ctx: ctx, keys: keys, + prefix: prefix, response: response, }: case <-ctx.Done(): - return nil, ctx.Err() + return nil, 0, false, ctx.Err() case <-s.close: - return nil, ErrKeyStoreClosed + return nil, 0, false, ErrKeyStoreClosed } select { case resp := <-response: - return resp.multihashes, resp.err + return resp.multihashes, resp.size, resp.found, resp.err case <-ctx.Done(): - return nil, ctx.Err() + return nil, 0, false, ctx.Err() } } +// Put stores the provided keys in the underlying datastore, grouping them by +// the first prefixLen bits. It returns only the keys that were not previously +// persisted in the datastore (i.e., newly added keys). +func (s *keyStore) Put(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) { + if len(keys) == 0 { + return nil, nil + } + newKeys, _, _, err := s.executeOperation(opPut, ctx, keys, "") + return newKeys, err +} + // Get returns all keys whose bit256 representation matches the provided // prefix. 
func (s *keyStore) Get(ctx context.Context, prefix bitstr.Key) ([]mh.Multihash, error) { - response := make(chan operationResponse, 1) - select { - case s.requests <- operation{ - op: opGet, - ctx: ctx, - prefix: prefix, - response: response, - }: - case <-ctx.Done(): - return nil, ctx.Err() - case <-s.close: - return nil, ErrKeyStoreClosed - } - - select { - case resp := <-response: - return resp.multihashes, resp.err - case <-ctx.Done(): - return nil, ctx.Err() - } + keys, _, _, err := s.executeOperation(opGet, ctx, nil, prefix) + return keys, err } // ContainsPrefix reports whether the KeyStore currently holds at least one // multihash whose kademlia identifier (bit256.Key) starts with the provided // bit-prefix. func (s *keyStore) ContainsPrefix(ctx context.Context, prefix bitstr.Key) (bool, error) { - response := make(chan operationResponse, 1) - select { - case s.requests <- operation{ - op: opContainsPrefix, - ctx: ctx, - prefix: prefix, - response: response, - }: - case <-ctx.Done(): - return false, ctx.Err() - case <-s.close: - return false, ErrKeyStoreClosed - } - - select { - case resp := <-response: - return resp.found, resp.err - case <-ctx.Done(): - return false, ctx.Err() - } + _, _, found, err := s.executeOperation(opContainsPrefix, ctx, nil, prefix) + return found, err } // Empty deletes all entries under the datastore prefix. func (s *keyStore) Empty(ctx context.Context) error { - response := make(chan operationResponse, 1) - select { - case s.requests <- operation{ - op: opEmpty, - ctx: ctx, - response: response, - }: - case <-ctx.Done(): - return ctx.Err() - case <-s.close: - return ErrKeyStoreClosed - } - - select { - case resp := <-response: - return resp.err - case <-ctx.Done(): - return ctx.Err() - } + _, _, _, err := s.executeOperation(opEmpty, ctx, nil, "") + return err } // Delete removes the given keys from datastore. @@ -547,26 +503,8 @@ func (s *keyStore) Delete(ctx context.Context, keys ...mh.Multihash) error { if len(keys) == 0 { return nil } - response := make(chan operationResponse, 1) - select { - case s.requests <- operation{ - op: opDelete, - ctx: ctx, - keys: keys, - response: response, - }: - case <-ctx.Done(): - return ctx.Err() - case <-s.close: - return ErrKeyStoreClosed - } - - select { - case resp := <-response: - return resp.err - case <-ctx.Done(): - return ctx.Err() - } + _, _, _, err := s.executeOperation(opDelete, ctx, keys, "") + return err } // Size returns the number of keys currently stored in the KeyStore. @@ -574,25 +512,8 @@ func (s *keyStore) Delete(ctx context.Context, keys ...mh.Multihash) error { // The size is obtained by iterating over all keys in the underlying // datastore, so it may be expensive for large stores. func (s *keyStore) Size(ctx context.Context) (int, error) { - response := make(chan operationResponse, 1) - select { - case s.requests <- operation{ - op: opSize, - ctx: ctx, - response: response, - }: - case <-ctx.Done(): - return 0, ctx.Err() - case <-s.close: - return 0, ErrKeyStoreClosed - } - - select { - case resp := <-response: - return resp.size, resp.err - case <-ctx.Done(): - return 0, ctx.Err() - } + _, size, _, err := s.executeOperation(opSize, ctx, nil, "") + return size, err } // Close shuts down the worker goroutine and releases resources. 
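Patches 03 through 06 leave the keystore behind a single worker goroutine and split the resettable variant across two namespaced sub-datastores. For illustration only, here is a minimal sketch of how the resulting ResettableKeyStore could be exercised; the standalone program, the sample payloads, and the use of go-datastore's in-memory MapDatastore (which the tests in this series also pass where a Batching datastore is expected) are assumptions, not part of the patch series.

package main

import (
	"context"

	"github.com/ipfs/go-cid"
	ds "github.com/ipfs/go-datastore"
	kdstore "github.com/libp2p/go-libp2p-kad-dht/provider/datastore"
	mh "github.com/multiformats/go-multihash"
)

func main() {
	ctx := context.Background()

	// In-memory backing store; the ResettableKeyStore wraps it under
	// alternating "/0" and "/1" sub-namespaces and swaps between them on reset.
	store, err := kdstore.NewResettableKeyStore(ds.NewMapDatastore())
	if err != nil {
		panic(err)
	}
	defer store.Close()

	// Seed the keystore with an initial multihash.
	h0, _ := mh.Sum([]byte("old"), mh.SHA2_256, -1)
	if _, err := store.Put(ctx, h0); err != nil {
		panic(err)
	}

	// Stream a replacement set. Once the channel is closed and ResetCids
	// returns, the datastores have been swapped: only the new keys remain and
	// h0 is gone. Put calls issued while the reset is in flight are duplicated
	// to both datastores, so they survive the swap.
	h1, _ := mh.Sum([]byte("new"), mh.SHA2_256, -1)
	replacement := make(chan cid.Cid, 1)
	replacement <- cid.NewCidV1(cid.Raw, h1)
	close(replacement)
	if err := store.ResetCids(ctx, replacement); err != nil {
		panic(err)
	}
}

A plain KeyStore built with NewKeyStore is driven the same way, minus ResetCids.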
From 54e8d3c3b16d7c51b16c3e11224f647ac32890cc Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Sun, 14 Sep 2025 16:24:51 +0200 Subject: [PATCH 07/12] buffered provider --- go.mod | 10 +- go.sum | 16 +- provider/buffered/options.go | 65 +++++ provider/buffered/provider.go | 244 +++++++++++++++++++ {dual/provider => provider/dual}/options.go | 2 +- {dual/provider => provider/dual}/provider.go | 27 +- provider/internal/interface.go | 14 ++ provider/provider.go | 3 + 8 files changed, 354 insertions(+), 27 deletions(-) create mode 100644 provider/buffered/options.go create mode 100644 provider/buffered/provider.go rename {dual/provider => provider/dual}/options.go (99%) rename {dual/provider => provider/dual}/provider.go (93%) create mode 100644 provider/internal/interface.go diff --git a/go.mod b/go.mod index 8bcd06d87..e10d75e7a 100644 --- a/go.mod +++ b/go.mod @@ -1,18 +1,19 @@ module github.com/libp2p/go-libp2p-kad-dht -go 1.24 +go 1.24.0 require ( - github.com/gammazero/deque v1.0.0 + github.com/gammazero/deque v1.1.0 github.com/google/gopacket v1.1.19 github.com/google/uuid v1.6.0 github.com/guillaumemichel/reservedpool v0.2.0 github.com/hashicorp/golang-lru v1.0.2 github.com/ipfs/boxo v0.33.1 github.com/ipfs/go-cid v0.5.0 - github.com/ipfs/go-datastore v0.8.2 + github.com/ipfs/go-datastore v0.8.3 github.com/ipfs/go-detect-race v0.0.1 - github.com/ipfs/go-log/v2 v2.8.0 + github.com/ipfs/go-dsqueue v0.0.4 + github.com/ipfs/go-log/v2 v2.8.1 github.com/ipfs/go-test v0.2.3 github.com/libp2p/go-libp2p v0.43.0 github.com/libp2p/go-libp2p-kbucket v0.8.0 @@ -53,6 +54,7 @@ require ( github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gorilla/websocket v1.5.3 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/huin/goupnp v1.3.0 // indirect github.com/ipfs/go-block-format v0.2.2 // indirect github.com/ipld/go-ipld-prime v0.21.0 // indirect diff --git a/go.sum b/go.sum index 6f0e3007d..351d71df3 100644 --- a/go.sum +++ b/go.sum @@ -63,8 +63,8 @@ github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiD github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34= -github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo= +github.com/gammazero/deque v1.1.0 h1:OyiyReBbnEG2PP0Bnv1AASLIYvyKqIFN5xfl1t8oGLo= +github.com/gammazero/deque v1.1.0/go.mod h1:JVrR+Bj1NMQbPnYclvDlvSX0nVGReLrQZ0aUMuWLctg= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= @@ -117,6 +117,8 @@ github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmv github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod 
h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= @@ -131,17 +133,19 @@ github.com/ipfs/go-cid v0.5.0 h1:goEKKhaGm0ul11IHA7I6p1GmKz8kEYniqFopaB5Otwg= github.com/ipfs/go-cid v0.5.0/go.mod h1:0L7vmeNXpQpUS9vt+yEARkJ8rOg43DF3iPgn4GIN0mk= github.com/ipfs/go-datastore v0.1.0/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= github.com/ipfs/go-datastore v0.1.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw= -github.com/ipfs/go-datastore v0.8.2 h1:Jy3wjqQR6sg/LhyY0NIePZC3Vux19nLtg7dx0TVqr6U= -github.com/ipfs/go-datastore v0.8.2/go.mod h1:W+pI1NsUsz3tcsAACMtfC+IZdnQTnC/7VfPoJBQuts0= +github.com/ipfs/go-datastore v0.8.3 h1:z391GsQyGKUIUof2tPoaZVeDknbt7fNHs6Gqjcw5Jo4= +github.com/ipfs/go-datastore v0.8.3/go.mod h1:raxQ/CreIy9L6MxT71ItfMX12/ASN6EhXJoUFjICQ2M= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-ds-badger v0.0.7/go.mod h1:qt0/fWzZDoPW6jpQeqUjR5kBfhDNB65jd9YlmAvpQBk= github.com/ipfs/go-ds-leveldb v0.1.0/go.mod h1:hqAW8y4bwX5LWcCtku2rFNX3vjDZCy5LZCg+cSZvYb8= +github.com/ipfs/go-dsqueue v0.0.4 h1:tesq26hKRYPG72Tu9kZKsbsLWp1KBfAxWNQlMyU17tk= +github.com/ipfs/go-dsqueue v0.0.4/go.mod h1:K68ng9BVl+gLr8fqCJKaoXnXqo6MzQ6nV0MhZZFEwg4= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc= github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM= -github.com/ipfs/go-log/v2 v2.8.0 h1:SptNTPJQV3s5EF4FdrTu/yVdOKfGbDgn1EBZx4til2o= -github.com/ipfs/go-log/v2 v2.8.0/go.mod h1:2LEEhdv8BGubPeSFTyzbqhCqrwqxCbuTNTLWqgNAipo= +github.com/ipfs/go-log/v2 v2.8.1 h1:Y/X36z7ASoLJaYIJAL4xITXgwf7RVeqb1+/25aq/Xk0= +github.com/ipfs/go-log/v2 v2.8.1/go.mod h1:NyhTBcZmh2Y55eWVjOeKf8M7e4pnJYM3yDZNxQBWEEY= github.com/ipfs/go-test v0.2.3 h1:Z/jXNAReQFtCYyn7bsv/ZqUwS6E7iIcSpJ2CuzCvnrc= github.com/ipfs/go-test v0.2.3/go.mod h1:QW8vSKkwYvWFwIZQLGQXdkt9Ud76eQXRQ9Ao2H+cA1o= github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH9C2E= diff --git a/provider/buffered/options.go b/provider/buffered/options.go new file mode 100644 index 000000000..11f9f4a96 --- /dev/null +++ b/provider/buffered/options.go @@ -0,0 +1,65 @@ +// Package buffered provides a buffered provider implementation that queues operations +// and processes them in batches for improved performance. +package buffered + +import "time" + +const ( + // DefaultDsName is the default datastore namespace for the buffered provider. + DefaultDsName = "bprov" // for buffered provider + // DefaultBatchSize is the default number of operations to process in a single batch. + DefaultBatchSize = 1 << 10 + // DefaultIdleWriteTime is the default duration to wait before flushing pending operations. + DefaultIdleWriteTime = time.Minute +) + +// config contains all options for the buffered provider. +type config struct { + dsName string + batchSize int + idleWriteTime time.Duration +} + +// Option is a function that configures the buffered provider. +type Option func(*config) + +// getOpts creates a config and applies Options to it. 
+func getOpts(opts []Option) config { + cfg := config{ + dsName: DefaultDsName, + batchSize: DefaultBatchSize, + idleWriteTime: DefaultIdleWriteTime, + } + + for _, opt := range opts { + opt(&cfg) + } + return cfg +} + +// WithDsName sets the datastore namespace for the buffered provider. +// If name is empty, the option is ignored. +func WithDsName(name string) Option { + return func(c *config) { + if len(name) > 0 { + c.dsName = name + } + } +} + +// WithBatchSize sets the number of operations to process in a single batch. +// If n is zero or negative, the option is ignored. +func WithBatchSize(n int) Option { + return func(c *config) { + if n > 0 { + c.batchSize = n + } + } +} + +// WithIdleWriteTime sets the duration to wait before flushing pending operations. +func WithIdleWriteTime(d time.Duration) Option { + return func(c *config) { + c.idleWriteTime = d + } +} diff --git a/provider/buffered/provider.go b/provider/buffered/provider.go new file mode 100644 index 000000000..3bfe8d455 --- /dev/null +++ b/provider/buffered/provider.go @@ -0,0 +1,244 @@ +package buffered + +import ( + "errors" + "sync" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-dsqueue" + logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p-kad-dht/provider" + "github.com/libp2p/go-libp2p-kad-dht/provider/internal" + mh "github.com/multiformats/go-multihash" +) + +var logger = logging.Logger(provider.LoggerName) + +const ( + // provideOnceOp represents a one-time provide operation. + provideOnceOp byte = iota + // startProvidingOp represents starting continuous providing. + startProvidingOp + // forceStartProvidingOp represents forcefully starting providing (overrides existing). + forceStartProvidingOp + // stopProvidingOp represents stopping providing. + stopProvidingOp + // lastOp is used for array sizing. + lastOp +) + +var _ internal.Provider = (*SweepingProvider)(nil) + +// SweepingProvider implements a buffered provider that queues operations and +// processes them asynchronously in batches. +type SweepingProvider struct { + closeOnce sync.Once + done chan struct{} + closed chan struct{} + provider internal.Provider + queue *dsqueue.DSQueue + batchSize int +} + +// New creates a new SweepingProvider that wraps the given provider with +// buffering capabilities. Operations are queued and processed asynchronously +// in batches for improved performance. +func New(prov internal.Provider, ds datastore.Batching, opts ...Option) *SweepingProvider { + cfg := getOpts(opts) + s := &SweepingProvider{ + done: make(chan struct{}), + closed: make(chan struct{}), + + provider: prov, + queue: dsqueue.New(ds, cfg.dsName, + dsqueue.WithDedupCacheSize(0), // disable deduplication + dsqueue.WithIdleWriteTime(cfg.idleWriteTime), + ), + batchSize: cfg.batchSize, + } + go s.worker() + return s +} + +// Close stops the provider and releases all resources. +// +// It waits for the worker goroutine to finish processing current operations +// and closes the underneath provider. The queue current state is persisted on +// the datastore. +func (s *SweepingProvider) Close() error { + var err error + s.closeOnce.Do(func() { + close(s.closed) + err = errors.Join(s.queue.Close(), s.provider.Close()) + <-s.done + }) + return err +} + +// toBytes serializes an operation and multihash into a byte slice for storage. +func toBytes(op byte, key mh.Multihash) []byte { + return append([]byte{op}, key...) +} + +// fromBytes deserializes a byte slice back into an operation and multihash. 
+func fromBytes(data []byte) (byte, mh.Multihash, error) { + op := data[0] + h, err := mh.Cast(data[1:]) + return op, h, err +} + +// getOperations processes a batch of dequeued operations and groups them by +// type. +// +// It discards multihashes from the `StopProviding` operation if +// `StartProviding` was called after `StopProviding` for the same multihash. +func getOperations(dequeued [][]byte) ([][]mh.Multihash, error) { + ops := [lastOp][]mh.Multihash{} + stopProv := make(map[string]struct{}) + + for _, bs := range dequeued { + op, h, err := fromBytes(bs) + if err != nil { + return nil, err + } + switch op { + case provideOnceOp: + ops[provideOnceOp] = append(ops[provideOnceOp], h) + case startProvidingOp, forceStartProvidingOp: + delete(stopProv, string(h)) + ops[op] = append(ops[op], h) + case stopProvidingOp: + stopProv[string(h)] = struct{}{} + } + } + for hstr := range stopProv { + ops[stopProvidingOp] = append(ops[stopProvidingOp], mh.Multihash(hstr)) + } + return ops[:], nil +} + +// worker processes operations from the queue in batches. +// It runs in a separate goroutine and continues until the provider is closed. +func (s *SweepingProvider) worker() { + defer close(s.done) + for { + select { + case <-s.closed: + return + default: + } + + res, err := s.queue.GetN(s.batchSize) + if err != nil { + logger.Warnf("BufferedSweepingProvider unable to dequeue: %v", err) + continue + } + ops, err := getOperations(res) + if err != nil { + logger.Warnf("BufferedSweepingProvider unable to parse dequeued item: %v", err) + continue + } + + // Process `StartProviding` (force=true) ops first, so that if + // `StartProviding` (force=false) is called after, there is no need to + // enqueue the multihash a second time to the provide queue. + err = s.provider.StartProviding(true, ops[forceStartProvidingOp]...) + if err != nil { + logger.Warnf("BufferedSweepingProvider unable to start providing (force): %v", err) + } + err = s.provider.StartProviding(false, ops[startProvidingOp]...) + if err != nil { + logger.Warnf("BufferedSweepingProvider unable to start providing: %v", err) + } + err = s.provider.ProvideOnce(ops[provideOnceOp]...) + if err != nil { + logger.Warnf("BufferedSweepingProvider unable to provide once: %v", err) + } + // Process `StopProviding` last, so that multihashes that should have been + // provided, and then stopped provided in the same batch are provided only + // once. Don't `StopProviding` multihashes, for which `StartProviding` has + // been called after `StopProviding`. + err = s.provider.StopProviding(ops[stopProvidingOp]...) + if err != nil { + logger.Warnf("BufferedSweepingProvider unable to stop providing: %v", err) + } + } +} + +// enqueue adds operations to the queue for asynchronous processing. +func (s *SweepingProvider) enqueue(op byte, keys ...mh.Multihash) error { + for _, h := range keys { + if err := s.queue.Put(toBytes(op, h)); err != nil { + return err + } + } + return nil +} + +// ProvideOnce enqueues multihashes for which the provider will send provider +// records out only once to the DHT swarm. It does NOT take the responsibility +// to reprovide these keys. +// +// Returns immediately after enqueuing the keys, the actual provide operation +// happens asynchronously. Returns an error if the multihashes couldn't be +// enqueued. +func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) error { + return s.enqueue(provideOnceOp, keys...) 
+} + +// StartProviding adds the supplied keys to the queue of keys that will be +// provided to the DHT swarm unless they were already provided in the past. The +// keys will be periodically reprovided until StopProviding is called for the +// same keys or the keys are removed from the Keystore. +// +// If force is true, the keys are provided to the DHT swarm regardless of +// whether they were already being reprovided in the past. +// +// Returns immediately after enqueuing the keys, the actual provide operation +// happens asynchronously. Returns an error if the multihashes couldn't be +// enqueued. +func (s *SweepingProvider) StartProviding(force bool, keys ...mh.Multihash) error { + op := startProvidingOp + if force { + op = forceStartProvidingOp + } + return s.enqueue(op, keys...) +} + +// StopProviding adds the supplied multihashes to the BufferedSweepingProvider +// queue, to stop reproviding the given keys to the DHT swarm. +// +// The node stops being referred as a provider when the provider records in the +// DHT swarm expire. +// +// Returns immediately after enqueuing the keys, the actual provide operation +// happens asynchronously. Returns an error if the multihashes couldn't be +// enqueued. +func (s *SweepingProvider) StopProviding(keys ...mh.Multihash) error { + return s.enqueue(stopProvidingOp, keys...) +} + +// Clear clears the all the keys from the provide queue and returns the number +// of keys that were cleared. +// +// The keys are not deleted from the keystore, so they will continue to be +// reprovided as scheduled. +func (s *SweepingProvider) Clear() int { + return s.provider.Clear() +} + +// RefreshSchedule scans the KeyStore for any keys that are not currently +// scheduled for reproviding. If such keys are found, it schedules their +// associated keyspace region to be reprovided. +// +// This function doesn't remove prefixes that have no keys from the schedule. +// This is done automatically during the reprovide operation if a region has no +// keys. +// +// Returns an error if the provider is closed or if the node is currently +// Offline (either never bootstrapped, or disconnected since more than +// `OfflineDelay`). The schedule depends on the network size, hence recent +// network connectivity is essential. +func (s *SweepingProvider) RefreshSchedule() error { + return s.provider.RefreshSchedule() +} diff --git a/dual/provider/options.go b/provider/dual/options.go similarity index 99% rename from dual/provider/options.go rename to provider/dual/options.go index a7ee5f41d..281ecf6d2 100644 --- a/dual/provider/options.go +++ b/provider/dual/options.go @@ -1,4 +1,4 @@ -package provider +package dual import ( "errors" diff --git a/dual/provider/provider.go b/provider/dual/provider.go similarity index 93% rename from dual/provider/provider.go rename to provider/dual/provider.go index 6099b2c7f..f14840698 100644 --- a/dual/provider/provider.go +++ b/provider/dual/provider.go @@ -1,4 +1,4 @@ -package provider +package dual import ( "context" @@ -10,9 +10,12 @@ import ( "github.com/libp2p/go-libp2p-kad-dht/dual" "github.com/libp2p/go-libp2p-kad-dht/provider" "github.com/libp2p/go-libp2p-kad-dht/provider/datastore" + "github.com/libp2p/go-libp2p-kad-dht/provider/internal" mh "github.com/multiformats/go-multihash" ) +var _ internal.Provider = (*SweepingProvider)(nil) + // SweepingProvider manages provides and reprovides for both DHT swarms (LAN // and WAN) in the dual DHT setup. 
type SweepingProvider struct { @@ -98,6 +101,13 @@ func (s *SweepingProvider) runOnBoth(f func(*provider.SweepingProvider) error) e return errors.Join(errs[:]...) } +// Close stops both DHT providers and releases associated resources. +func (s *SweepingProvider) Close() error { + return s.runOnBoth(func(p *provider.SweepingProvider) error { + return p.Close() + }) +} + // ProvideOnce sends provider records for the specified keys to both DHT swarms // only once. It does not automatically reprovide those keys afterward. // @@ -191,18 +201,3 @@ func (s *SweepingProvider) RefreshSchedule() error { return p.RefreshSchedule() }) } - -var ( - _ dhtProvider = (*SweepingProvider)(nil) - _ dhtProvider = (*provider.SweepingProvider)(nil) -) - -// dhtProvider is the interface to ensure that SweepingProvider and -// provider.SweepingProvider share the same interface. -type dhtProvider interface { - StartProviding(force bool, keys ...mh.Multihash) error - StopProviding(keys ...mh.Multihash) error - ProvideOnce(keys ...mh.Multihash) error - Clear() int - RefreshSchedule() error -} diff --git a/provider/internal/interface.go b/provider/internal/interface.go new file mode 100644 index 000000000..0c63e0aee --- /dev/null +++ b/provider/internal/interface.go @@ -0,0 +1,14 @@ +package internal + +import ( + mh "github.com/multiformats/go-multihash" +) + +type Provider interface { + StartProviding(force bool, keys ...mh.Multihash) error + StopProviding(keys ...mh.Multihash) error + ProvideOnce(keys ...mh.Multihash) error + Clear() int + RefreshSchedule() error + Close() error +} diff --git a/provider/provider.go b/provider/provider.go index c26247d7c..20520545d 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -27,6 +27,7 @@ import ( pb "github.com/libp2p/go-libp2p-kad-dht/pb" "github.com/libp2p/go-libp2p-kad-dht/provider/datastore" + "github.com/libp2p/go-libp2p-kad-dht/provider/internal" "github.com/libp2p/go-libp2p-kad-dht/provider/internal/connectivity" "github.com/libp2p/go-libp2p-kad-dht/provider/internal/keyspace" "github.com/libp2p/go-libp2p-kad-dht/provider/internal/queue" @@ -103,6 +104,8 @@ const ( burstWorker ) +var _ internal.Provider = (*SweepingProvider)(nil) + type SweepingProvider struct { done chan struct{} ctx context.Context From e10840aa800355d45470ba542c456dbba71f4cbb Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Sun, 14 Sep 2025 17:55:48 +0200 Subject: [PATCH 08/12] tests --- provider/buffered/provider_test.go | 292 +++++++++++++++++++++++++++++ 1 file changed, 292 insertions(+) create mode 100644 provider/buffered/provider_test.go diff --git a/provider/buffered/provider_test.go b/provider/buffered/provider_test.go new file mode 100644 index 000000000..302a92b41 --- /dev/null +++ b/provider/buffered/provider_test.go @@ -0,0 +1,292 @@ +//go:build go1.25 +// +build go1.25 + +package buffered + +import ( + "bytes" + "sync" + "testing" + "testing/synctest" + "time" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-test/random" + "github.com/libp2p/go-libp2p-kad-dht/provider/internal" + mh "github.com/multiformats/go-multihash" +) + +var _ internal.Provider = (*fakeProvider)(nil) + +type fakeProvider struct { + mu sync.Mutex + provideOnceCalls [][]mh.Multihash + startProvidingCalls []startProvidingCall + stopProvidingCalls [][]mh.Multihash + + // Signal when operations are processed + processed chan struct{} +} + +type startProvidingCall struct { + force bool + keys []mh.Multihash +} + +func (f *fakeProvider) ProvideOnce(keys ...mh.Multihash) error { + f.mu.Lock() + 
defer f.mu.Unlock() + if len(keys) > 0 { + f.provideOnceCalls = append(f.provideOnceCalls, keys) + if f.processed != nil { + select { + case f.processed <- struct{}{}: + default: + } + } + } + return nil +} + +func (f *fakeProvider) StartProviding(force bool, keys ...mh.Multihash) error { + f.mu.Lock() + defer f.mu.Unlock() + if len(keys) > 0 { + f.startProvidingCalls = append(f.startProvidingCalls, startProvidingCall{ + force: force, + keys: keys, + }) + if f.processed != nil { + select { + case f.processed <- struct{}{}: + default: + } + } + } + return nil +} + +func (f *fakeProvider) StopProviding(keys ...mh.Multihash) error { + f.mu.Lock() + defer f.mu.Unlock() + if len(keys) > 0 { + f.stopProvidingCalls = append(f.stopProvidingCalls, keys) + if f.processed != nil { + select { + case f.processed <- struct{}{}: + default: + } + } + } + return nil +} + +func (f *fakeProvider) Clear() int { + // Unused + return 0 +} + +func (f *fakeProvider) RefreshSchedule() error { + // Unused + return nil +} + +func (f *fakeProvider) Close() error { + // Unused + return nil +} + +func newFakeProvider() *fakeProvider { + return &fakeProvider{ + processed: make(chan struct{}, 10), // Buffered channel for test signaling + } +} + +func TestQueueingMechanism(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + fake := newFakeProvider() + ds := datastore.NewMapDatastore() + provider := New(fake, ds, + WithDsName("test1"), + WithIdleWriteTime(time.Millisecond), + WithBatchSize(10)) + defer provider.Close() + + keys := random.Multihashes(3) + + // Queue various operations + if err := provider.ProvideOnce(keys[0]); err != nil { + t.Fatalf("ProvideOnce failed: %v", err) + } + if err := provider.StartProviding(false, keys[1]); err != nil { + t.Fatalf("StartProviding failed: %v", err) + } + if err := provider.StartProviding(true, keys[2]); err != nil { + t.Fatalf("StartProviding (force) failed: %v", err) + } + if err := provider.StopProviding(keys[0]); err != nil { + t.Fatalf("StopProviding failed: %v", err) + } + + // Wait for operations to be processed by expecting 4 signals + for i := 0; i < 4; i++ { + select { + case <-fake.processed: + case <-time.After(time.Second): + t.Fatalf("Timeout waiting for operation %d to be processed", i+1) + } + } + + // Verify all operations were dequeued and processed + if len(fake.provideOnceCalls) != 1 { + t.Errorf("Expected 1 ProvideOnce call, got %d", len(fake.provideOnceCalls)) + } else if len(fake.provideOnceCalls[0]) != 1 || !bytes.Equal(fake.provideOnceCalls[0][0], keys[0]) { + t.Errorf("Expected ProvideOnce call with keys[0], got %v", fake.provideOnceCalls[0]) + } + + if len(fake.startProvidingCalls) != 2 { + t.Errorf("Expected 2 StartProviding calls, got %d", len(fake.startProvidingCalls)) + } else { + // Check that we have one force=true call and one force=false call + foundForce := false + foundRegular := false + for _, call := range fake.startProvidingCalls { + if call.force { + foundForce = true + if len(call.keys) != 1 || !bytes.Equal(call.keys[0], keys[2]) { + t.Errorf("Expected force StartProviding call with keys[2], got %v", call.keys) + } + } else { + foundRegular = true + if len(call.keys) != 1 || !bytes.Equal(call.keys[0], keys[1]) { + t.Errorf("Expected regular StartProviding call with keys[1], got %v", call.keys) + } + } + } + if !foundForce { + t.Errorf("Expected to find a StartProviding call with force=true") + } + if !foundRegular { + t.Errorf("Expected to find a StartProviding call with force=false") + } + } + + if len(fake.stopProvidingCalls) != 1 
{ + t.Errorf("Expected 1 StopProviding call, got %d", len(fake.stopProvidingCalls)) + } else if len(fake.stopProvidingCalls[0]) != 1 || !bytes.Equal(fake.stopProvidingCalls[0][0], keys[0]) { + t.Errorf("Expected StopProviding call with keys[0], got %v", fake.stopProvidingCalls[0]) + } + }) +} + +func TestStartProvidingAfterStopProvidingRemovesStopOperation(t *testing.T) { + // Test the core logic directly by calling getOperations with known data + t.Run("DirectTest", func(t *testing.T) { + key := random.Multihashes(1)[0] + + // Create batch data that simulates StopProviding followed by StartProviding + stopData := toBytes(stopProvidingOp, key) + startData := toBytes(startProvidingOp, key) + + dequeued := [][]byte{stopData, startData} + ops, err := getOperations(dequeued) // We need to create this helper + if err != nil { + t.Fatalf("getOperations failed: %v", err) + } + + // StartProviding should be present + if len(ops[startProvidingOp]) != 1 || !bytes.Equal(ops[startProvidingOp][0], key) { + t.Errorf("Expected StartProviding operation with key, got %v", ops[startProvidingOp]) + } + + // StopProviding should be canceled (empty) + if len(ops[stopProvidingOp]) != 0 { + t.Errorf("Expected StopProviding operations to be canceled, got %v", ops[stopProvidingOp]) + } + }) +} + +func TestMultipleOperationsOnSameKey(t *testing.T) { + // Test the core batch processing logic directly + t.Run("DirectTest", func(t *testing.T) { + key := random.Multihashes(1)[0] + + // Create batch data with multiple operations on same key + ops := [][]byte{ + toBytes(stopProvidingOp, key), // StopProviding + toBytes(forceStartProvidingOp, key), // StartProviding(force=true) + toBytes(stopProvidingOp, key), // StopProviding again + toBytes(startProvidingOp, key), // StartProviding(force=false) + } + + processed, err := getOperations(ops) + if err != nil { + t.Fatalf("getOperations failed: %v", err) + } + + // Should have 2 StartProviding operations + if len(processed[startProvidingOp]) != 1 { + t.Errorf("Expected 1 StartProviding (force=false) operation, got %d", len(processed[startProvidingOp])) + } + if len(processed[forceStartProvidingOp]) != 1 { + t.Errorf("Expected 1 StartProviding (force=true) operation, got %d", len(processed[forceStartProvidingOp])) + } + + // StopProviding should be canceled (empty) because StartProviding operations were in same batch + if len(processed[stopProvidingOp]) != 0 { + t.Errorf("Expected 0 StopProviding operations (should be canceled), got %d", len(processed[stopProvidingOp])) + } + }) +} + +func TestBatchProcessing(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + fake := newFakeProvider() + ds := datastore.NewMapDatastore() + provider := New(fake, ds, + WithDsName("test4"), + WithBatchSize(3), // Process 3 operations at once + WithIdleWriteTime(time.Second)) + defer provider.Close() + + // Queue multiple keys - total of 3 operations (2 from ProvideOnce + 1 from StartProviding) + keys := random.Multihashes(3) + + if err := provider.ProvideOnce(keys[0], keys[1]); err != nil { + t.Fatalf("ProvideOnce failed: %v", err) + } + if err := provider.StartProviding(false, keys[2]); err != nil { + t.Fatalf("StartProviding failed: %v", err) + } + + // Wait for batch to be triggered (should process all 3 operations in one batch) + // Expect 2 signals: 1 for ProvideOnce (with 2 keys), 1 for StartProviding (with 1 key) + for i := 0; i < 2; i++ { + select { + case <-fake.processed: + case <-time.After(time.Second): + t.Fatalf("Timeout waiting for operation %d to be processed", i+1) + } + } 
+ + // Close to ensure all operations are flushed + provider.Close() + + // Verify operations were batched correctly + totalProvideOnceCalls := 0 + for _, call := range fake.provideOnceCalls { + totalProvideOnceCalls += len(call) + } + if totalProvideOnceCalls != 2 { + t.Errorf("Expected 2 total keys in ProvideOnce calls, got %d", totalProvideOnceCalls) + } + + totalStartProvidingCalls := 0 + for _, call := range fake.startProvidingCalls { + totalStartProvidingCalls += len(call.keys) + } + if totalStartProvidingCalls != 1 { + t.Errorf("Expected 1 total key in StartProviding calls, got %d", totalStartProvidingCalls) + } + }) +} From 0551bf2b352c404c985731011a57f873463af81d Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Tue, 16 Sep 2025 09:50:01 +0200 Subject: [PATCH 09/12] removing redundant code --- provider/buffered/provider.go | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/provider/buffered/provider.go b/provider/buffered/provider.go index 3bfe8d455..1d704e5ba 100644 --- a/provider/buffered/provider.go +++ b/provider/buffered/provider.go @@ -117,6 +117,17 @@ func getOperations(dequeued [][]byte) ([][]mh.Multihash, error) { return ops[:], nil } +// executeOperation executes a provider operation on the underlying provider +// with the given multihashes, logging any errors encountered. +func executeOperation(f func(...mh.Multihash) error, keys []mh.Multihash) { + if len(keys) == 0 { + return + } + if err := f(keys...); err != nil { + logger.Warn(err) + } +} + // worker processes operations from the queue in batches. // It runs in a separate goroutine and continues until the provider is closed. func (s *SweepingProvider) worker() { @@ -142,26 +153,14 @@ func (s *SweepingProvider) worker() { // Process `StartProviding` (force=true) ops first, so that if // `StartProviding` (force=false) is called after, there is no need to // enqueue the multihash a second time to the provide queue. - err = s.provider.StartProviding(true, ops[forceStartProvidingOp]...) - if err != nil { - logger.Warnf("BufferedSweepingProvider unable to start providing (force): %v", err) - } - err = s.provider.StartProviding(false, ops[startProvidingOp]...) - if err != nil { - logger.Warnf("BufferedSweepingProvider unable to start providing: %v", err) - } - err = s.provider.ProvideOnce(ops[provideOnceOp]...) - if err != nil { - logger.Warnf("BufferedSweepingProvider unable to provide once: %v", err) - } + executeOperation(func(keys ...mh.Multihash) error { return s.provider.StartProviding(true, keys...) }, ops[forceStartProvidingOp]) + executeOperation(func(keys ...mh.Multihash) error { return s.provider.StartProviding(false, keys...) }, ops[startProvidingOp]) + executeOperation(s.provider.ProvideOnce, ops[provideOnceOp]) // Process `StopProviding` last, so that multihashes that should have been // provided, and then stopped provided in the same batch are provided only // once. Don't `StopProviding` multihashes, for which `StartProviding` has // been called after `StopProviding`. - err = s.provider.StopProviding(ops[stopProvidingOp]...) 
- if err != nil { - logger.Warnf("BufferedSweepingProvider unable to stop providing: %v", err) - } + executeOperation(s.provider.StopProviding, ops[stopProvidingOp]) } } From 743cd49af3728ebc2ebd1efe0524e0f6ae611078 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Tue, 16 Sep 2025 09:58:28 +0200 Subject: [PATCH 10/12] docs --- provider/buffered/provider.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/provider/buffered/provider.go b/provider/buffered/provider.go index 1d704e5ba..c2a29bd5a 100644 --- a/provider/buffered/provider.go +++ b/provider/buffered/provider.go @@ -29,8 +29,9 @@ const ( var _ internal.Provider = (*SweepingProvider)(nil) -// SweepingProvider implements a buffered provider that queues operations and -// processes them asynchronously in batches. +// buffered.SweepingProvider is a wrapper around a SweepingProvider buffering +// requests, to allow core operations to return instantly. Operations are +// queued and processed asynchronously in batches for improved performance. type SweepingProvider struct { closeOnce sync.Once done chan struct{} @@ -149,6 +150,8 @@ func (s *SweepingProvider) worker() { logger.Warnf("BufferedSweepingProvider unable to parse dequeued item: %v", err) continue } + // Execute the 4 kinds of queued provider operations on the underlying + // provider. // Process `StartProviding` (force=true) ops first, so that if // `StartProviding` (force=false) is called after, there is no need to From 339b0f0b8ddf00bc4e4b3f8c5227486b60a5ab41 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Tue, 16 Sep 2025 10:20:14 +0200 Subject: [PATCH 11/12] wait on empty queue --- provider/buffered/provider.go | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/provider/buffered/provider.go b/provider/buffered/provider.go index c2a29bd5a..9e407aca4 100644 --- a/provider/buffered/provider.go +++ b/provider/buffered/provider.go @@ -36,6 +36,8 @@ type SweepingProvider struct { closeOnce sync.Once done chan struct{} closed chan struct{} + + newItems chan struct{} provider internal.Provider queue *dsqueue.DSQueue batchSize int @@ -50,6 +52,7 @@ func New(prov internal.Provider, ds datastore.Batching, opts ...Option) *Sweepin done: make(chan struct{}), closed: make(chan struct{}), + newItems: make(chan struct{}, 1), provider: prov, queue: dsqueue.New(ds, cfg.dsName, dsqueue.WithDedupCacheSize(0), // disable deduplication @@ -133,11 +136,22 @@ func executeOperation(f func(...mh.Multihash) error, keys []mh.Multihash) { // It runs in a separate goroutine and continues until the provider is closed. func (s *SweepingProvider) worker() { defer close(s.done) + var emptyQueue bool for { - select { - case <-s.closed: - return - default: + if emptyQueue { + select { + case <-s.closed: + return + case <-s.newItems: + } + emptyQueue = false + } else { + select { + case <-s.closed: + return + case <-s.newItems: + default: + } } res, err := s.queue.GetN(s.batchSize) @@ -145,6 +159,10 @@ func (s *SweepingProvider) worker() { logger.Warnf("BufferedSweepingProvider unable to dequeue: %v", err) continue } + if len(res) < s.batchSize { + // Queue was fully drained. 
+ emptyQueue = true + } ops, err := getOperations(res) if err != nil { logger.Warnf("BufferedSweepingProvider unable to parse dequeued item: %v", err) @@ -174,6 +192,10 @@ func (s *SweepingProvider) enqueue(op byte, keys ...mh.Multihash) error { return err } } + select { + case s.newItems <- struct{}{}: + default: + } return nil } From 450daee602a86ac850023ffe776ce74ce7ef9cca Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Tue, 16 Sep 2025 10:20:42 +0200 Subject: [PATCH 12/12] fix flaky test --- provider/buffered/provider_test.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/provider/buffered/provider_test.go b/provider/buffered/provider_test.go index 302a92b41..3c66ae56b 100644 --- a/provider/buffered/provider_test.go +++ b/provider/buffered/provider_test.go @@ -258,16 +258,7 @@ func TestBatchProcessing(t *testing.T) { if err := provider.StartProviding(false, keys[2]); err != nil { t.Fatalf("StartProviding failed: %v", err) } - - // Wait for batch to be triggered (should process all 3 operations in one batch) - // Expect 2 signals: 1 for ProvideOnce (with 2 keys), 1 for StartProviding (with 1 key) - for i := 0; i < 2; i++ { - select { - case <-fake.processed: - case <-time.After(time.Second): - t.Fatalf("Timeout waiting for operation %d to be processed", i+1) - } - } + synctest.Wait() // Close to ensure all operations are flushed provider.Close()
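To close the loop on the buffered provider introduced in patches 07 through 12, here is a hedged wiring sketch. The helper name wireBufferedProvider, the option values, and the assumption that the caller already holds a configured *provider.SweepingProvider and a Batching datastore are illustrative only, not taken from the patches.

package example

import (
	"time"

	ds "github.com/ipfs/go-datastore"
	"github.com/libp2p/go-libp2p-kad-dht/provider"
	"github.com/libp2p/go-libp2p-kad-dht/provider/buffered"
	mh "github.com/multiformats/go-multihash"
)

// wireBufferedProvider is a hypothetical helper: it wraps an already
// configured base provider with the buffered queue so that provide calls
// return as soon as the keys are enqueued.
func wireBufferedProvider(base *provider.SweepingProvider, dstore ds.Batching, keys ...mh.Multihash) (*buffered.SweepingProvider, error) {
	buf := buffered.New(base, dstore,
		buffered.WithBatchSize(256),             // arbitrary example value
		buffered.WithIdleWriteTime(time.Minute), // persist queue state after inactivity
	)
	// Only enqueues; the worker goroutine later forwards the keys to the
	// base provider's StartProviding in batches.
	if err := buf.StartProviding(false, keys...); err != nil {
		_ = buf.Close()
		return nil, err
	}
	return buf, nil
}

Because StartProviding, StopProviding, and ProvideOnce only enqueue to the dsqueue-backed queue, they return quickly even when the underlying provider is busy; the worker drains the queue in batches of at most batchSize and, once the queue is fully drained, blocks on the newItems channel until new operations arrive.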