Skip to content

Commit 7a3c1ee

Browse files
authored
Merge pull request #7967 from onflow/leo/refactor-index-protocol-kv-store
[Storage] Refactor index protocol kv store
2 parents be53e37 + 474879a commit 7a3c1ee

File tree

13 files changed

+200
-184
lines changed

13 files changed

+200
-184
lines changed

state/protocol/badger/state.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func bootstrapProtocolState(
232232
// The sealing segment contains a protocol state entry for every block in the segment, including the root block.
233233
for protocolStateID, stateEntry := range segment.ProtocolStateEntries {
234234
// Store the protocol KV Store entry
235-
err := protocolKVStoreSnapshots.BatchStore(lctx, rw, protocolStateID, &stateEntry.KVStore)
235+
err := protocolKVStoreSnapshots.BatchStore(rw, protocolStateID, &stateEntry.KVStore)
236236
if err != nil {
237237
return fmt.Errorf("could not store protocol state kvstore: %w", err)
238238
}

state/protocol/protocol_state/kvstore/kvstore_storage.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package kvstore
33
import (
44
"fmt"
55

6-
"github.com/jordanschalm/lockctx"
7-
86
"github.com/onflow/flow-go/model/flow"
97
"github.com/onflow/flow-go/state/protocol"
108
"github.com/onflow/flow-go/state/protocol/protocol_state"
@@ -37,16 +35,15 @@ func NewProtocolKVStore(protocolStateSnapshots storage.ProtocolKVStore) *Protoco
3735
// data blob. If the encoding fails, an error is returned.
3836
// BatchStore is idempotent, i.e. it accepts repeated calls with the same pairs of (stateID, kvStore).
3937
// Here, the ID is expected to be a collision-resistant hash of the snapshot (including the
40-
// ProtocolStateVersion). Hence, for the same ID (key), BatchStore will reject changing the data (value).
38+
// ProtocolStateVersion).
4139
//
42-
// Expected errors during normal operations:
43-
// - storage.ErrDataMismatch if a _different_ KV store for the given stateID has already been persisted
44-
func (p *ProtocolKVStore) BatchStore(lctx lockctx.Proof, rw storage.ReaderBatchWriter, stateID flow.Identifier, kvStore protocol.KVStoreReader) error {
40+
// No error is exepcted during normal operations
41+
func (p *ProtocolKVStore) BatchStore(rw storage.ReaderBatchWriter, stateID flow.Identifier, kvStore protocol.KVStoreReader) error {
4542
version, data, err := kvStore.VersionedEncode()
4643
if err != nil {
4744
return fmt.Errorf("failed to VersionedEncode protocol state: %w", err)
4845
}
49-
return p.ProtocolKVStore.BatchStore(lctx, rw, stateID, &flow.PSKeyValueStoreData{
46+
return p.ProtocolKVStore.BatchStore(rw, stateID, &flow.PSKeyValueStoreData{
5047
Version: version,
5148
Data: data,
5249
})

state/protocol/protocol_state/kvstore/kvstore_storage_test.go

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ func TestProtocolKVStore_StoreTx(t *testing.T) {
2929
// On the happy path, where the input `kvState` encodes its state successfully, the wrapped store
3030
// should be called to persist the version-encoded snapshot.
3131
t.Run("happy path", func(t *testing.T) {
32-
lockManager := storage.NewTestingLockManager()
3332
expectedVersion := uint64(13)
3433
encData := unittest.RandomBytes(117)
3534
versionedSnapshot := &flow.PSKeyValueStoreData{
@@ -38,32 +37,26 @@ func TestProtocolKVStore_StoreTx(t *testing.T) {
3837
}
3938
kvState.On("VersionedEncode").Return(expectedVersion, encData, nil).Once()
4039

41-
err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
42-
rw := storagemock.NewReaderBatchWriter(t)
43-
llStorage.On("BatchStore", lctx, rw, kvStateID, versionedSnapshot).Return(nil).Once()
40+
rw := storagemock.NewReaderBatchWriter(t)
41+
llStorage.On("BatchStore", rw, kvStateID, versionedSnapshot).Return(nil).Once()
4442

45-
// TODO: potentially update - we might be bringing back a functor here, because we acquire a lock as explained in slack thread https://flow-foundation.slack.com/archives/C071612SJJE/p1754600182033289?thread_ts=1752912083.194619&cid=C071612SJJE
46-
// Calling `BatchStore` should return the output of the wrapped low-level storage, which is a deferred database
47-
// update. Conceptually, it is possible that `ProtocolKVStore` wraps the deferred database operation in faulty
48-
// code, such that it cannot be executed. Therefore, we execute the top-level deferred database update below
49-
// and verify that the deferred database operation returned by the lower-level is actually reached.
50-
return store.BatchStore(lctx, rw, kvStateID, kvState)
51-
})
43+
// Calling `BatchStore` should return the output of the wrapped low-level storage, which is a deferred database
44+
// update. Conceptually, it is possible that `ProtocolKVStore` wraps the deferred database operation in faulty
45+
// code, such that it cannot be executed. Therefore, we execute the top-level deferred database update below
46+
// and verify that the deferred database operation returned by the lower-level is actually reached.
47+
err := store.BatchStore(rw, kvStateID, kvState)
5248
require.NoError(t, err)
5349
})
5450

5551
// On the unhappy path, i.e. when the encoding of input `kvState` failed, `ProtocolKVStore` should produce
5652
// a deferred database update that always returns the encoding error.
5753
t.Run("encoding fails", func(t *testing.T) {
58-
lockManager := storage.NewTestingLockManager()
5954
encodingError := errors.New("encoding error")
6055

6156
kvState.On("VersionedEncode").Return(uint64(0), nil, encodingError).Once()
6257

63-
err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
64-
rw := storagemock.NewReaderBatchWriter(t)
65-
return store.BatchStore(lctx, rw, kvStateID, kvState)
66-
})
58+
rw := storagemock.NewReaderBatchWriter(t)
59+
err := store.BatchStore(rw, kvStateID, kvState)
6760
require.ErrorIs(t, err, encodingError)
6861
})
6962
}

state/protocol/protocol_state/kvstore_storage.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,9 @@ type ProtocolKVStore interface {
1919
// BatchStore adds the KV-store snapshot in the database using the given ID as key. Per convention, all
2020
// implementations of [protocol.KVStoreReader] should be able to successfully encode their state into a
2121
// data blob. If the encoding fails, an error is returned.
22-
// BatchStore is idempotent, i.e. it accepts repeated calls with the same pairs of (stateID, kvStore).
23-
// Here, the ID is expected to be a collision-resistant hash of the snapshot (including the
24-
// ProtocolStateVersion). Hence, for the same ID (key), BatchStore will reject changing the data (value).
2522
//
26-
// Expected errors during normal operations:
27-
// - storage.ErrDataMismatch if a _different_ KV store for the given stateID has already been persisted
28-
BatchStore(lctx lockctx.Proof, rw storage.ReaderBatchWriter, stateID flow.Identifier, kvStore protocol.KVStoreReader) error
23+
// No error is expected during normal operations
24+
BatchStore(rw storage.ReaderBatchWriter, stateID flow.Identifier, kvStore protocol.KVStoreReader) error
2925

3026
// BatchIndex writes the blockID->stateID index to the input write batch.
3127
// In a nutshell, we want to maintain a map from `blockID` to `stateID`, where `blockID` references the
@@ -34,12 +30,15 @@ type ProtocolKVStore interface {
3430
// - Consider block B, whose ingestion might potentially lead to an updated KV store. For example,
3531
// the KV store changes if we seal some execution results emitting specific service events.
3632
// - For the key `blockID`, we use the identity of block B which _proposes_ this updated KV store.
37-
// - CAUTION: The updated state requires confirmation by a QC and will only become active at the
33+
// - IMPORTANT: The updated state requires confirmation by a QC and will only become active at the
3834
// child block, _after_ validating the QC.
3935
//
40-
// It requires the caller to acquire storage.LockInsertBlock lock
36+
// CAUTION: To prevent data corruption, we need to guarantee atomicity of existence-check and the subsequent
37+
// database write. Hence, we require the caller to acquire [storage.LockInsertBlock] and hold it until the
38+
// database write has been committed.
39+
//
4140
// Expected errors of the returned anonymous function:
42-
// - storage.ErrAlreadyExists if a KV store for the given blockID has already been indexed.
41+
// - [storage.ErrAlreadyExists] if a KV store for the given blockID has already been indexed.
4342
BatchIndex(lctx lockctx.Proof, rw storage.ReaderBatchWriter, blockID flow.Identifier, stateID flow.Identifier) error
4443

4544
// ByID retrieves the KV store snapshot with the given ID.

state/protocol/protocol_state/mock/protocol_kv_store.go

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

state/protocol/protocol_state/state/mutable_protocol_state_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func (s *StateMutatorSuite) testEvolveState(seals []*flow.Seal, expectedResultin
109109
// expect calls to prepare a deferred update for indexing and storing the resulting state:
110110
// as state has not changed, we expect the parent blocks protocol state ID
111111
if stateChangeExpected {
112-
s.protocolKVStoreDB.On("BatchStore", mock.Anything, rw, expectedResultingStateID, &s.evolvingState).Return(nil).Once()
112+
s.protocolKVStoreDB.On("BatchStore", rw, expectedResultingStateID, &s.evolvingState).Return(nil).Once()
113113
}
114114

115115
deferredDBOps := deferred.NewDeferredBlockPersist()
@@ -553,7 +553,7 @@ func (s *StateMutatorSuite) Test_EncodeFailed() {
553553

554554
rw := storagemock.NewReaderBatchWriter(s.T())
555555
s.protocolKVStoreDB.On("BatchIndex", mock.Anything, mock.Anything, s.candidate.ID(), expectedResultingStateID).Return(nil).Once()
556-
s.protocolKVStoreDB.On("BatchStore", mock.Anything, mock.Anything, expectedResultingStateID, &s.evolvingState).Return(exception).Once()
556+
s.protocolKVStoreDB.On("BatchStore", mock.Anything, expectedResultingStateID, &s.evolvingState).Return(exception).Once()
557557

558558
deferredDBOps := deferred.NewDeferredBlockPersist()
559559
_, err := s.mutableState.EvolveState(deferredDBOps, s.candidate.ParentID, s.candidate.View, []*flow.Seal{})

state/protocol/protocol_state/state/protocol_state.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -349,15 +349,11 @@ func (s *MutableProtocolState) build(
349349

350350
if parentStateID != resultingStateID {
351351
deferredDBOps.AddNextOperation(func(lctx lockctx.Proof, blockID flow.Identifier, rw storage.ReaderBatchWriter) error {
352-
err := s.kvStoreSnapshots.BatchStore(lctx, rw, resultingStateID, evolvingState)
352+
// no need to hold any lock, because the resultingStateID is a full content hash of the value
353+
err := s.kvStoreSnapshots.BatchStore(rw, resultingStateID, evolvingState)
353354
if err == nil {
354355
return nil
355356
}
356-
// The only error that `ProtocolKVStore.BatchStore` might return is `storage.ErrDataMismatch`.
357-
// Repeated requests to store the same state for the same id should be no-ops. It should be noted
358-
// that the `resultingStateID` is a collision-resistant hash of the encoded state (including the
359-
// state's version). Hence, mismatching data for the same id indicates a security-critical bug
360-
// or state corruption, making continuation impossible.
361357
return irrecoverable.NewExceptionf("unexpected error while trying to store new protocol state: %w", err)
362358
})
363359
}

storage/mock/protocol_kv_store.go

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,58 @@
11
package operation
22

33
import (
4+
"fmt"
5+
6+
"github.com/jordanschalm/lockctx"
7+
48
"github.com/onflow/flow-go/model/flow"
9+
"github.com/onflow/flow-go/module/irrecoverable"
510
"github.com/onflow/flow-go/storage"
611
)
712

8-
// InsertProtocolKVStore inserts a protocol KV store by ID.
9-
// Error returns:
10-
// - storage.ErrAlreadyExists if the key already exists in the database.
11-
// - generic error in case of unexpected failure from the database layer or encoding failure.
12-
func InsertProtocolKVStore(w storage.Writer, protocolKVStoreID flow.Identifier, kvStore *flow.PSKeyValueStoreData) error {
13-
return UpsertByKey(w, MakePrefix(codeProtocolKVStore, protocolKVStoreID), kvStore)
13+
// InsertProtocolKVStore inserts a protocol KV store by protocol kv store ID.
14+
// The caller must ensure the protocolKVStoreID is the hash of the given kvStore,
15+
// This is currently true, see makeVersionedModelID in state/protocol/protocol_state/kvstore/models.go
16+
// No expected error returns during normal operations.
17+
func InsertProtocolKVStore(rw storage.ReaderBatchWriter, protocolKVStoreID flow.Identifier, kvStore *flow.PSKeyValueStoreData) error {
18+
return UpsertByKey(rw.Writer(), MakePrefix(codeProtocolKVStore, protocolKVStoreID), kvStore)
1419
}
1520

1621
// RetrieveProtocolKVStore retrieves a protocol KV store by ID.
17-
// Error returns:
18-
// - storage.ErrNotFound if the key does not exist in the database
19-
// - generic error in case of unexpected failure from the database layer
22+
// Expected error returns during normal operations:
23+
// - [storage.ErrNotFound] if no protocol KV with the given ID store exists
2024
func RetrieveProtocolKVStore(r storage.Reader, protocolKVStoreID flow.Identifier, kvStore *flow.PSKeyValueStoreData) error {
2125
return RetrieveByKey(r, MakePrefix(codeProtocolKVStore, protocolKVStoreID), kvStore)
2226
}
2327

2428
// IndexProtocolKVStore indexes a protocol KV store by block ID.
25-
// Error returns:
26-
// - storage.ErrAlreadyExists if the key already exists in the database.
27-
// - generic error in case of unexpected failure from the database layer
28-
func IndexProtocolKVStore(w storage.Writer, blockID flow.Identifier, protocolKVStoreID flow.Identifier) error {
29-
return UpsertByKey(w, MakePrefix(codeProtocolKVStoreByBlockID, blockID), protocolKVStoreID)
29+
//
30+
// CAUTION: To prevent data corruption, we need to guarantee atomicity of existence-check and the subsequent
31+
// database write. Hence, we require the caller to acquire [storage.LockInsertBlock] and hold it until the
32+
// database write has been committed.
33+
//
34+
// Expected error returns during normal operations:
35+
// - [storage.ErrAlreadyExists] if a KV store for the given blockID has already been indexed
36+
func IndexProtocolKVStore(lctx lockctx.Proof, rw storage.ReaderBatchWriter, blockID flow.Identifier, protocolKVStoreID flow.Identifier) error {
37+
if !lctx.HoldsLock(storage.LockInsertBlock) {
38+
return fmt.Errorf("missing required lock: %s", storage.LockInsertBlock)
39+
}
40+
41+
key := MakePrefix(codeProtocolKVStoreByBlockID, blockID)
42+
exists, err := KeyExists(rw.GlobalReader(), key)
43+
if err != nil {
44+
return fmt.Errorf("could not check if kv-store snapshot with block id (%x) exists: %w", blockID[:], irrecoverable.NewException(err))
45+
}
46+
if exists {
47+
return fmt.Errorf("a kv-store snapshot for block id (%x) already exists: %w", blockID[:], storage.ErrAlreadyExists)
48+
}
49+
50+
return UpsertByKey(rw.Writer(), key, protocolKVStoreID)
3051
}
3152

3253
// LookupProtocolKVStore finds protocol KV store ID by block ID.
33-
// Error returns:
34-
// - storage.ErrNotFound if the key does not exist in the database
35-
// - generic error in case of unexpected failure from the database layer
54+
// Expected error returns during normal operations:
55+
// - [storage.ErrNotFound] if the given ID does not correspond to any known block
3656
func LookupProtocolKVStore(r storage.Reader, blockID flow.Identifier, protocolKVStoreID *flow.Identifier) error {
3757
return RetrieveByKey(r, MakePrefix(codeProtocolKVStoreByBlockID, blockID), protocolKVStoreID)
3858
}

0 commit comments

Comments
 (0)