Skip to content

Commit e5ae317

Browse files
committed
extract functions out of functors
1 parent 05862f6 commit e5ae317

File tree

5 files changed

+148
-121
lines changed

5 files changed

+148
-121
lines changed

storage/operation/badgerimpl/writer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func (b *ReaderBatchWriter) Delete(key []byte) error {
106106
// It returns error if endPrefix < startPrefix
107107
// no other errors are expected during normal operation
108108
func (b *ReaderBatchWriter) DeleteByRange(globalReader storage.Reader, startPrefix, endPrefix []byte) error {
109-
err := operation.IterateKeysInPrefixRange(startPrefix, endPrefix, func(key []byte) error {
109+
err := operation.Iterate(startPrefix, endPrefix, func(key []byte) error {
110110
err := b.batch.Delete(key)
111111
if err != nil {
112112
return fmt.Errorf("could not add key to delete batch (%v): %w", key, err)

storage/operation/pebbleimpl/writer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,11 @@ func (b *ReaderBatchWriter) DeleteByRange(globalReader storage.Reader, startPref
117117
return b.batch.DeleteRange(start, end, pebble.Sync)
118118
}
119119

120-
return operation.IterateKeysInPrefixRange(startPrefix, endPrefix, func(key []byte) error {
120+
return operation.IterateKeysByPrefixRange(globalReader, startPrefix, endPrefix, func(key []byte) error {
121121
err := b.batch.Delete(key, pebble.Sync)
122122
if err != nil {
123123
return fmt.Errorf("could not add key to delete batch (%v): %w", key, err)
124124
}
125125
return nil
126-
})(globalReader)
126+
})
127127
}

storage/operation/reads.go

Lines changed: 98 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ import (
1414
// CheckFunc is a function that checks if the value should be read and decoded.
1515
// return (true, nil) to read the value and pass it to the CreateFunc and HandleFunc for decoding
1616
// return (false, nil) to skip reading the value
17-
// return (false, err) if running into any exception, the iteration should be stopped.
17+
// return (false, err) if running into any error, the iteration should be stopped.
18+
// when making a CheckFunc to be used in the IterationFunc to iterate over the keys, a sentinel error
19+
// can be defined and checked to stop the iteration early, such as finding the first key that match
20+
// certain condition.
1821
type CheckFunc func(key []byte) (bool, error)
1922

2023
// CreateFunc returns a pointer to an initialized entity that we can potentially
@@ -28,12 +31,13 @@ type CreateFunc func() interface{}
2831
type HandleFunc func() error
2932
type IterationFunc func() (CheckFunc, CreateFunc, HandleFunc)
3033

31-
// IterateKeysInPrefixRange will iterate over all entries in the database, where the key starts with a prefixes in
34+
// IterateKey will iterate over all entries in the database, where the key starts with a prefixes in
3235
// the range [startPrefix, endPrefix] (both inclusive). We require that startPrefix <= endPrefix (otherwise this
3336
// function errors). On every such key, the `check` function is called. If `check` errors, iteration is aborted.
37+
// In other words, error returned by the iteration functions will be propagated to the caller.
3438
// No errors expected during normal operations.
35-
func IterateKeysInPrefixRange(startPrefix []byte, endPrefix []byte, check func(key []byte) error) func(storage.Reader) error {
36-
return Iterate(startPrefix, endPrefix, func() (CheckFunc, CreateFunc, HandleFunc) {
39+
func IterateKeysByPrefixRange(r storage.Reader, startPrefix []byte, endPrefix []byte, check func(key []byte) error) error {
40+
return IterateKeys(r, startPrefix, endPrefix, func() (CheckFunc, CreateFunc, HandleFunc) {
3741
return func(key []byte) (bool, error) {
3842
err := check(key)
3943
if err != nil {
@@ -44,100 +48,88 @@ func IterateKeysInPrefixRange(startPrefix []byte, endPrefix []byte, check func(k
4448
}, storage.IteratorOption{IterateKeyOnly: true})
4549
}
4650

47-
// Iterate will iterate over all keys with prefixes in the given range [startPrefix, endPrefix] (both inclusive)
48-
func Iterate(startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) func(storage.Reader) error {
49-
return func(r storage.Reader) error {
51+
// IterateKey will iterate over all entries in the database, where the key starts with a prefixes in
52+
// the range [startPrefix, endPrefix] (both inclusive).
53+
// No errors expected during normal operations.
54+
func IterateKeys(r storage.Reader, startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) error {
55+
if len(startPrefix) == 0 {
56+
return fmt.Errorf("startPrefix prefix is empty")
57+
}
5058

51-
if len(startPrefix) == 0 {
52-
return fmt.Errorf("startPrefix prefix is empty")
53-
}
59+
if len(endPrefix) == 0 {
60+
return fmt.Errorf("endPrefix prefix is empty")
61+
}
5462

55-
if len(endPrefix) == 0 {
56-
return fmt.Errorf("endPrefix prefix is empty")
57-
}
63+
// Reverse iteration is not supported by pebble
64+
if bytes.Compare(startPrefix, endPrefix) > 0 {
65+
return fmt.Errorf("startPrefix key must be less than or equal to endPrefix key")
66+
}
5867

59-
// Reverse iteration is not supported by pebble
60-
if bytes.Compare(startPrefix, endPrefix) > 0 {
61-
return fmt.Errorf("startPrefix key must be less than or equal to endPrefix key")
62-
}
68+
it, err := r.NewIter(startPrefix, endPrefix, opt)
69+
if err != nil {
70+
return fmt.Errorf("can not create iterator: %w", err)
71+
}
72+
defer it.Close()
6373

64-
it, err := r.NewIter(startPrefix, endPrefix, opt)
65-
if err != nil {
66-
return fmt.Errorf("can not create iterator: %w", err)
67-
}
68-
defer it.Close()
74+
for it.First(); it.Valid(); it.Next() {
75+
item := it.IterItem()
76+
key := item.Key()
77+
78+
// initialize processing functions for iteration
79+
check, create, handle := iterFunc()
6980

70-
for it.First(); it.Valid(); it.Next() {
71-
item := it.IterItem()
72-
key := item.Key()
81+
keyCopy := make([]byte, len(key))
7382

74-
// initialize processing functions for iteration
75-
check, create, handle := iterFunc()
83+
// The underlying database may re-use and modify the backing memory of the returned key.
84+
// Tor safety we proactively make a copy before passing the key to the upper layer.
85+
copy(keyCopy, key)
7686

77-
keyCopy := make([]byte, len(key))
87+
// check if we should process the item at all
88+
shouldReadValue, err := check(keyCopy)
89+
if err != nil {
90+
return err
91+
}
92+
if !shouldReadValue { // skip reading value
93+
continue
94+
}
7895

79-
// The underlying database may re-use and modify the backing memory of the returned key.
80-
// Tor safety we proactively make a copy before passing the key to the upper layer.
81-
copy(keyCopy, key)
96+
err = item.Value(func(val []byte) error {
8297

83-
// check if we should process the item at all
84-
shouldReadValue, err := check(keyCopy)
98+
// decode into the entity
99+
entity := create()
100+
err = msgpack.Unmarshal(val, entity)
85101
if err != nil {
86-
return err
102+
return irrecoverable.NewExceptionf("could not decode entity: %w", err)
87103
}
88-
if !shouldReadValue { // skip reading value
89-
continue
90-
}
91-
92-
err = item.Value(func(val []byte) error {
93-
94-
// decode into the entity
95-
entity := create()
96-
err = msgpack.Unmarshal(val, entity)
97-
if err != nil {
98-
return irrecoverable.NewExceptionf("could not decode entity: %w", err)
99-
}
100-
101-
// process the entity
102-
err = handle()
103-
if err != nil {
104-
return fmt.Errorf("could not handle entity: %w", err)
105-
}
106-
107-
return nil
108-
})
109104

105+
// process the entity
106+
err = handle()
110107
if err != nil {
111-
return fmt.Errorf("could not process value: %w", err)
108+
return fmt.Errorf("could not handle entity: %w", err)
112109
}
113-
}
114110

115-
return nil
111+
return nil
112+
})
113+
114+
if err != nil {
115+
return fmt.Errorf("could not process value: %w", err)
116+
}
116117
}
118+
119+
return nil
117120
}
118121

119122
// Traverse will iterate over all keys with the given prefix
120-
func Traverse(prefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) func(storage.Reader) error {
121-
return Iterate(prefix, prefix, iterFunc, opt)
123+
// error returned by the iteration functions will be propagated to the caller.
124+
// No other errors are expected during normal operation.
125+
func TraverseByPrefix(r storage.Reader, prefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) error {
126+
return IterateKeys(r, prefix, prefix, iterFunc, opt)
122127
}
123128

124-
// Exists takes a key and a pointer to an a boolean variable `keyExists` as inputs and returns an function.
129+
// KeyExists returns true if a key exists in the database.
125130
// When this returned function is executed (and only then), it will write into the `keyExists` whether
126131
// the key exists.
127132
// No errors are expected during normal operation.
128-
func Exists(key []byte, keyExists *bool) func(storage.Reader) error {
129-
return func(r storage.Reader) error {
130-
exists, err := KeyExists(r, key)
131-
if err != nil {
132-
return err
133-
}
134-
*keyExists = exists
135-
return nil
136-
}
137-
}
138-
139-
// KeyExists returns true if a key exists in the database.
140-
// No errors are expected during normal operation.
141133
func KeyExists(r storage.Reader, key []byte) (bool, error) {
142134
_, closer, err := r.Get(key)
143135
if err != nil {
@@ -154,13 +146,6 @@ func KeyExists(r storage.Reader, key []byte) (bool, error) {
154146
return true, nil
155147
}
156148

157-
// Retrieve returns a functor that retrieves the binary data under the given key from the database
158-
func Retrieve(key []byte, entity interface{}) func(storage.Reader) error {
159-
return func(r storage.Reader) error {
160-
return RetrieveByKey(r, key, entity)
161-
}
162-
}
163-
164149
// RetrieveByKey will retrieve the binary data under the given key from the database
165150
// and decode it into the given entity. The provided entity needs to be a
166151
// pointer to an initialized entity of the correct type.
@@ -183,49 +168,47 @@ func RetrieveByKey(r storage.Reader, key []byte, entity interface{}) error {
183168
return nil
184169
}
185170

186-
// FindHighestAtOrBelow is for database entries that are indexed by block height. It is suitable to search
171+
// FindHighestAtOrBelowByPrefix is for database entries that are indexed by block height. It is suitable to search
187172
// keys with the format prefix` + `height` (where "+" denotes concatenation of binary strings). The height
188173
// is encoded as Big-Endian (entries with numerically smaller height have lexicographically smaller key).
189174
// The function finds the *highest* key with the given prefix and height equal to or below the given height.
190-
func FindHighestAtOrBelow(prefix []byte, height uint64, entity interface{}) func(storage.Reader) error {
191-
return func(r storage.Reader) error {
192-
if len(prefix) == 0 {
193-
return fmt.Errorf("prefix must not be empty")
194-
}
195-
196-
key := append(prefix, EncodeKeyPart(height)...)
197-
it, err := r.NewIter(prefix, key, storage.DefaultIteratorOptions())
198-
if err != nil {
199-
return fmt.Errorf("can not create iterator: %w", err)
200-
}
201-
defer it.Close()
175+
func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64, entity interface{}) error {
176+
if len(prefix) == 0 {
177+
return fmt.Errorf("prefix must not be empty")
178+
}
202179

203-
var highestKey []byte
180+
key := append(prefix, EncodeKeyPart(height)...)
181+
it, err := r.NewIter(prefix, key, storage.DefaultIteratorOptions())
182+
if err != nil {
183+
return fmt.Errorf("can not create iterator: %w", err)
184+
}
185+
defer it.Close()
204186

205-
// find highest value below the given height
206-
for it.First(); it.Valid(); it.Next() {
207-
// copy the key to avoid the underlying slices of the key
208-
// being modified by the Next() call
209-
highestKey = it.IterItem().KeyCopy(highestKey)
210-
}
187+
var highestKey []byte
211188

212-
if len(highestKey) == 0 {
213-
return storage.ErrNotFound
214-
}
189+
// find highest value below the given height
190+
for it.First(); it.Valid(); it.Next() {
191+
// copy the key to avoid the underlying slices of the key
192+
// being modified by the Next() call
193+
highestKey = it.IterItem().KeyCopy(highestKey)
194+
}
215195

216-
// read the value of the highest key
217-
val, closer, err := r.Get(highestKey)
218-
if err != nil {
219-
return err
220-
}
196+
if len(highestKey) == 0 {
197+
return storage.ErrNotFound
198+
}
221199

222-
defer closer.Close()
200+
// read the value of the highest key
201+
val, closer, err := r.Get(highestKey)
202+
if err != nil {
203+
return err
204+
}
223205

224-
err = msgpack.Unmarshal(val, entity)
225-
if err != nil {
226-
return irrecoverable.NewExceptionf("failed to decode value: %w", err)
227-
}
206+
defer closer.Close()
228207

229-
return nil
208+
err = msgpack.Unmarshal(val, entity)
209+
if err != nil {
210+
return irrecoverable.NewExceptionf("failed to decode value: %w", err)
230211
}
212+
213+
return nil
231214
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package operation
2+
3+
import "github.com/onflow/flow-go/storage"
4+
5+
// Leo: This package includes deprecated functions that wraps the operation of reading from the database.
6+
// They are needed because the original badger implementation is also implemented in the same wrapped function manner,
7+
// since badger requires reads to be done in a transaction, which is stateful.
8+
// Using these deprecated functions could minimize the changes during refactor and easier to review the changes.
9+
// The simplified implementation of the functions are in the reads.go file, which are encouraged to be used instead.
10+
11+
func Iterate(startPrefix []byte, endPrefix []byte, check func(key []byte) error) func(storage.Reader) error {
12+
return func(r storage.Reader) error {
13+
return IterateKeysByPrefixRange(r, startPrefix, endPrefix, check)
14+
}
15+
}
16+
17+
func Traverse(prefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) func(storage.Reader) error {
18+
return func(r storage.Reader) error {
19+
return TraverseByPrefix(r, prefix, iterFunc, opt)
20+
}
21+
}
22+
23+
func Retrieve(key []byte, entity interface{}) func(storage.Reader) error {
24+
return func(r storage.Reader) error {
25+
return RetrieveByKey(r, key, entity)
26+
}
27+
}
28+
29+
func Exists(key []byte, keyExists *bool) func(storage.Reader) error {
30+
return func(r storage.Reader) error {
31+
exists, err := KeyExists(r, key)
32+
if err != nil {
33+
return err
34+
}
35+
*keyExists = exists
36+
return nil
37+
}
38+
}
39+
40+
func FindHighestAtOrBelow(prefix []byte, height uint64, entity interface{}) func(storage.Reader) error {
41+
return func(r storage.Reader) error {
42+
return FindHighestAtOrBelowByPrefix(r, prefix, height, entity)
43+
}
44+
}

storage/operation/reads_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func TestIterateKeysInPrefixRange(t *testing.T) {
5353

5454
// Forward iteration and check boundaries
5555
var found [][]byte
56-
require.NoError(t, operation.IterateKeysInPrefixRange(prefixStart, prefixEnd, func(key []byte) error {
56+
require.NoError(t, operation.Iterate(prefixStart, prefixEnd, func(key []byte) error {
5757
found = append(found, key)
5858
return nil
5959
})(r), "should iterate forward without error")
@@ -66,7 +66,7 @@ func TestIterateInvalidRange(t *testing.T) {
6666
dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) {
6767

6868
var found [][]byte
69-
require.Error(t, operation.IterateKeysInPrefixRange([]byte{0x02}, []byte{0x01}, func(key []byte) error {
69+
require.Error(t, operation.Iterate([]byte{0x02}, []byte{0x01}, func(key []byte) error {
7070
found = append(found, key)
7171
return nil
7272
})(r))
@@ -118,7 +118,7 @@ func TestIterationBoundary(t *testing.T) {
118118

119119
// Forward iteration and check boundaries
120120
var found [][]byte
121-
require.NoError(t, operation.IterateKeysInPrefixRange(prefixStart, prefixEnd, func(key []byte) error {
121+
require.NoError(t, operation.Iterate(prefixStart, prefixEnd, func(key []byte) error {
122122
found = append(found, key)
123123
return nil
124124
})(r), "should iterate forward without error")

0 commit comments

Comments
 (0)