@@ -3,9 +3,11 @@ package datastore
3
3
import (
4
4
"context"
5
5
"encoding/base64"
6
+ "errors"
6
7
"fmt"
7
8
"strings"
8
9
"sync"
10
+ "sync/atomic"
9
11
10
12
"github.com/ipfs/go-cid"
11
13
ds "github.com/ipfs/go-datastore"
@@ -18,8 +20,19 @@ import (
18
20
"github.com/probe-lab/go-libdht/kad/key/bitstr"
19
21
)
20
22
21
- // KeyStore indexes multihashes by their kademlia identifier.
22
- type KeyStore struct {
23
+ // KeyStore provides thread-safe storage and retrieval of multihashes, indexed
24
+ // by their kademlia 256-bit identifier.
25
+ type KeyStore interface {
26
+ Put (context.Context , ... mh.Multihash ) ([]mh.Multihash , error )
27
+ Get (context.Context , bitstr.Key ) ([]mh.Multihash , error )
28
+ ContainsPrefix (context.Context , bitstr.Key ) (bool , error )
29
+ Delete (context.Context , ... mh.Multihash ) error
30
+ Empty (context.Context ) error
31
+ Size (context.Context ) (int , error )
32
+ }
33
+
34
+ // keyStore indexes multihashes by their kademlia identifier.
35
+ type keyStore struct {
23
36
lk sync.Mutex
24
37
25
38
ds ds.Batching
@@ -91,50 +104,22 @@ func WithBatchSize(size int) KeyStoreOption {
91
104
}
92
105
93
106
// NewKeyStore creates a new KeyStore backed by the provided datastore.
94
- func NewKeyStore (d ds.Batching , opts ... KeyStoreOption ) (* KeyStore , error ) {
107
+ func NewKeyStore (d ds.Batching , opts ... KeyStoreOption ) (KeyStore , error ) {
95
108
var cfg keyStoreCfg
96
109
opts = append ([]KeyStoreOption {KeyStoreDefaultCfg }, opts ... )
97
110
for i , o := range opts {
98
111
if err := o (& cfg ); err != nil {
99
112
return nil , fmt .Errorf ("KeyStore option %d failed: %w" , i , err )
100
113
}
101
114
}
102
- return & KeyStore {
115
+ return & keyStore {
103
116
ds : d ,
104
117
base : ds .NewKey (cfg .base ),
105
118
prefixBits : cfg .prefixBits ,
106
119
batchSize : cfg .batchSize ,
107
120
}, nil
108
121
}
109
122
110
- // ResetCids purges the KeyStore and repopulates it with the provided cids'
111
- // multihashes.
112
- func (s * KeyStore ) ResetCids (ctx context.Context , keysChan <- chan cid.Cid ) error {
113
- s .lk .Lock ()
114
- defer s .lk .Unlock ()
115
-
116
- err := s .emptyLocked (ctx )
117
- if err != nil {
118
- return fmt .Errorf ("KeyStore empty failed during reset: %w" , err )
119
- }
120
- keys := make ([]mh.Multihash , 0 , s .batchSize )
121
- for c := range keysChan {
122
- keys = append (keys , c .Hash ())
123
- if len (keys ) == cap (keys ) {
124
- _ , err = s .putLocked (ctx , keys ... )
125
- if err != nil {
126
- return fmt .Errorf ("KeyStore put failed during reset: %w" , err )
127
- }
128
- keys = keys [:0 ]
129
- }
130
- }
131
- _ , err = s .putLocked (ctx , keys ... )
132
- if err != nil {
133
- return fmt .Errorf ("KeyStore put failed during reset: %w" , err )
134
- }
135
- return nil
136
- }
137
-
138
123
// dsKey returns the datastore key for the provided binary key.
139
124
//
140
125
// The function creates a hierarchical datastore key by expanding bits into
@@ -179,7 +164,7 @@ func dsKey[K kad.Key[K]](k K, prefixBits int, base ds.Key) ds.Key {
179
164
// "base/bit0/bit1/.../bitN/base64url_suffix"
180
165
//
181
166
// Returns the reconstructed 256-bit key or an error if base64URL decoding fails.
182
- func (s * KeyStore ) decodeKey (dsk string ) (bit256.Key , error ) {
167
+ func (s * keyStore ) decodeKey (dsk string ) (bit256.Key , error ) {
183
168
dsk = dsk [len (s .base .String ()):] // remove leading prefix
184
169
bs := make ([]byte , 32 )
185
170
// Extract individual bits from odd positions (skip '/' separators)
@@ -204,7 +189,7 @@ type pair struct {
204
189
205
190
// putLocked stores the provided keys while assuming s.lk is already held, and
206
191
// returns the keys that weren't present already in the keystore.
207
- func (s * KeyStore ) putLocked (ctx context.Context , keys ... mh.Multihash ) ([]mh.Multihash , error ) {
192
+ func (s * keyStore ) putLocked (ctx context.Context , keys ... mh.Multihash ) ([]mh.Multihash , error ) {
208
193
seen := make (map [bit256.Key ]struct {}, len (keys ))
209
194
toPut := make ([]pair , 0 , len (keys ))
210
195
@@ -252,7 +237,7 @@ func (s *KeyStore) putLocked(ctx context.Context, keys ...mh.Multihash) ([]mh.Mu
252
237
// Put stores the provided keys in the underlying datastore, grouping them by
253
238
// the first prefixLen bits. It returns only the keys that were not previously
254
239
// persisted in the datastore (i.e., newly added keys).
255
- func (s * KeyStore ) Put (ctx context.Context , keys ... mh.Multihash ) ([]mh.Multihash , error ) {
240
+ func (s * keyStore ) Put (ctx context.Context , keys ... mh.Multihash ) ([]mh.Multihash , error ) {
256
241
if len (keys ) == 0 {
257
242
return nil , nil
258
243
}
@@ -264,7 +249,7 @@ func (s *KeyStore) Put(ctx context.Context, keys ...mh.Multihash) ([]mh.Multihas
264
249
265
250
// Get returns all keys whose bit256 representation matches the provided
266
251
// prefix.
267
- func (s * KeyStore ) Get (ctx context.Context , prefix bitstr.Key ) ([]mh.Multihash , error ) {
252
+ func (s * keyStore ) Get (ctx context.Context , prefix bitstr.Key ) ([]mh.Multihash , error ) {
268
253
s .lk .Lock ()
269
254
defer s .lk .Unlock ()
270
255
@@ -300,7 +285,7 @@ func (s *KeyStore) Get(ctx context.Context, prefix bitstr.Key) ([]mh.Multihash,
300
285
// ContainsPrefix reports whether the KeyStore currently holds at least one
301
286
// multihash whose kademlia identifier (bit256.Key) starts with the provided
302
287
// bit-prefix.
303
- func (s * KeyStore ) ContainsPrefix (ctx context.Context , prefix bitstr.Key ) (bool , error ) {
288
+ func (s * keyStore ) ContainsPrefix (ctx context.Context , prefix bitstr.Key ) (bool , error ) {
304
289
s .lk .Lock ()
305
290
defer s .lk .Unlock ()
306
291
@@ -337,8 +322,8 @@ func (s *KeyStore) ContainsPrefix(ctx context.Context, prefix bitstr.Key) (bool,
337
322
338
323
// emptyLocked deletes all entries under the datastore prefix, assuming s.lk is
339
324
// already held.
340
- func ( s * KeyStore ) emptyLocked (ctx context.Context ) error {
341
- res , err := s . ds . Query (ctx , query.Query {Prefix : s . base .String (), KeysOnly : true })
325
+ func emptyLocked (ctx context.Context , d ds. Batching , base ds. Key , batchSize int ) error {
326
+ res , err := d . Query (ctx , query.Query {Prefix : base .String (), KeysOnly : true })
342
327
if err != nil {
343
328
return err
344
329
}
@@ -348,7 +333,7 @@ func (s *KeyStore) emptyLocked(ctx context.Context) error {
348
333
if len (keys ) == 0 {
349
334
return nil
350
335
}
351
- b , err := s . ds .Batch (ctx )
336
+ b , err := d .Batch (ctx )
352
337
if err != nil {
353
338
return nil
354
339
}
@@ -359,7 +344,7 @@ func (s *KeyStore) emptyLocked(ctx context.Context) error {
359
344
}
360
345
return b .Commit (ctx )
361
346
}
362
- keys := make ([]ds.Key , 0 , s . batchSize )
347
+ keys := make ([]ds.Key , 0 , batchSize )
363
348
for r := range res .Next () {
364
349
if r .Error != nil {
365
350
return r .Error
@@ -376,15 +361,15 @@ func (s *KeyStore) emptyLocked(ctx context.Context) error {
376
361
}
377
362
378
363
// Empty deletes all entries under the datastore prefix.
379
- func (s * KeyStore ) Empty (ctx context.Context ) error {
364
+ func (s * keyStore ) Empty (ctx context.Context ) error {
380
365
s .lk .Lock ()
381
366
defer s .lk .Unlock ()
382
367
383
- return s . emptyLocked (ctx )
368
+ return emptyLocked (ctx , s . ds , s . base , s . batchSize )
384
369
}
385
370
386
371
// Delete removes the given keys from datastore.
387
- func (s * KeyStore ) Delete (ctx context.Context , keys ... mh.Multihash ) error {
372
+ func (s * keyStore ) Delete (ctx context.Context , keys ... mh.Multihash ) error {
388
373
if len (keys ) == 0 {
389
374
return nil
390
375
}
@@ -409,7 +394,7 @@ func (s *KeyStore) Delete(ctx context.Context, keys ...mh.Multihash) error {
409
394
//
410
395
// The size is obtained by iterating over all keys in the underlying
411
396
// datastore, so it may be expensive for large stores.
412
- func (s * KeyStore ) Size (ctx context.Context ) (size int , err error ) {
397
+ func (s * keyStore ) Size (ctx context.Context ) (size int , err error ) {
413
398
s .lk .Lock ()
414
399
defer s .lk .Unlock ()
415
400
@@ -429,3 +414,158 @@ func (s *KeyStore) Size(ctx context.Context) (size int, err error) {
429
414
}
430
415
return
431
416
}
417
+
418
+ var ErrResetInProgress = errors .New ("reset already in progress" )
419
+
420
+ const (
421
+ ready uint32 = iota
422
+ inProgress
423
+ cleanup
424
+ )
425
+
426
+ // ResettableKeyStore is a KeyStore implementation that supports atomic reset
427
+ // operations. It maintains two alternate bases in the underlying datastore and
428
+ // can swap between them to provide atomic replacement of all stored keys.
429
+ //
430
+ // The reset operation allows replacing all stored multihashes with a new set
431
+ // without interrupting concurrent read/write operations. During a reset, new
432
+ // writes are duplicated to both the current and alternate storage bases to
433
+ // maintain consistency.
434
+ type ResettableKeyStore struct {
435
+ keyStore
436
+
437
+ altBase ds.Key
438
+ // resetInProgress tracks the state of the reset operation:
439
+ // - 0: no reset in progress, ready for new reset
440
+ // - 1: reset in progress, writing to alternate base
441
+ // - 2: post reset cleanup
442
+ resetState atomic.Uint32
443
+ }
444
+
445
+ var _ KeyStore = (* ResettableKeyStore )(nil )
446
+
447
+ // NewResettableKeyStore creates a new ResettableKeyStore backed by the
448
+ // provided datastore. It automatically adds "/0" and "/1" suffixes to the
449
+ // configured base prefix to create two alternate storage locations for atomic
450
+ // reset operations.
451
+ func NewResettableKeyStore (d ds.Batching , opts ... KeyStoreOption ) (* ResettableKeyStore , error ) {
452
+ var cfg keyStoreCfg
453
+ opts = append ([]KeyStoreOption {KeyStoreDefaultCfg }, opts ... )
454
+ for i , o := range opts {
455
+ if err := o (& cfg ); err != nil {
456
+ return nil , fmt .Errorf ("KeyStore option %d failed: %w" , i , err )
457
+ }
458
+ }
459
+ return & ResettableKeyStore {
460
+ keyStore : keyStore {
461
+ ds : d ,
462
+ base : ds .NewKey (cfg .base ).ChildString ("0" ),
463
+ prefixBits : cfg .prefixBits ,
464
+ batchSize : cfg .batchSize ,
465
+ },
466
+ altBase : ds .NewKey (cfg .base ).ChildString ("1" ),
467
+ }, nil
468
+ }
469
+
470
+ func batchPut (ctx context.Context , d ds.Batching , base ds.Key , prefixBits int , keys []mh.Multihash ) error {
471
+ b , err := d .Batch (ctx )
472
+ if err != nil {
473
+ return err
474
+ }
475
+ for _ , h := range keys {
476
+ dsk := dsKey (keyspace .MhToBit256 (h ), prefixBits , base )
477
+ if err := b .Put (ctx , dsk , h ); err != nil {
478
+ return err
479
+ }
480
+ }
481
+ return b .Commit (ctx )
482
+ }
483
+
484
+ // ResetCids atomically replaces all stored keys with the CIDs received from
485
+ // keysChan. The operation is thread-safe and non-blocking for concurrent reads
486
+ // and writes.
487
+ //
488
+ // During the reset:
489
+ // - New keys from keysChan are written to an alternate storage location
490
+ // - Concurrent Put operations are duplicated to both current and alternate
491
+ // locations
492
+ // - Once all keys are processed, storage locations are atomically swapped
493
+ // - The old storage location is cleaned up
494
+ //
495
+ // Returns ErrResetInProgress if another reset operation is already running.
496
+ // The operation can be cancelled via context, which will clean up partial
497
+ // state.
498
+ func (s * ResettableKeyStore ) ResetCids (ctx context.Context , keysChan <- chan cid.Cid ) error {
499
+ if ! s .resetState .CompareAndSwap (ready , inProgress ) {
500
+ return ErrResetInProgress
501
+ }
502
+ defer s .resetState .Store (ready )
503
+
504
+ // Make sure the alternate base is empty
505
+ if err := emptyLocked (ctx , s .ds , s .altBase , s .batchSize ); err != nil {
506
+ return fmt .Errorf ("ResettableKeyStore empty failed during reset: %w" , err )
507
+ }
508
+
509
+ // Read all the keys from the channel and put them in batch at the alternate
510
+ // base
511
+ keys := make ([]mh.Multihash , 0 , s .batchSize )
512
+ loop:
513
+ for {
514
+ select {
515
+ case <- ctx .Done ():
516
+ // Context cancelled, abort reset and clean up alternate base
517
+ s .resetState .Store (cleanup )
518
+ if err := emptyLocked (ctx , s .ds , s .altBase , s .batchSize ); err != nil {
519
+ return fmt .Errorf ("ResettableKeyStore empty failed during reset: %w" , err )
520
+ }
521
+ return nil
522
+ case c , ok := <- keysChan :
523
+ if ! ok {
524
+ break loop
525
+ }
526
+ keys = append (keys , c .Hash ())
527
+ if len (keys ) == cap (keys ) {
528
+ if err := batchPut (ctx , s .ds , s .altBase , s .prefixBits , keys ); err != nil {
529
+ return fmt .Errorf ("ResettableKeyStore put failed during reset: %w" , err )
530
+ }
531
+ keys = keys [:0 ]
532
+ }
533
+ }
534
+ }
535
+ // Put final batch
536
+ if err := batchPut (ctx , s .ds , s .altBase , s .prefixBits , keys ); err != nil {
537
+ return fmt .Errorf ("ResettableKeyStore put failed during reset: %w" , err )
538
+ }
539
+
540
+ // Swap the bases. The base in use is now the one to which we just wrote.
541
+ s .lk .Lock ()
542
+ oldBase := s .base
543
+ s .base = s .altBase
544
+ s .resetState .Store (cleanup )
545
+ s .lk .Unlock ()
546
+ s .altBase = oldBase
547
+
548
+ // Clean up the old base
549
+ if err := emptyLocked (ctx , s .ds , oldBase , s .batchSize ); err != nil {
550
+ return fmt .Errorf ("ResettableKeyStore empty failed during reset: %w" , err )
551
+ }
552
+
553
+ return nil
554
+ }
555
+
556
+ // Put stores the provided keys in the underlying datastore, with special
557
+ // handling during reset operations. When a reset is in progress, keys are
558
+ // written to both the current storage location and the alternate location
559
+ // being prepared for the reset.
560
+ //
561
+ // This ensures that keys written during a reset operation remain available
562
+ // after the reset completes. Returns only the keys that were not previously
563
+ // persisted.
564
+ func (s * ResettableKeyStore ) Put (ctx context.Context , keys ... mh.Multihash ) ([]mh.Multihash , error ) {
565
+ if s .resetState .Load () == inProgress {
566
+ // Reset is in progress, write to alternate base in addition to current
567
+ // base.
568
+ batchPut (ctx , s .ds , s .altBase , s .prefixBits , keys )
569
+ }
570
+ return s .keyStore .Put (ctx , keys ... )
571
+ }
0 commit comments