Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions store/bolt_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,34 @@ func (s *boltStore) Scan(ctx context.Context, start []byte, end []byte, limit in
return res, errors.WithStack(err)
}

func (s *boltStore) ScanKeys(ctx context.Context, start []byte, end []byte, limit int) ([][]byte, error) {
s.log.InfoContext(ctx, "ScanKeys",
slog.String("start", string(start)),
slog.String("end", string(end)),
slog.Int("limit", limit),
)

var res [][]byte

err := s.bbolt.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
if b == nil {
return nil
}

c := b.Cursor()
for k, _ := c.Seek(start); k != nil && (end == nil || bytes.Compare(k, end) < 0); k, _ = c.Next() {
res = append(res, append([]byte(nil), k...))
if len(res) >= limit {
break
}
}
return nil
})

return res, errors.WithStack(err)
}

func (s *boltStore) Put(ctx context.Context, key []byte, value []byte) error {
s.log.InfoContext(ctx, "put",
slog.String("key", string(key)),
Expand Down
39 changes: 39 additions & 0 deletions store/bolt_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,45 @@ func TestBoltStore_Scan(t *testing.T) {
assert.Equal(t, 100, cnt)
}

func TestBoltStore_ScanKeys(t *testing.T) {
ctx := context.Background()
t.Parallel()
st := mustStore(NewBoltStore(t.TempDir() + "/bolt.db"))

for i := 0; i < 999; i++ {
keyStr := "prefix " + strconv.Itoa(i) + "foo"
key := []byte(keyStr)
b := make([]byte, 8)
binary.PutVarint(b, int64(i))
err := st.Put(ctx, key, b)
assert.NoError(t, err)
}

res, err := st.ScanKeys(ctx, []byte("prefix"), []byte("z"), 100)
assert.NoError(t, err)
assert.Equal(t, 100, len(res))

sortedKeys := make([][]byte, 999)

for _, k := range res {
str := string(k)
i, err := strconv.Atoi(str[7 : len(str)-3])
assert.NoError(t, err)
sortedKeys[i] = k
}

cnt := 0
for i, k := range sortedKeys {
if k == nil {
continue
}
cnt++
assert.Equal(t, []byte("prefix "+strconv.Itoa(i)+"foo"), k)
}

assert.Equal(t, 100, cnt)
}

func TestBoltStore_Txn(t *testing.T) {
t.Parallel()
t.Run("success", func(t *testing.T) {
Expand Down
30 changes: 30 additions & 0 deletions store/rb_memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,36 @@ func (s *rbMemoryStore) Scan(ctx context.Context, start []byte, end []byte, limi
return result, nil
}

func (s *rbMemoryStore) ScanKeys(ctx context.Context, start []byte, end []byte, limit int) ([][]byte, error) {
s.mtx.RLock()
defer s.mtx.RUnlock()

var result [][]byte

s.tree.Each(func(key interface{}, _ interface{}) {
k, ok := key.([]byte)
if !ok {
return
}

if start != nil && bytes.Compare(k, start) < 0 {
return
}

if end != nil && bytes.Compare(k, end) > 0 {
return
}

if len(result) >= limit {
return
}

result = append(result, k)

})
return result, nil
}
Comment on lines +129 to +157
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This implementation has two major issues:

  1. Correctness (Critical): It returns direct references to the key slices stored in the tree (result = append(result, k)). This breaks encapsulation, as a caller could modify the returned slice and corrupt the internal state of the treemap, leading to unpredictable behavior.
  2. Performance (High): It uses s.tree.Each which iterates over all elements in the store, regardless of the scan range or limit. This is very inefficient for range scans on large datasets. The if len(result) >= limit check does not stop the iteration early; it only prevents appending more items.

You should refactor this to use a treemap.Iterator to scan the range efficiently, and ensure you return copies of the keys to maintain encapsulation.


func (s *rbMemoryStore) Put(ctx context.Context, key []byte, value []byte) error {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand Down
39 changes: 39 additions & 0 deletions store/rb_memory_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,45 @@ func TestRbMemoryStore_Scan(t *testing.T) {
assert.Equal(t, 100, cnt)
}

func TestRbMemoryStore_ScanKeys(t *testing.T) {
ctx := context.Background()
t.Parallel()
st := NewRbMemoryStore()

for i := 0; i < 9999; i++ {
keyStr := "prefix " + strconv.Itoa(i) + "foo"
key := []byte(keyStr)
b := make([]byte, 8)
binary.PutVarint(b, int64(i))
err := st.Put(ctx, key, b)
assert.NoError(t, err)
}

res, err := st.ScanKeys(ctx, []byte("prefix"), []byte("z"), 100)
assert.NoError(t, err)
assert.Equal(t, 100, len(res))

sortedKeys := make([][]byte, 9999)

for _, k := range res {
str := string(k)
i, err := strconv.Atoi(str[7 : len(str)-3])
assert.NoError(t, err)
sortedKeys[i] = k
}

cnt := 0
for i, k := range sortedKeys {
if k == nil {
continue
}
cnt++
assert.Equal(t, []byte("prefix "+strconv.Itoa(i)+"foo"), k)
}

assert.Equal(t, 100, cnt)
}

func TestRbMemoryStore_Txn(t *testing.T) {
t.Parallel()
t.Run("success", func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Store interface {
type ScanStore interface {
Store
Scan(ctx context.Context, start []byte, end []byte, limit int) ([]*KVPair, error)
ScanKeys(ctx context.Context, start []byte, end []byte, limit int) ([][]byte, error)
}

type TTLStore interface {
Expand All @@ -56,6 +57,7 @@ type Txn interface {
type ScanTxn interface {
Txn
Scan(ctx context.Context, start []byte, end []byte, limit int) ([]*KVPair, error)
ScanKeys(ctx context.Context, start []byte, end []byte, limit int) ([][]byte, error)
}

type TTLTxn interface {
Expand Down
Loading