Skip to content

Commit 5088386

Browse files
use datastore namespace
1 parent 172e857 commit 5088386

File tree

3 files changed

+60
-44
lines changed

3 files changed

+60
-44
lines changed

provider/datastore/keystore.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ import (
88
"strings"
99

1010
ds "github.com/ipfs/go-datastore"
11-
query "github.com/ipfs/go-datastore/query"
11+
"github.com/ipfs/go-datastore/namespace"
12+
"github.com/ipfs/go-datastore/query"
1213
"github.com/libp2p/go-libp2p-kad-dht/provider/internal/keyspace"
1314
mh "github.com/multiformats/go-multihash"
1415

@@ -64,7 +65,6 @@ type operationResponse struct {
6465
// keyStore indexes multihashes by their kademlia identifier.
6566
type keyStore struct {
6667
ds ds.Batching
67-
base ds.Key
6868
prefixBits int
6969
batchSize int
7070

@@ -147,8 +147,7 @@ func NewKeyStore(d ds.Batching, opts ...KeyStoreOption) (KeyStore, error) {
147147
}
148148

149149
ks := &keyStore{
150-
ds: d,
151-
base: ds.NewKey(cfg.base),
150+
ds: namespace.Wrap(d, ds.NewKey(cfg.base)),
152151
prefixBits: cfg.prefixBits,
153152
batchSize: cfg.batchSize,
154153
requests: make(chan operation),
@@ -183,7 +182,7 @@ func NewKeyStore(d ds.Batching, opts ...KeyStoreOption) (KeyStore, error) {
183182
// If the prefix is longer than `prefixBits`, only the first `prefixBits` bits
184183
// are used, allowing the returned key to serve as a query prefix for the
185184
// datastore.
186-
func dsKey[K kad.Key[K]](k K, prefixBits int, base ds.Key) ds.Key {
185+
func dsKey[K kad.Key[K]](k K, prefixBits int) ds.Key {
187186
b := strings.Builder{}
188187
l := k.BitLen()
189188
for i := range min(prefixBits, l) {
@@ -193,7 +192,7 @@ func dsKey[K kad.Key[K]](k K, prefixBits int, base ds.Key) ds.Key {
193192
if l == keyspace.KeyLen {
194193
b.WriteString(base64.URLEncoding.EncodeToString(keyspace.KeyToBytes(k)[prefixBits/8:]))
195194
}
196-
return base.ChildString(b.String())
195+
return ds.NewKey(b.String())
197196
}
198197

199198
// decodeKey reconstructs a 256-bit binary key from a hierarchical datastore key string.
@@ -207,7 +206,6 @@ func dsKey[K kad.Key[K]](k K, prefixBits int, base ds.Key) ds.Key {
207206
//
208207
// Returns the reconstructed 256-bit key or an error if base64URL decoding fails.
209208
func (s *keyStore) decodeKey(dsk string) (bit256.Key, error) {
210-
dsk = dsk[len(s.base.String()):] // remove leading prefix
211209
bs := make([]byte, 32)
212210
// Extract individual bits from odd positions (skip '/' separators)
213211
for i := range s.prefixBits {
@@ -256,7 +254,7 @@ func (s *keyStore) worker() {
256254
op.response <- operationResponse{err: err}
257255

258256
case opEmpty:
259-
err := empty(op.ctx, s.ds, s.base, s.batchSize)
257+
err := empty(op.ctx, s.ds, s.batchSize)
260258
op.response <- operationResponse{err: err}
261259

262260
case opSize:
@@ -282,7 +280,7 @@ func (s *keyStore) put(ctx context.Context, keys []mh.Multihash) ([]mh.Multihash
282280
continue
283281
}
284282
seen[k] = struct{}{}
285-
dsk := dsKey(k, s.prefixBits, s.base)
283+
dsk := dsKey(k, s.prefixBits)
286284
ok, err := s.ds.Has(ctx, dsk)
287285
if err != nil {
288286
return nil, err
@@ -320,7 +318,7 @@ func (s *keyStore) put(ctx context.Context, keys []mh.Multihash) ([]mh.Multihash
320318
// get returns all keys whose bit256 representation matches the provided
321319
// prefix.
322320
func (s *keyStore) get(ctx context.Context, prefix bitstr.Key) ([]mh.Multihash, error) {
323-
dsk := dsKey(prefix, s.prefixBits, s.base).String()
321+
dsk := dsKey(prefix, s.prefixBits).String()
324322
res, err := s.ds.Query(ctx, query.Query{Prefix: dsk})
325323
if err != nil {
326324
return nil, err
@@ -353,7 +351,7 @@ func (s *keyStore) get(ctx context.Context, prefix bitstr.Key) ([]mh.Multihash,
353351
// multihash whose kademlia identifier (bit256.Key) starts with the provided
354352
// bit-prefix.
355353
func (s *keyStore) containsPrefix(ctx context.Context, prefix bitstr.Key) (bool, error) {
356-
dsk := dsKey(prefix, s.prefixBits, s.base).String()
354+
dsk := dsKey(prefix, s.prefixBits).String()
357355
longPrefix := prefix.BitLen() > s.prefixBits
358356
q := query.Query{Prefix: dsk, KeysOnly: true}
359357
if !longPrefix {
@@ -386,8 +384,8 @@ func (s *keyStore) containsPrefix(ctx context.Context, prefix bitstr.Key) (bool,
386384

387385
// empty deletes all entries under the datastore prefix, assuming s.lk is
388386
// already held.
389-
func empty(ctx context.Context, d ds.Batching, base ds.Key, batchSize int) error {
390-
res, err := d.Query(ctx, query.Query{Prefix: base.String(), KeysOnly: true})
387+
func empty(ctx context.Context, d ds.Batching, batchSize int) error {
388+
res, err := d.Query(ctx, query.Query{KeysOnly: true})
391389
if err != nil {
392390
return err
393391
}
@@ -431,7 +429,7 @@ func (s *keyStore) delete(ctx context.Context, keys []mh.Multihash) error {
431429
return err
432430
}
433431
for _, h := range keys {
434-
dsk := dsKey(keyspace.MhToBit256(h), s.prefixBits, s.base)
432+
dsk := dsKey(keyspace.MhToBit256(h), s.prefixBits)
435433
err := b.Delete(ctx, dsk)
436434
if err != nil {
437435
return err
@@ -442,7 +440,7 @@ func (s *keyStore) delete(ctx context.Context, keys []mh.Multihash) error {
442440

443441
// size returns the number of keys currently stored in the KeyStore.
444442
func (s *keyStore) size(ctx context.Context) (size int, err error) {
445-
q := query.Query{Prefix: s.base.String(), KeysOnly: true}
443+
q := query.Query{KeysOnly: true}
446444
res, err := s.ds.Query(ctx, q)
447445
if err != nil {
448446
return

provider/datastore/keystore_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -259,13 +259,12 @@ func testKeyStoreSizeImpl(t *testing.T, store KeyStore) {
259259

260260
func TestDsKey(t *testing.T) {
261261
s := keyStore{
262-
base: ds.NewKey("/base/prefix"),
263262
prefixBits: 8,
264263
}
265264

266265
k := bit256.ZeroKey()
267-
dsk := dsKey(k, s.prefixBits, s.base)
268-
expectedPrefix := "/base/prefix/0/0/0/0/0/0/0/0/"
266+
dsk := dsKey(k, s.prefixBits)
267+
expectedPrefix := "/0/0/0/0/0/0/0/0/"
269268
require.Equal(t, expectedPrefix, dsk.String()[:len(expectedPrefix)])
270269

271270
s.prefixBits = 16
@@ -276,8 +275,8 @@ func TestDsKey(t *testing.T) {
276275
require.NoError(t, err)
277276
k := bit256.NewKey(b[:])
278277

279-
sdk := dsKey(k, s.prefixBits, s.base)
280-
require.Equal(t, strings.Count(s.base.String(), "/")+s.prefixBits+1, strings.Count(sdk.String(), "/"))
278+
sdk := dsKey(k, s.prefixBits)
279+
require.Equal(t, s.prefixBits+1, strings.Count(sdk.String(), "/"))
281280
decoded, err := s.decodeKey(sdk.String())
282281
require.NoError(t, err)
283282
require.Equal(t, k, decoded)

provider/datastore/resettable_keystore.go

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/ipfs/go-cid"
99
ds "github.com/ipfs/go-datastore"
10+
"github.com/ipfs/go-datastore/namespace"
1011
"github.com/libp2p/go-libp2p-kad-dht/provider/internal/keyspace"
1112
mh "github.com/multiformats/go-multihash"
1213
)
@@ -26,17 +27,35 @@ type resetOp struct {
2627
}
2728

2829
// ResettableKeyStore is a KeyStore implementation that supports atomic reset
29-
// operations. It maintains two alternate bases in the underlying datastore and
30-
// can swap between them to provide atomic replacement of all stored keys.
30+
// operations using a dual-datastore architecture. It maintains two separate
31+
// datastores (primary and alternate) where only one is active at any time,
32+
// enabling atomic replacement of all stored keys without interrupting
33+
// concurrent operations.
3134
//
32-
// The reset operation allows replacing all stored multihashes with a new set
33-
// without interrupting concurrent read/write operations. During a reset, new
34-
// writes are duplicated to both the current and alternate storage bases to
35-
// maintain consistency.
35+
// Architecture:
36+
// - Primary datastore: Currently active storage for all read/write operations
37+
// - Alternate datastore: Standby storage used during reset operations
38+
// - The datastores use "/0" and "/1" namespace suffixes and can be swapped
39+
//
40+
// Reset Operation Flow:
41+
// 1. New keys from reset are written to the alternate (inactive) datastore
42+
// 2. Concurrent Put operations are automatically duplicated to both datastores
43+
// to maintain consistency during the transition
44+
// 3. Once all reset keys are written, the datastores are atomically swapped
45+
// 4. The old datastore (now alternate) is cleaned up
46+
//
47+
// Thread Safety:
48+
// - All operations are processed sequentially by a single worker goroutine
49+
// - Reset operations are non-blocking for concurrent reads and writes
50+
// - Only one reset operation can be active at a time
51+
//
52+
// The reset operation allows complete replacement of stored multihashes
53+
// without data loss or service interruption, making it suitable for
54+
// scenarios requiring periodic full dataset updates.
3655
type ResettableKeyStore struct {
3756
keyStore
3857

39-
altBase ds.Key
58+
altDs ds.Batching
4059
resetInProgress bool
4160
resetSync chan []mh.Multihash // passes keys from worker to reset go routine
4261
resetOps chan resetOp // reset operations that must be run in main go routine
@@ -59,15 +78,14 @@ func NewResettableKeyStore(d ds.Batching, opts ...KeyStoreOption) (*ResettableKe
5978

6079
rks := &ResettableKeyStore{
6180
keyStore: keyStore{
62-
ds: d,
63-
base: ds.NewKey(cfg.base).ChildString("0"),
81+
ds: namespace.Wrap(d, ds.NewKey(cfg.base+"/0")),
6482
prefixBits: cfg.prefixBits,
6583
batchSize: cfg.batchSize,
6684
requests: make(chan operation),
6785
close: make(chan struct{}),
6886
done: make(chan struct{}),
6987
},
70-
altBase: ds.NewKey(cfg.base).ChildString("1"),
88+
altDs: namespace.Wrap(d, ds.NewKey(cfg.base+"/1")),
7189
resetOps: make(chan resetOp),
7290
resetSync: make(chan []mh.Multihash, 128), // buffered to avoid blocking
7391
}
@@ -105,7 +123,7 @@ func (s *ResettableKeyStore) worker() {
105123
op.response <- operationResponse{err: err}
106124

107125
case opEmpty:
108-
err := empty(op.ctx, s.ds, s.base, s.batchSize)
126+
err := empty(op.ctx, s.ds, s.batchSize)
109127
op.response <- operationResponse{err: err}
110128

111129
case opSize:
@@ -127,20 +145,21 @@ func (s *ResettableKeyStore) worker() {
127145
// handling during reset operations.
128146
func (s *ResettableKeyStore) put(ctx context.Context, keys []mh.Multihash) ([]mh.Multihash, error) {
129147
if s.resetInProgress {
130-
// Reset is in progress, write to alternate base in addition to current base
148+
// Reset is in progress, write to alternate datastore in addition to
149+
// current datastore
131150
s.resetSync <- keys
132151
}
133152
return s.keyStore.put(ctx, keys)
134153
}
135154

136-
// altPut writes the given multihashes to the alternate base in the datastore.
155+
// altPut writes the given multihashes to the alternate datastore.
137156
func (s *ResettableKeyStore) altPut(ctx context.Context, keys []mh.Multihash) error {
138-
b, err := s.ds.Batch(ctx)
157+
b, err := s.altDs.Batch(ctx)
139158
if err != nil {
140159
return err
141160
}
142161
for _, h := range keys {
143-
dsk := dsKey(keyspace.MhToBit256(h), s.prefixBits, s.altBase)
162+
dsk := dsKey(keyspace.MhToBit256(h), s.prefixBits)
144163
if err := b.Put(ctx, dsk, h); err != nil {
145164
return err
146165
}
@@ -155,7 +174,7 @@ func (s *ResettableKeyStore) handleResetOp(op resetOp) {
155174
op.response <- ErrResetInProgress
156175
return
157176
}
158-
if err := empty(context.Background(), s.ds, s.altBase, s.batchSize); err != nil {
177+
if err := empty(context.Background(), s.altDs, s.batchSize); err != nil {
159178
op.response <- err
160179
return
161180
}
@@ -166,10 +185,10 @@ func (s *ResettableKeyStore) handleResetOp(op resetOp) {
166185

167186
// Cleanup operation
168187
if op.success {
169-
// Swap the keystore prefix bases.
170-
oldBase := s.base
171-
s.base = s.altBase
172-
s.altBase = oldBase
188+
// Swap the active datastore.
189+
oldDs := s.ds
190+
s.ds = s.altDs
191+
s.altDs = oldDs
173192
}
174193
// Drain resetSync
175194
drain:
@@ -180,9 +199,9 @@ drain:
180199
break drain
181200
}
182201
}
183-
// Empty the unused base prefix
202+
// Empty the unused datastore.
184203
s.resetInProgress = false
185-
op.response <- empty(context.Background(), s.ds, s.altBase, s.batchSize)
204+
op.response <- empty(context.Background(), s.altDs, s.batchSize)
186205
}
187206

188207
// ResetCids atomically replaces all stored keys with the CIDs received from
@@ -231,7 +250,7 @@ func (s *ResettableKeyStore) ResetCids(ctx context.Context, keysChan <-chan cid.
231250
case <-s.done:
232251
// Safe not to go through the worker since we are done, and we need to
233252
// cleanup
234-
empty(context.Background(), s.ds, s.altBase, s.batchSize)
253+
empty(context.Background(), s.altDs, s.batchSize)
235254
}
236255
}()
237256

@@ -260,7 +279,7 @@ func (s *ResettableKeyStore) ResetCids(ctx context.Context, keysChan <-chan cid.
260279
return nil
261280
}
262281

263-
// Read all the keys from the channel and put them in batch at the alternate base
282+
// Read all the keys from the channel and write them to the altDs
264283
loop:
265284
for {
266285
select {

0 commit comments

Comments
 (0)