Skip to content

Commit 18c0a17

Browse files
committed
add RemoveByRange
1 parent 47ef8aa commit 18c0a17

File tree

5 files changed

+80
-8
lines changed

5 files changed

+80
-8
lines changed

storage/operation/badgerimpl/writer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ func (b *ReaderBatchWriter) Delete(key []byte) error {
103103

104104
// DeleteByRange removes all keys with a prefix that falls within the
105105
// range [start, end], both inclusive.
106-
// No errors expected during normal operation
106+
// It returns error if endPrefix < startPrefix
107+
// no other errors are expected during normal operation
107108
func (b *ReaderBatchWriter) DeleteByRange(globalReader storage.Reader, startPrefix, endPrefix []byte) error {
108109
err := operation.IterateKeysInPrefixRange(startPrefix, endPrefix, func(key []byte) error {
109110
err := b.batch.Delete(key)

storage/operation/pebbleimpl/writer.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package pebbleimpl
22

33
import (
4+
"bytes"
45
"fmt"
56

67
"github.com/cockroachdb/pebble"
@@ -102,8 +103,13 @@ func (b *ReaderBatchWriter) Delete(key []byte) error {
102103

103104
// DeleteByRange removes all keys with a prefix that falls within the
104105
// range [start, end], both inclusive.
105-
// No errors expected during normal operation
106+
// It returns error if endPrefix < startPrefix
107+
// no other errors are expected during normal operation
106108
func (b *ReaderBatchWriter) DeleteByRange(globalReader storage.Reader, startPrefix, endPrefix []byte) error {
109+
if bytes.Compare(startPrefix, endPrefix) > 0 {
110+
return fmt.Errorf("startPrefix key must be less than or equal to endPrefix key")
111+
}
112+
107113
// DeleteRange takes the prefix range with start (inclusive) and end (exclusive, note: not inclusive).
108114
// therefore, we need to increment the endPrefix to make it inclusive.
109115
start, end, hasUpperBound := storage.StartEndPrefixToLowerUpperBound(startPrefix, endPrefix)

storage/operation/reads.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ import (
1717
// return (false, err) if running into any exception, the iteration should be stopped.
1818
type CheckFunc func(key []byte) (bool, error)
1919

20-
// createFunc returns a pointer to an initialized entity that we can potentially
20+
// CreateFunc returns a pointer to an initialized entity that we can potentially
2121
// decode the next value into during a badger DB iteration.
2222
type CreateFunc func() interface{}
2323

24-
// handleFunc is a function that starts the processing of the current key-value
24+
// HandleFunc is a function that starts the processing of the current key-value
2525
// pair during a badger iteration. It should be called after the key was checked
2626
// and the entity was decoded.
2727
// No errors are expected during normal operation. Any errors will halt the iteration.

storage/operation/writes.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package operation
22

33
import (
4+
"bytes"
5+
"fmt"
6+
47
"github.com/vmihailenco/msgpack"
58

69
"github.com/onflow/flow-go/module/irrecoverable"
@@ -43,13 +46,22 @@ func Remove(key []byte) func(storage.Writer) error {
4346
}
4447
}
4548

46-
// RemoveByPrefix removes all keys with the given prefix defined by [startPrefix, endPrefix] (both inclusive).
47-
// If no keys exist with the given prefix, this is a no-op.
49+
// RemoveByPrefix removes all keys with the given prefix
4850
// Error returns:
4951
// * generic error in case of unexpected database error
5052
func RemoveByPrefix(reader storage.Reader, key []byte) func(storage.Writer) error {
53+
return RemoveByRange(reader, key, key)
54+
}
55+
56+
// RemoveByRange removes all keys with a prefix that falls within the range [start, end], both inclusive.
57+
// It returns error if endPrefix < startPrefix
58+
// no other errors are expected during normal operation
59+
func RemoveByRange(reader storage.Reader, startPrefix []byte, endPrefix []byte) func(storage.Writer) error {
5160
return func(w storage.Writer) error {
52-
err := w.DeleteByRange(reader, key, key)
61+
if bytes.Compare(startPrefix, endPrefix) > 0 {
62+
return fmt.Errorf("startPrefix key must be less than or equal to endPrefix key")
63+
}
64+
err := w.DeleteByRange(reader, startPrefix, endPrefix)
5365
if err != nil {
5466
return irrecoverable.NewExceptionf("could not delete item: %w", err)
5567
}

storage/operation/writes_test.go

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func TestConcurrentRemove(t *testing.T) {
192192
})
193193
}
194194

195-
func TestRemoveRange(t *testing.T) {
195+
func TestRemoveByPrefix(t *testing.T) {
196196
dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) {
197197

198198
// Define the prefix
@@ -244,6 +244,59 @@ func TestRemoveRange(t *testing.T) {
244244
})
245245
}
246246

247+
func TestRemoveByRange(t *testing.T) {
248+
dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) {
249+
250+
startPrefix, endPrefix := []byte{0x10}, []byte{0x12}
251+
// Create a range of keys around the boundaries of the prefix
252+
keys := [][]byte{
253+
{0x09, 0xff},
254+
// within the range
255+
{0x10, 0x00},
256+
{0x10, 0x50},
257+
{0x10, 0xff},
258+
{0x11},
259+
{0x12},
260+
{0x12, 0x00},
261+
{0x12, 0xff},
262+
// after end -> not included in range
263+
{0x13},
264+
{0x1A, 0xff},
265+
}
266+
267+
// Keys expected to be in the prefix range
268+
includeStart, includeEnd := 1, 7
269+
270+
// Insert the keys into the storage
271+
require.NoError(t, withWriter(func(writer storage.Writer) error {
272+
for _, key := range keys {
273+
value := []byte{0x00} // value are skipped, doesn't matter
274+
err := operation.Upsert(key, value)(writer)
275+
if err != nil {
276+
return err
277+
}
278+
}
279+
return nil
280+
}))
281+
282+
// Remove the keys in the prefix range
283+
require.NoError(t, withWriter(operation.RemoveByRange(r, startPrefix, endPrefix)))
284+
285+
// Verify that the keys in the prefix range have been removed
286+
for i, key := range keys {
287+
var exists bool
288+
require.NoError(t, operation.Exists(key, &exists)(r))
289+
t.Logf("key %x exists: %t", key, exists)
290+
291+
deleted := includeStart <= i && i <= includeEnd
292+
293+
// An item that was not deleted must exist
294+
require.Equal(t, !deleted, exists,
295+
"expected key %x to be %s", key, map[bool]string{true: "deleted", false: "not deleted"})
296+
}
297+
})
298+
}
299+
247300
func TestRemoveFrom(t *testing.T) {
248301
dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) {
249302

0 commit comments

Comments
 (0)