Skip to content

Commit 45950a7

Browse files
authored
Merge pull request #6803 from onflow/leo/db-ops-writes
Extract uncurried functions from storage writes
2 parents ee4c9e4 + 36ae5ce commit 45950a7

File tree

7 files changed

+329
-77
lines changed

7 files changed

+329
-77
lines changed

ledger/complete/wal/checkpoint_v6_writer.go

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/onflow/flow-go/ledger/complete/mtrie/node"
2020
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
2121
utilsio "github.com/onflow/flow-go/utils/io"
22+
"github.com/onflow/flow-go/utils/merr"
2223
)
2324

2425
const subtrieLevel = 4
@@ -708,39 +709,7 @@ func decodeSubtrieCount(encoded []byte) (uint16, error) {
708709
return binary.BigEndian.Uint16(encoded), nil
709710
}
710711

711-
// closeAndMergeError close the closable and merge the closeErr with the given err into a multierror
712-
// Note: when using this function in a defer function, don't use as below:
713-
// func XXX() (
714-
//
715-
// err error,
716-
// ) {
717-
// def func() {
718-
// // bad, because the definition of err might get overwritten
719-
// err = closeAndMergeError(closable, err)
720-
// }()
721-
//
722-
// Better to use as below:
723-
// func XXX() (
724-
//
725-
// errToReturn error,
726-
// ) {
727-
// def func() {
728-
// // good, because the error to returned is only updated here, and guaranteed to be returned
729-
// errToReturn = closeAndMergeError(closable, errToReturn)
730-
// }()
731-
func closeAndMergeError(closable io.Closer, err error) error {
732-
var merr *multierror.Error
733-
if err != nil {
734-
merr = multierror.Append(merr, err)
735-
}
736-
737-
closeError := closable.Close()
738-
if closeError != nil {
739-
merr = multierror.Append(merr, closeError)
740-
}
741-
742-
return merr.ErrorOrNil()
743-
}
712+
var closeAndMergeError = merr.CloseAndMergeError
744713

745714
// withFile opens the file at the given path, and calls the given function with the opened file.
746715
// it handles closing the file and evicting the file from Linux page cache.

storage/operation/prefix.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
//nolint:golint,unused
2+
package operation
3+
4+
import (
5+
"encoding/binary"
6+
"fmt"
7+
8+
"github.com/onflow/flow-go/model/flow"
9+
)
10+
11+
const (
12+
13+
// codes for special database markers
14+
// codeMax = 1 // deprecated
15+
codeDBType = 2 // specifies a database type
16+
17+
// codes for views with special meaning
18+
codeSafetyData = 10 // safety data for hotstuff state
19+
codeLivenessData = 11 // liveness data for hotstuff state
20+
21+
// codes for fields associated with the root state
22+
codeSporkID = 13
23+
codeProtocolVersion = 14
24+
codeEpochCommitSafetyThreshold = 15
25+
codeSporkRootBlockHeight = 16
26+
27+
// code for heights with special meaning
28+
codeFinalizedHeight = 20 // latest finalized block height
29+
codeSealedHeight = 21 // latest sealed block height
30+
codeClusterHeight = 22 // latest finalized height on cluster
31+
codeExecutedBlock = 23 // latest executed block with max height
32+
codeFinalizedRootHeight = 24 // the height of the highest finalized block contained in the root snapshot
33+
codeLastCompleteBlockHeight = 25 // the height of the last block for which all collections were received
34+
codeEpochFirstHeight = 26 // the height of the first block in a given epoch
35+
codeSealedRootHeight = 27 // the height of the highest sealed block contained in the root snapshot
36+
37+
// codes for single entity storage
38+
codeHeader = 30
39+
_ = 31 // DEPRECATED: 31 was used for identities before epochs
40+
codeGuarantee = 32
41+
codeSeal = 33
42+
codeTransaction = 34
43+
codeCollection = 35
44+
codeExecutionResult = 36
45+
codeResultApproval = 37
46+
codeChunk = 38
47+
codeExecutionReceiptMeta = 39 // NOTE: prior to Mainnet25, this erroneously had the same value as codeExecutionResult (36)
48+
49+
// codes for indexing single identifier by identifier/integer
50+
codeHeightToBlock = 40 // index mapping height to block ID
51+
codeBlockIDToLatestSealID = 41 // index mapping a block its last payload seal
52+
codeClusterBlockToRefBlock = 42 // index cluster block ID to reference block ID
53+
codeRefHeightToClusterBlock = 43 // index reference block height to cluster block IDs
54+
codeBlockIDToFinalizedSeal = 44 // index _finalized_ seal by sealed block ID
55+
codeBlockIDToQuorumCertificate = 45 // index of quorum certificates by block ID
56+
codeEpochProtocolStateByBlockID = 46 // index of epoch protocol state entry by block ID
57+
codeProtocolKVStoreByBlockID = 47 // index of protocol KV store entry by block ID
58+
59+
// codes for indexing multiple identifiers by identifier
60+
codeBlockChildren = 50 // index mapping block ID to children blocks
61+
_ = 51 // DEPRECATED: 51 was used for identity indexes before epochs
62+
codePayloadGuarantees = 52 // index mapping block ID to payload guarantees
63+
codePayloadSeals = 53 // index mapping block ID to payload seals
64+
codeCollectionBlock = 54 // index mapping collection ID to block ID
65+
codeOwnBlockReceipt = 55 // index mapping block ID to execution receipt ID for execution nodes
66+
_ = 56 // DEPRECATED: 56 was used for block->epoch status prior to Dynamic Protocol State in Mainnet25
67+
codePayloadReceipts = 57 // index mapping block ID to payload receipts
68+
codePayloadResults = 58 // index mapping block ID to payload results
69+
codeAllBlockReceipts = 59 // index mapping of blockID to multiple receipts
70+
codePayloadProtocolStateID = 60 // index mapping block ID to payload protocol state ID
71+
72+
// codes related to protocol level information
73+
codeEpochSetup = 61 // EpochSetup service event, keyed by ID
74+
codeEpochCommit = 62 // EpochCommit service event, keyed by ID
75+
codeBeaconPrivateKey = 63 // BeaconPrivateKey, keyed by epoch counter
76+
codeDKGStarted = 64 // flag that the DKG for an epoch has been started
77+
codeDKGEnded = 65 // flag that the DKG for an epoch has ended (stores end state)
78+
codeVersionBeacon = 67 // flag for storing version beacons
79+
codeEpochProtocolState = 68
80+
codeProtocolKVStore = 69
81+
82+
// code for ComputationResult upload status storage
83+
// NOTE: for now only GCP uploader is supported. When other uploader (AWS e.g.) needs to
84+
// be supported, we will need to define new code.
85+
codeComputationResults = 66
86+
87+
// job queue consumers and producers
88+
codeJobConsumerProcessed = 70
89+
codeJobQueue = 71
90+
codeJobQueuePointer = 72
91+
92+
// legacy codes (should be cleaned up)
93+
codeChunkDataPack = 100
94+
codeCommit = 101
95+
codeEvent = 102
96+
codeExecutionStateInteractions = 103
97+
codeTransactionResult = 104
98+
codeFinalizedCluster = 105
99+
codeServiceEvent = 106
100+
codeTransactionResultIndex = 107
101+
codeLightTransactionResult = 108
102+
codeLightTransactionResultIndex = 109
103+
codeTransactionResultErrorMessage = 110
104+
codeTransactionResultErrorMessageIndex = 111
105+
codeIndexCollection = 200
106+
codeIndexExecutionResultByBlock = 202
107+
codeIndexCollectionByTransaction = 203
108+
codeIndexResultApprovalByChunk = 204
109+
110+
// TEMPORARY codes
111+
blockedNodeIDs = 205 // manual override for adding node IDs to list of ejected nodes, applies to networking layer only
112+
113+
// internal failure information that should be preserved across restarts
114+
codeExecutionFork = 254
115+
codeEpochEmergencyFallbackTriggered = 255
116+
)
117+
118+
func MakePrefix(code byte, keys ...interface{}) []byte {
119+
prefix := make([]byte, 1)
120+
prefix[0] = code
121+
for _, key := range keys {
122+
prefix = append(prefix, KeyPartToBytes(key)...)
123+
}
124+
return prefix
125+
}
126+
127+
func KeyPartToBytes(v interface{}) []byte {
128+
switch i := v.(type) {
129+
case uint8:
130+
return []byte{i}
131+
case uint32:
132+
b := make([]byte, 4)
133+
binary.BigEndian.PutUint32(b, i)
134+
return b
135+
case uint64:
136+
b := make([]byte, 8)
137+
binary.BigEndian.PutUint64(b, i)
138+
return b
139+
case string:
140+
return []byte(i)
141+
case flow.Role:
142+
return []byte{byte(i)}
143+
case flow.Identifier:
144+
return i[:]
145+
case flow.ChainID:
146+
return []byte(i)
147+
default:
148+
panic(fmt.Sprintf("unsupported type to convert (%T)", v))
149+
}
150+
}

storage/operation/reads.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/onflow/flow-go/module/irrecoverable"
1111
"github.com/onflow/flow-go/storage"
12+
"github.com/onflow/flow-go/utils/merr"
1213
)
1314

1415
// CheckFunc is a function that checks if the value should be read and decoded.
@@ -51,7 +52,7 @@ func IterateKeysByPrefixRange(r storage.Reader, startPrefix []byte, endPrefix []
5152
// IterateKeys will iterate over all entries in the database, where the key starts with a prefixes in
5253
// the range [startPrefix, endPrefix] (both inclusive).
5354
// No errors expected during normal operations.
54-
func IterateKeys(r storage.Reader, startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) error {
55+
func IterateKeys(r storage.Reader, startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) (errToReturn error) {
5556
if len(startPrefix) == 0 {
5657
return fmt.Errorf("startPrefix prefix is empty")
5758
}
@@ -69,7 +70,9 @@ func IterateKeys(r storage.Reader, startPrefix []byte, endPrefix []byte, iterFun
6970
if err != nil {
7071
return fmt.Errorf("can not create iterator: %w", err)
7172
}
72-
defer it.Close()
73+
defer func() {
74+
errToReturn = merr.CloseAndMergeError(it, errToReturn)
75+
}()
7376

7477
for it.First(); it.Valid(); it.Next() {
7578
item := it.IterItem()
@@ -130,7 +133,7 @@ func TraverseByPrefix(r storage.Reader, prefix []byte, iterFunc IterationFunc, o
130133
// When this returned function is executed (and only then), it will write into the `keyExists` whether
131134
// the key exists.
132135
// No errors are expected during normal operation.
133-
func KeyExists(r storage.Reader, key []byte) (bool, error) {
136+
func KeyExists(r storage.Reader, key []byte) (exist bool, errToReturn error) {
134137
_, closer, err := r.Get(key)
135138
if err != nil {
136139
// the key does not exist in the database
@@ -140,7 +143,9 @@ func KeyExists(r storage.Reader, key []byte) (bool, error) {
140143
// exception while checking for the key
141144
return false, irrecoverable.NewExceptionf("could not load data: %w", err)
142145
}
143-
defer closer.Close()
146+
defer func() {
147+
errToReturn = merr.CloseAndMergeError(closer, errToReturn)
148+
}()
144149

145150
// the key does exist in the database
146151
return true, nil
@@ -153,13 +158,15 @@ func KeyExists(r storage.Reader, key []byte) (bool, error) {
153158
// - storage.ErrNotFound if the key does not exist in the database
154159
// - generic error in case of unexpected failure from the database layer, or failure
155160
// to decode an existing database value
156-
func RetrieveByKey(r storage.Reader, key []byte, entity interface{}) error {
161+
func RetrieveByKey(r storage.Reader, key []byte, entity interface{}) (errToReturn error) {
157162
val, closer, err := r.Get(key)
158163
if err != nil {
159164
return err
160165
}
161166

162-
defer closer.Close()
167+
defer func() {
168+
errToReturn = merr.CloseAndMergeError(closer, errToReturn)
169+
}()
163170

164171
err = msgpack.Unmarshal(val, entity)
165172
if err != nil {
@@ -172,7 +179,7 @@ func RetrieveByKey(r storage.Reader, key []byte, entity interface{}) error {
172179
// keys with the format prefix` + `height` (where "+" denotes concatenation of binary strings). The height
173180
// is encoded as Big-Endian (entries with numerically smaller height have lexicographically smaller key).
174181
// The function finds the *highest* key with the given prefix and height equal to or below the given height.
175-
func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64, entity interface{}) error {
182+
func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64, entity interface{}) (errToReturn error) {
176183
if len(prefix) == 0 {
177184
return fmt.Errorf("prefix must not be empty")
178185
}
@@ -182,7 +189,9 @@ func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64
182189
if err != nil {
183190
return fmt.Errorf("can not create iterator: %w", err)
184191
}
185-
defer it.Close()
192+
defer func() {
193+
errToReturn = merr.CloseAndMergeError(it, errToReturn)
194+
}()
186195

187196
var highestKey []byte
188197

@@ -203,7 +212,9 @@ func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64
203212
return err
204213
}
205214

206-
defer closer.Close()
215+
defer func() {
216+
errToReturn = merr.CloseAndMergeError(closer, errToReturn)
217+
}()
207218

208219
err = msgpack.Unmarshal(val, entity)
209220
if err != nil {

storage/operation/writes.go

Lines changed: 29 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,61 +10,55 @@ import (
1010
"github.com/onflow/flow-go/storage"
1111
)
1212

13-
// Upsert will encode the given entity using msgpack and will insert the resulting
13+
// UpsertByKey will encode the given entity using msgpack and will insert the resulting
1414
// binary data under the provided key.
1515
// If the key already exists, the value will be overwritten.
1616
// Error returns:
1717
// - generic error in case of unexpected failure from the database layer or
1818
// encoding failure.
19-
func Upsert(key []byte, val interface{}) func(storage.Writer) error {
20-
return func(w storage.Writer) error {
21-
value, err := msgpack.Marshal(val)
22-
if err != nil {
23-
return irrecoverable.NewExceptionf("failed to encode value: %w", err)
24-
}
25-
26-
err = w.Set(key, value)
27-
if err != nil {
28-
return irrecoverable.NewExceptionf("failed to store data: %w", err)
29-
}
19+
func UpsertByKey(w storage.Writer, key []byte, val interface{}) error {
20+
value, err := msgpack.Marshal(val)
21+
if err != nil {
22+
return irrecoverable.NewExceptionf("failed to encode value: %w", err)
23+
}
3024

31-
return nil
25+
err = w.Set(key, value)
26+
if err != nil {
27+
return irrecoverable.NewExceptionf("failed to store data: %w", err)
3228
}
29+
30+
return nil
3331
}
3432

35-
// Remove removes the entity with the given key, if it exists. If it doesn't
33+
// RemoveByKey removes the entity with the given key, if it exists. If it doesn't
3634
// exist, this is a no-op.
3735
// Error returns:
3836
// * generic error in case of unexpected database error
39-
func Remove(key []byte) func(storage.Writer) error {
40-
return func(w storage.Writer) error {
41-
err := w.Delete(key)
42-
if err != nil {
43-
return irrecoverable.NewExceptionf("could not delete item: %w", err)
44-
}
45-
return nil
37+
func RemoveByKey(w storage.Writer, key []byte) error {
38+
err := w.Delete(key)
39+
if err != nil {
40+
return irrecoverable.NewExceptionf("could not delete item: %w", err)
4641
}
42+
return nil
4743
}
4844

49-
// RemoveByPrefix removes all keys with the given prefix
45+
// RemoveByKeyPrefix removes all keys with the given prefix
5046
// Error returns:
5147
// * generic error in case of unexpected database error
52-
func RemoveByPrefix(reader storage.Reader, key []byte) func(storage.Writer) error {
53-
return RemoveByRange(reader, key, key)
48+
func RemoveByKeyPrefix(reader storage.Reader, w storage.Writer, key []byte) error {
49+
return RemoveByKeyRange(reader, w, key, key)
5450
}
5551

56-
// RemoveByRange removes all keys with a prefix that falls within the range [start, end], both inclusive.
52+
// RemoveByKeyRange removes all keys with a prefix that falls within the range [start, end], both inclusive.
5753
// It returns error if endPrefix < startPrefix
5854
// no other errors are expected during normal operation
59-
func RemoveByRange(reader storage.Reader, startPrefix []byte, endPrefix []byte) func(storage.Writer) error {
60-
return func(w storage.Writer) error {
61-
if bytes.Compare(startPrefix, endPrefix) > 0 {
62-
return fmt.Errorf("startPrefix key must be less than or equal to endPrefix key")
63-
}
64-
err := w.DeleteByRange(reader, startPrefix, endPrefix)
65-
if err != nil {
66-
return irrecoverable.NewExceptionf("could not delete item: %w", err)
67-
}
68-
return nil
55+
func RemoveByKeyRange(reader storage.Reader, w storage.Writer, startPrefix []byte, endPrefix []byte) error {
56+
if bytes.Compare(startPrefix, endPrefix) > 0 {
57+
return fmt.Errorf("startPrefix key must be less than or equal to endPrefix key")
58+
}
59+
err := w.DeleteByRange(reader, startPrefix, endPrefix)
60+
if err != nil {
61+
return irrecoverable.NewExceptionf("could not delete item: %w", err)
6962
}
63+
return nil
7064
}

0 commit comments

Comments
 (0)