Skip to content

Commit dd0c337

Browse files
provider: ResettableKeyStore (#1146)
1 parent 16a883e commit dd0c337

File tree

6 files changed

+193
-53
lines changed

6 files changed

+193
-53
lines changed

dual/provider/options.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const (
1919
)
2020

2121
type config struct {
22-
keyStore *datastore.KeyStore
22+
keyStore datastore.KeyStore
2323

2424
reprovideInterval [2]time.Duration // [0] = LAN, [1] = WAN
2525
maxReprovideDelay [2]time.Duration
@@ -87,7 +87,7 @@ var DefaultConfig = func(cfg *config) error {
8787
return nil
8888
}
8989

90-
func WithKeyStore(keyStore *datastore.KeyStore) Option {
90+
func WithKeyStore(keyStore datastore.KeyStore) Option {
9191
return func(cfg *config) error {
9292
if keyStore == nil {
9393
return errors.New("provider config: keyStore cannot be nil")

dual/provider/provider.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type SweepingProvider struct {
1919
dht *dual.DHT
2020
LAN *provider.SweepingProvider
2121
WAN *provider.SweepingProvider
22-
keyStore *datastore.KeyStore
22+
keyStore datastore.KeyStore
2323
}
2424

2525
// New creates a new SweepingProvider that manages provides and reprovides for

provider/datastore/keystore.go

Lines changed: 185 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package datastore
33
import (
44
"context"
55
"encoding/base64"
6+
"errors"
67
"fmt"
78
"strings"
89
"sync"
10+
"sync/atomic"
911

1012
"github.com/ipfs/go-cid"
1113
ds "github.com/ipfs/go-datastore"
@@ -18,8 +20,19 @@ import (
1820
"github.com/probe-lab/go-libdht/kad/key/bitstr"
1921
)
2022

21-
// KeyStore indexes multihashes by their kademlia identifier.
22-
type KeyStore struct {
23+
// KeyStore provides thread-safe storage and retrieval of multihashes, indexed
24+
// by their kademlia 256-bit identifier.
25+
type KeyStore interface {
26+
Put(context.Context, ...mh.Multihash) ([]mh.Multihash, error)
27+
Get(context.Context, bitstr.Key) ([]mh.Multihash, error)
28+
ContainsPrefix(context.Context, bitstr.Key) (bool, error)
29+
Delete(context.Context, ...mh.Multihash) error
30+
Empty(context.Context) error
31+
Size(context.Context) (int, error)
32+
}
33+
34+
// keyStore indexes multihashes by their kademlia identifier.
35+
type keyStore struct {
2336
lk sync.Mutex
2437

2538
ds ds.Batching
@@ -91,50 +104,22 @@ func WithBatchSize(size int) KeyStoreOption {
91104
}
92105

93106
// NewKeyStore creates a new KeyStore backed by the provided datastore.
94-
func NewKeyStore(d ds.Batching, opts ...KeyStoreOption) (*KeyStore, error) {
107+
func NewKeyStore(d ds.Batching, opts ...KeyStoreOption) (KeyStore, error) {
95108
var cfg keyStoreCfg
96109
opts = append([]KeyStoreOption{KeyStoreDefaultCfg}, opts...)
97110
for i, o := range opts {
98111
if err := o(&cfg); err != nil {
99112
return nil, fmt.Errorf("KeyStore option %d failed: %w", i, err)
100113
}
101114
}
102-
return &KeyStore{
115+
return &keyStore{
103116
ds: d,
104117
base: ds.NewKey(cfg.base),
105118
prefixBits: cfg.prefixBits,
106119
batchSize: cfg.batchSize,
107120
}, nil
108121
}
109122

110-
// ResetCids purges the KeyStore and repopulates it with the provided cids'
111-
// multihashes.
112-
func (s *KeyStore) ResetCids(ctx context.Context, keysChan <-chan cid.Cid) error {
113-
s.lk.Lock()
114-
defer s.lk.Unlock()
115-
116-
err := s.emptyLocked(ctx)
117-
if err != nil {
118-
return fmt.Errorf("KeyStore empty failed during reset: %w", err)
119-
}
120-
keys := make([]mh.Multihash, 0, s.batchSize)
121-
for c := range keysChan {
122-
keys = append(keys, c.Hash())
123-
if len(keys) == cap(keys) {
124-
_, err = s.putLocked(ctx, keys...)
125-
if err != nil {
126-
return fmt.Errorf("KeyStore put failed during reset: %w", err)
127-
}
128-
keys = keys[:0]
129-
}
130-
}
131-
_, err = s.putLocked(ctx, keys...)
132-
if err != nil {
133-
return fmt.Errorf("KeyStore put failed during reset: %w", err)
134-
}
135-
return nil
136-
}
137-
138123
// dsKey returns the datastore key for the provided binary key.
139124
//
140125
// The function creates a hierarchical datastore key by expanding bits into
@@ -179,7 +164,7 @@ func dsKey[K kad.Key[K]](k K, prefixBits int, base ds.Key) ds.Key {
179164
// "base/bit0/bit1/.../bitN/base64url_suffix"
180165
//
181166
// Returns the reconstructed 256-bit key or an error if base64URL decoding fails.
182-
func (s *KeyStore) decodeKey(dsk string) (bit256.Key, error) {
167+
func (s *keyStore) decodeKey(dsk string) (bit256.Key, error) {
183168
dsk = dsk[len(s.base.String()):] // remove leading prefix
184169
bs := make([]byte, 32)
185170
// Extract individual bits from odd positions (skip '/' separators)
@@ -204,7 +189,7 @@ type pair struct {
204189

205190
// putLocked stores the provided keys while assuming s.lk is already held, and
206191
// returns the keys that weren't present already in the keystore.
207-
func (s *KeyStore) putLocked(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) {
192+
func (s *keyStore) putLocked(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) {
208193
seen := make(map[bit256.Key]struct{}, len(keys))
209194
toPut := make([]pair, 0, len(keys))
210195

@@ -252,7 +237,7 @@ func (s *KeyStore) putLocked(ctx context.Context, keys ...mh.Multihash) ([]mh.Mu
252237
// Put stores the provided keys in the underlying datastore, grouping them by
253238
// the first prefixLen bits. It returns only the keys that were not previously
254239
// persisted in the datastore (i.e., newly added keys).
255-
func (s *KeyStore) Put(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) {
240+
func (s *keyStore) Put(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) {
256241
if len(keys) == 0 {
257242
return nil, nil
258243
}
@@ -264,7 +249,7 @@ func (s *KeyStore) Put(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihas
264249

265250
// Get returns all keys whose bit256 representation matches the provided
266251
// prefix.
267-
func (s *KeyStore) Get(ctx context.Context, prefix bitstr.Key) ([]mh.Multihash, error) {
252+
func (s *keyStore) Get(ctx context.Context, prefix bitstr.Key) ([]mh.Multihash, error) {
268253
s.lk.Lock()
269254
defer s.lk.Unlock()
270255

@@ -300,7 +285,7 @@ func (s *KeyStore) Get(ctx context.Context, prefix bitstr.Key) ([]mh.Multihash,
300285
// ContainsPrefix reports whether the KeyStore currently holds at least one
301286
// multihash whose kademlia identifier (bit256.Key) starts with the provided
302287
// bit-prefix.
303-
func (s *KeyStore) ContainsPrefix(ctx context.Context, prefix bitstr.Key) (bool, error) {
288+
func (s *keyStore) ContainsPrefix(ctx context.Context, prefix bitstr.Key) (bool, error) {
304289
s.lk.Lock()
305290
defer s.lk.Unlock()
306291

@@ -337,8 +322,8 @@ func (s *KeyStore) ContainsPrefix(ctx context.Context, prefix bitstr.Key) (bool,
337322

338323
// emptyLocked deletes all entries under the datastore prefix, assuming s.lk is
339324
// already held.
340-
func (s *KeyStore) emptyLocked(ctx context.Context) error {
341-
res, err := s.ds.Query(ctx, query.Query{Prefix: s.base.String(), KeysOnly: true})
325+
func emptyLocked(ctx context.Context, d ds.Batching, base ds.Key, batchSize int) error {
326+
res, err := d.Query(ctx, query.Query{Prefix: base.String(), KeysOnly: true})
342327
if err != nil {
343328
return err
344329
}
@@ -348,7 +333,7 @@ func (s *KeyStore) emptyLocked(ctx context.Context) error {
348333
if len(keys) == 0 {
349334
return nil
350335
}
351-
b, err := s.ds.Batch(ctx)
336+
b, err := d.Batch(ctx)
352337
if err != nil {
353338
return nil
354339
}
@@ -359,7 +344,7 @@ func (s *KeyStore) emptyLocked(ctx context.Context) error {
359344
}
360345
return b.Commit(ctx)
361346
}
362-
keys := make([]ds.Key, 0, s.batchSize)
347+
keys := make([]ds.Key, 0, batchSize)
363348
for r := range res.Next() {
364349
if r.Error != nil {
365350
return r.Error
@@ -376,15 +361,15 @@ func (s *KeyStore) emptyLocked(ctx context.Context) error {
376361
}
377362

378363
// Empty deletes all entries under the datastore prefix.
379-
func (s *KeyStore) Empty(ctx context.Context) error {
364+
func (s *keyStore) Empty(ctx context.Context) error {
380365
s.lk.Lock()
381366
defer s.lk.Unlock()
382367

383-
return s.emptyLocked(ctx)
368+
return emptyLocked(ctx, s.ds, s.base, s.batchSize)
384369
}
385370

386371
// Delete removes the given keys from datastore.
387-
func (s *KeyStore) Delete(ctx context.Context, keys ...mh.Multihash) error {
372+
func (s *keyStore) Delete(ctx context.Context, keys ...mh.Multihash) error {
388373
if len(keys) == 0 {
389374
return nil
390375
}
@@ -409,7 +394,7 @@ func (s *KeyStore) Delete(ctx context.Context, keys ...mh.Multihash) error {
409394
//
410395
// The size is obtained by iterating over all keys in the underlying
411396
// datastore, so it may be expensive for large stores.
412-
func (s *KeyStore) Size(ctx context.Context) (size int, err error) {
397+
func (s *keyStore) Size(ctx context.Context) (size int, err error) {
413398
s.lk.Lock()
414399
defer s.lk.Unlock()
415400

@@ -429,3 +414,158 @@ func (s *KeyStore) Size(ctx context.Context) (size int, err error) {
429414
}
430415
return
431416
}
417+
418+
var ErrResetInProgress = errors.New("reset already in progress")
419+
420+
const (
421+
ready uint32 = iota
422+
inProgress
423+
cleanup
424+
)
425+
426+
// ResettableKeyStore is a KeyStore implementation that supports atomic reset
427+
// operations. It maintains two alternate bases in the underlying datastore and
428+
// can swap between them to provide atomic replacement of all stored keys.
429+
//
430+
// The reset operation allows replacing all stored multihashes with a new set
431+
// without interrupting concurrent read/write operations. During a reset, new
432+
// writes are duplicated to both the current and alternate storage bases to
433+
// maintain consistency.
434+
type ResettableKeyStore struct {
435+
keyStore
436+
437+
altBase ds.Key
438+
// resetInProgress tracks the state of the reset operation:
439+
// - 0: no reset in progress, ready for new reset
440+
// - 1: reset in progress, writing to alternate base
441+
// - 2: post reset cleanup
442+
resetState atomic.Uint32
443+
}
444+
445+
var _ KeyStore = (*ResettableKeyStore)(nil)
446+
447+
// NewResettableKeyStore creates a new ResettableKeyStore backed by the
448+
// provided datastore. It automatically adds "/0" and "/1" suffixes to the
449+
// configured base prefix to create two alternate storage locations for atomic
450+
// reset operations.
451+
func NewResettableKeyStore(d ds.Batching, opts ...KeyStoreOption) (*ResettableKeyStore, error) {
452+
var cfg keyStoreCfg
453+
opts = append([]KeyStoreOption{KeyStoreDefaultCfg}, opts...)
454+
for i, o := range opts {
455+
if err := o(&cfg); err != nil {
456+
return nil, fmt.Errorf("KeyStore option %d failed: %w", i, err)
457+
}
458+
}
459+
return &ResettableKeyStore{
460+
keyStore: keyStore{
461+
ds: d,
462+
base: ds.NewKey(cfg.base).ChildString("0"),
463+
prefixBits: cfg.prefixBits,
464+
batchSize: cfg.batchSize,
465+
},
466+
altBase: ds.NewKey(cfg.base).ChildString("1"),
467+
}, nil
468+
}
469+
470+
func batchPut(ctx context.Context, d ds.Batching, base ds.Key, prefixBits int, keys []mh.Multihash) error {
471+
b, err := d.Batch(ctx)
472+
if err != nil {
473+
return err
474+
}
475+
for _, h := range keys {
476+
dsk := dsKey(keyspace.MhToBit256(h), prefixBits, base)
477+
if err := b.Put(ctx, dsk, h); err != nil {
478+
return err
479+
}
480+
}
481+
return b.Commit(ctx)
482+
}
483+
484+
// ResetCids atomically replaces all stored keys with the CIDs received from
485+
// keysChan. The operation is thread-safe and non-blocking for concurrent reads
486+
// and writes.
487+
//
488+
// During the reset:
489+
// - New keys from keysChan are written to an alternate storage location
490+
// - Concurrent Put operations are duplicated to both current and alternate
491+
// locations
492+
// - Once all keys are processed, storage locations are atomically swapped
493+
// - The old storage location is cleaned up
494+
//
495+
// Returns ErrResetInProgress if another reset operation is already running.
496+
// The operation can be cancelled via context, which will clean up partial
497+
// state.
498+
func (s *ResettableKeyStore) ResetCids(ctx context.Context, keysChan <-chan cid.Cid) error {
499+
if !s.resetState.CompareAndSwap(ready, inProgress) {
500+
return ErrResetInProgress
501+
}
502+
defer s.resetState.Store(ready)
503+
504+
// Make sure the alternate base is empty
505+
if err := emptyLocked(ctx, s.ds, s.altBase, s.batchSize); err != nil {
506+
return fmt.Errorf("ResettableKeyStore empty failed during reset: %w", err)
507+
}
508+
509+
// Read all the keys from the channel and put them in batch at the alternate
510+
// base
511+
keys := make([]mh.Multihash, 0, s.batchSize)
512+
loop:
513+
for {
514+
select {
515+
case <-ctx.Done():
516+
// Context cancelled, abort reset and clean up alternate base
517+
s.resetState.Store(cleanup)
518+
if err := emptyLocked(ctx, s.ds, s.altBase, s.batchSize); err != nil {
519+
return fmt.Errorf("ResettableKeyStore empty failed during reset: %w", err)
520+
}
521+
return nil
522+
case c, ok := <-keysChan:
523+
if !ok {
524+
break loop
525+
}
526+
keys = append(keys, c.Hash())
527+
if len(keys) == cap(keys) {
528+
if err := batchPut(ctx, s.ds, s.altBase, s.prefixBits, keys); err != nil {
529+
return fmt.Errorf("ResettableKeyStore put failed during reset: %w", err)
530+
}
531+
keys = keys[:0]
532+
}
533+
}
534+
}
535+
// Put final batch
536+
if err := batchPut(ctx, s.ds, s.altBase, s.prefixBits, keys); err != nil {
537+
return fmt.Errorf("ResettableKeyStore put failed during reset: %w", err)
538+
}
539+
540+
// Swap the bases. The base in use is now the one to which we just wrote.
541+
s.lk.Lock()
542+
oldBase := s.base
543+
s.base = s.altBase
544+
s.resetState.Store(cleanup)
545+
s.lk.Unlock()
546+
s.altBase = oldBase
547+
548+
// Clean up the old base
549+
if err := emptyLocked(ctx, s.ds, oldBase, s.batchSize); err != nil {
550+
return fmt.Errorf("ResettableKeyStore empty failed during reset: %w", err)
551+
}
552+
553+
return nil
554+
}
555+
556+
// Put stores the provided keys in the underlying datastore, with special
557+
// handling during reset operations. When a reset is in progress, keys are
558+
// written to both the current storage location and the alternate location
559+
// being prepared for the reset.
560+
//
561+
// This ensures that keys written during a reset operation remain available
562+
// after the reset completes. Returns only the keys that were not previously
563+
// persisted.
564+
func (s *ResettableKeyStore) Put(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihash, error) {
565+
if s.resetState.Load() == inProgress {
566+
// Reset is in progress, write to alternate base in addition to current
567+
// base.
568+
batchPut(ctx, s.ds, s.altBase, s.prefixBits, keys)
569+
}
570+
return s.keyStore.Put(ctx, keys...)
571+
}

provider/datastore/keystore_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func TestKeyStoreReset(t *testing.T) {
151151
ds := ds.NewMapDatastore()
152152
defer ds.Close()
153153

154-
store, err := NewKeyStore(ds)
154+
store, err := NewResettableKeyStore(ds)
155155
if err != nil {
156156
t.Fatal(err)
157157
}
@@ -289,7 +289,7 @@ func TestKeyStoreSize(t *testing.T) {
289289
}
290290

291291
func TestDsKey(t *testing.T) {
292-
s := KeyStore{
292+
s := keyStore{
293293
base: ds.NewKey("/base/prefix"),
294294
prefixBits: 8,
295295
}

0 commit comments

Comments
 (0)