Skip to content

Commit 46595e9

Browse files
committed
remove consistency check in InsertProtocolKVStore
1 parent 7c66669 commit 46595e9

File tree

9 files changed

+43
-196
lines changed

9 files changed

+43
-196
lines changed

state/protocol/badger/state.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func bootstrapProtocolState(
232232
// The sealing segment contains a protocol state entry for every block in the segment, including the root block.
233233
for protocolStateID, stateEntry := range segment.ProtocolStateEntries {
234234
// Store the protocol KV Store entry
235-
err := protocolKVStoreSnapshots.BatchStore(lctx, rw, protocolStateID, &stateEntry.KVStore)
235+
err := protocolKVStoreSnapshots.BatchStore(rw, protocolStateID, &stateEntry.KVStore)
236236
if err != nil {
237237
return fmt.Errorf("could not store protocol state kvstore: %w", err)
238238
}

state/protocol/protocol_state/kvstore/kvstore_storage.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package kvstore
33
import (
44
"fmt"
55

6-
"github.com/jordanschalm/lockctx"
7-
86
"github.com/onflow/flow-go/model/flow"
97
"github.com/onflow/flow-go/state/protocol"
108
"github.com/onflow/flow-go/state/protocol/protocol_state"
@@ -37,20 +35,15 @@ func NewProtocolKVStore(protocolStateSnapshots storage.ProtocolKVStore) *Protoco
3735
// data blob. If the encoding fails, an error is returned.
3836
// BatchStore is idempotent, i.e. it accepts repeated calls with the same pairs of (stateID, kvStore).
3937
// Here, the ID is expected to be a collision-resistant hash of the snapshot (including the
40-
// ProtocolStateVersion). Hence, for the same ID (key), BatchStore will reject changing the data (value).
41-
//
42-
// CAUTION: To prevent data corruption, we need to guarantee atomicity of existence-check and the subsequent
43-
// database write. Hence, we require the caller to acquire [storage.LockInsertBlock] and hold it until the
44-
// database write has been committed.
38+
// ProtocolStateVersion).
4539
//
46-
// Expected error returns during normal operations:
47-
// - [storage.ErrDataMismatch] if a _different_ KV store for the given stateID has already been persisted
48-
func (p *ProtocolKVStore) BatchStore(lctx lockctx.Proof, rw storage.ReaderBatchWriter, stateID flow.Identifier, kvStore protocol.KVStoreReader) error {
40+
// No error is exepcted during normal operations
41+
func (p *ProtocolKVStore) BatchStore(rw storage.ReaderBatchWriter, stateID flow.Identifier, kvStore protocol.KVStoreReader) error {
4942
version, data, err := kvStore.VersionedEncode()
5043
if err != nil {
5144
return fmt.Errorf("failed to VersionedEncode protocol state: %w", err)
5245
}
53-
return p.ProtocolKVStore.BatchStore(lctx, rw, stateID, &flow.PSKeyValueStoreData{
46+
return p.ProtocolKVStore.BatchStore(rw, stateID, &flow.PSKeyValueStoreData{
5447
Version: version,
5548
Data: data,
5649
})

state/protocol/protocol_state/kvstore_storage.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,8 @@ type ProtocolKVStore interface {
2020
// implementations of [protocol.KVStoreReader] should be able to successfully encode their state into a
2121
// data blob. If the encoding fails, an error is returned.
2222
//
23-
// CAUTION: To prevent data corruption, we need to guarantee atomicity of existence-check and the subsequent database
24-
// write. Hence, we require the caller to acquire the [storage.LockInsertBlock] lock and hold it until the database
25-
// write has been committed.
26-
//
27-
// Expected error returns during normal operations:
28-
// - [storage.ErrAlreadyExists] if a KV store with the given ID has already been stored
29-
BatchStore(lctx lockctx.Proof, rw storage.ReaderBatchWriter, stateID flow.Identifier, kvStore protocol.KVStoreReader) error
23+
// No error is expected during normal operations
24+
BatchStore(rw storage.ReaderBatchWriter, stateID flow.Identifier, kvStore protocol.KVStoreReader) error
3025

3126
// BatchIndex writes the blockID->stateID index to the input write batch.
3227
// In a nutshell, we want to maintain a map from `blockID` to `stateID`, where `blockID` references the

state/protocol/protocol_state/state/protocol_state.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -349,15 +349,11 @@ func (s *MutableProtocolState) build(
349349

350350
if parentStateID != resultingStateID {
351351
deferredDBOps.AddNextOperation(func(lctx lockctx.Proof, blockID flow.Identifier, rw storage.ReaderBatchWriter) error {
352-
err := s.kvStoreSnapshots.BatchStore(lctx, rw, resultingStateID, evolvingState)
352+
// no need to hold any lock, because the resultingStateID is a full content hash of the value
353+
err := s.kvStoreSnapshots.BatchStore(rw, resultingStateID, evolvingState)
353354
if err == nil {
354355
return nil
355356
}
356-
// The only error that `ProtocolKVStore.BatchStore` might return is `storage.ErrDataMismatch`.
357-
// Repeated requests to store the same state for the same id should be no-ops. It should be noted
358-
// that the `resultingStateID` is a collision-resistant hash of the encoded state (including the
359-
// state's version). Hence, mismatching data for the same id indicates a security-critical bug
360-
// or state corruption, making continuation impossible.
361357
return irrecoverable.NewExceptionf("unexpected error while trying to store new protocol state: %w", err)
362358
})
363359
}

storage/operation/protocol_kv_store.go

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,11 @@ import (
1111
)
1212

1313
// InsertProtocolKVStore inserts a protocol KV store by protocol kv store ID.
14-
// Error returns:
15-
// - [storage.ErrAlreadyExists] if the key already exists in the database.
16-
func InsertProtocolKVStore(lctx lockctx.Proof, rw storage.ReaderBatchWriter, protocolKVStoreID flow.Identifier, kvStore *flow.PSKeyValueStoreData) error {
17-
if !lctx.HoldsLock(storage.LockInsertBlock) {
18-
return fmt.Errorf("missing required lock: %s", storage.LockInsertBlock)
19-
}
20-
21-
key := MakePrefix(codeProtocolKVStore, protocolKVStoreID)
22-
exists, err := KeyExists(rw.GlobalReader(), key)
23-
if err != nil {
24-
return fmt.Errorf("could not check if kv-store snapshot %x exists: %w", protocolKVStoreID[:], irrecoverable.NewException(err))
25-
}
26-
if exists {
27-
return fmt.Errorf("a kv-store snapshot with id %x already exists: %w", protocolKVStoreID[:], storage.ErrAlreadyExists)
28-
}
29-
30-
return UpsertByKey(rw.Writer(), key, kvStore)
14+
// The caller must ensure the protocolKVStoreID is the hash of the given kvStore,
15+
// This is currently true, see makeVersionedModelID in state/protocol/protocol_state/kvstore/models.go
16+
// No expected error returns during normal operations.
17+
func InsertProtocolKVStore(rw storage.ReaderBatchWriter, protocolKVStoreID flow.Identifier, kvStore *flow.PSKeyValueStoreData) error {
18+
return UpsertByKey(rw.Writer(), MakePrefix(codeProtocolKVStore, protocolKVStoreID), kvStore)
3119
}
3220

3321
// RetrieveProtocolKVStore retrieves a protocol KV store by ID.

storage/operation/protocol_kv_store_test.go

Lines changed: 4 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,8 @@ func TestInsertProtocolKVStore(t *testing.T) {
2424
}
2525

2626
kvStoreStateID := unittest.IdentifierFixture()
27-
err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
28-
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
29-
return operation.InsertProtocolKVStore(lctx, rw, kvStoreStateID, expected)
30-
})
27+
err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
28+
return operation.InsertProtocolKVStore(rw, kvStoreStateID, expected)
3129
})
3230
require.NoError(t, err)
3331

@@ -53,47 +51,6 @@ func TestInsertProtocolKVStore(t *testing.T) {
5351
})
5452
}
5553

56-
// TestInsertProtocolKVStore_ErrAlreadyExists tests that InsertProtocolKVStore returns ErrAlreadyExists
57-
// when attempting to insert a protocol KV store that already exists.
58-
func TestInsertProtocolKVStore_ErrAlreadyExists(t *testing.T) {
59-
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
60-
lockManager := storage.NewTestingLockManager()
61-
expected := &flow.PSKeyValueStoreData{
62-
Version: 2,
63-
Data: unittest.RandomBytes(32),
64-
}
65-
66-
kvStoreStateID := unittest.IdentifierFixture()
67-
68-
// First insertion should succeed
69-
err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
70-
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
71-
return operation.InsertProtocolKVStore(lctx, rw, kvStoreStateID, expected)
72-
})
73-
})
74-
require.NoError(t, err)
75-
76-
// Second insertion with same ID should fail with ErrAlreadyExists
77-
differentData := &flow.PSKeyValueStoreData{
78-
Version: 3,
79-
Data: unittest.RandomBytes(32),
80-
}
81-
err = unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
82-
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
83-
return operation.InsertProtocolKVStore(lctx, rw, kvStoreStateID, differentData)
84-
})
85-
})
86-
require.Error(t, err)
87-
require.ErrorIs(t, err, storage.ErrAlreadyExists)
88-
89-
// Verify original data is still there and unchanged
90-
var actual flow.PSKeyValueStoreData
91-
err = operation.RetrieveProtocolKVStore(db.Reader(), kvStoreStateID, &actual)
92-
require.NoError(t, err)
93-
assert.Equal(t, expected, &actual)
94-
})
95-
}
96-
9754
// TestIndexProtocolKVStore_ErrAlreadyExists tests that IndexProtocolKVStore returns ErrAlreadyExists
9855
// when attempting to index a protocol KV store for a block ID that already has an index.
9956
func TestIndexProtocolKVStore_ErrAlreadyExists(t *testing.T) {
@@ -108,10 +65,8 @@ func TestIndexProtocolKVStore_ErrAlreadyExists(t *testing.T) {
10865
blockID := unittest.IdentifierFixture()
10966

11067
// Insert the protocol KV store first
111-
err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
112-
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
113-
return operation.InsertProtocolKVStore(lctx, rw, kvStoreStateID, expected)
114-
})
68+
err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
69+
return operation.InsertProtocolKVStore(rw, kvStoreStateID, expected)
11570
})
11671
require.NoError(t, err)
11772

@@ -141,29 +96,6 @@ func TestIndexProtocolKVStore_ErrAlreadyExists(t *testing.T) {
14196
})
14297
}
14398

144-
// TestInsertProtocolKVStore_MissingLock tests that InsertProtocolKVStore requires LockInsertBlock.
145-
func TestInsertProtocolKVStore_MissingLock(t *testing.T) {
146-
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
147-
lockManager := storage.NewTestingLockManager()
148-
expected := &flow.PSKeyValueStoreData{
149-
Version: 2,
150-
Data: unittest.RandomBytes(32),
151-
}
152-
153-
kvStoreStateID := unittest.IdentifierFixture()
154-
155-
// Attempt to insert without holding the required lock
156-
lctx := lockManager.NewContext()
157-
defer lctx.Release()
158-
159-
err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
160-
return operation.InsertProtocolKVStore(lctx, rw, kvStoreStateID, expected)
161-
})
162-
require.Error(t, err)
163-
require.Contains(t, err.Error(), storage.LockInsertBlock)
164-
})
165-
}
166-
16799
// TestIndexProtocolKVStore_MissingLock tests that IndexProtocolKVStore requires LockInsertBlock.
168100
func TestIndexProtocolKVStore_MissingLock(t *testing.T) {
169101
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
@@ -183,28 +115,6 @@ func TestIndexProtocolKVStore_MissingLock(t *testing.T) {
183115
})
184116
}
185117

186-
// TestInsertProtocolKVStore_WrongLock tests that InsertProtocolKVStore fails when holding wrong locks.
187-
func TestInsertProtocolKVStore_WrongLock(t *testing.T) {
188-
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
189-
lockManager := storage.NewTestingLockManager()
190-
expected := &flow.PSKeyValueStoreData{
191-
Version: 2,
192-
Data: unittest.RandomBytes(32),
193-
}
194-
195-
kvStoreStateID := unittest.IdentifierFixture()
196-
197-
// Test with LockFinalizeBlock (wrong lock)
198-
err := unittest.WithLock(t, lockManager, storage.LockFinalizeBlock, func(lctx lockctx.Context) error {
199-
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
200-
return operation.InsertProtocolKVStore(lctx, rw, kvStoreStateID, expected)
201-
})
202-
})
203-
require.Error(t, err)
204-
require.Contains(t, err.Error(), storage.LockInsertBlock)
205-
})
206-
}
207-
208118
// TestIndexProtocolKVStore_WrongLock tests that IndexProtocolKVStore fails when holding wrong locks.
209119
func TestIndexProtocolKVStore_WrongLock(t *testing.T) {
210120
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {

storage/protocol_kv_store.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,10 @@ type ProtocolKVStore interface {
1717
// BatchStore persists the KV-store snapshot in the database using the given ID as key.
1818
// BatchStore is idempotent, i.e. it accepts repeated calls with the same pairs of (stateID, kvStore).
1919
// Here, the ID is expected to be a collision-resistant hash of the snapshot (including the
20-
// ProtocolStateVersion). Hence, for the same ID (key), BatchStore will reject changing the data (value).
20+
// ProtocolStateVersion).
2121
//
22-
// CAUTION: To prevent data corruption, we need to guarantee atomicity of existence-check and the subsequent
23-
// database write. Hence, we require the caller to acquire [storage.LockInsertBlock] and hold it until the
24-
// database write has been committed.
25-
//
26-
// Expected error returns during normal operations:
27-
// - [storage.ErrDataMismatch] if a _different_ KV store for the given stateID has already been persisted
28-
BatchStore(lctx lockctx.Proof, rw ReaderBatchWriter, stateID flow.Identifier, data *flow.PSKeyValueStoreData) error
22+
// No error is expected during normal operations.
23+
BatchStore(rw ReaderBatchWriter, stateID flow.Identifier, data *flow.PSKeyValueStoreData) error
2924

3025
// BatchIndex appends the following operation to the provided write batch:
3126
// we extend the map from `blockID` to `stateID`, where `blockID` references the
@@ -45,7 +40,7 @@ type ProtocolKVStore interface {
4540
// database write has been committed.
4641
//
4742
// Expected error returns during normal operations:
48-
// - storage.ErrDataMismatch if a _different_ KV store for the given stateID has already been persisted
43+
// - [storage.ErrAlreadyExists] if a KV store for the given blockID has already been indexed
4944
BatchIndex(lctx lockctx.Proof, rw ReaderBatchWriter, blockID flow.Identifier, stateID flow.Identifier) error
5045

5146
// ByID retrieves the KV store snapshot with the given ID.

storage/store/protocol_kv_store.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func NewProtocolKVStore(collector module.CacheMetrics,
7070
db: db,
7171
cache: newCache(collector, metrics.ResourceProtocolKVStore,
7272
withLimit[flow.Identifier, *flow.PSKeyValueStoreData](kvStoreCacheSize),
73-
withStoreWithLock(operation.InsertProtocolKVStore),
73+
withStore(operation.InsertProtocolKVStore),
7474
withRetrieve(retrieveByStateID)),
7575
byBlockIdCache: newCache(collector, metrics.ResourceProtocolKVStoreByBlockID,
7676
withLimit[flow.Identifier, flow.Identifier](kvStoreByBlockIDCacheSize),
@@ -84,15 +84,10 @@ func NewProtocolKVStore(collector module.CacheMetrics,
8484
// Here, the ID is expected to be a collision-resistant hash of the snapshot (including the
8585
// ProtocolStateVersion). Hence, for the same ID, BatchStore will reject changing the data.
8686
//
87-
// CAUTION: To prevent data corruption, we need to guarantee atomicity of existence-check and the subsequent database
88-
// write. Hence, we require the caller to acquire the [storage.LockInsertBlock] lock and hold it until the database
89-
// write has been committed.
90-
//
91-
// Expected error returns during normal operations:
92-
// - [storage.ErrAlreadyExist] if a KV store with the given ID has already been stored
93-
func (s *ProtocolKVStore) BatchStore(lctx lockctx.Proof, rw storage.ReaderBatchWriter, stateID flow.Identifier, data *flow.PSKeyValueStoreData) error {
87+
// No error is expected during normal operations.
88+
func (s *ProtocolKVStore) BatchStore(rw storage.ReaderBatchWriter, stateID flow.Identifier, data *flow.PSKeyValueStoreData) error {
9489
return s.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
95-
return s.cache.PutWithLockTx(lctx, rw, stateID, data)
90+
return s.cache.PutTx(rw, stateID, data)
9691
})
9792
}
9893

storage/store/protocol_kv_store_test.go

Lines changed: 17 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@ func TestKeyValueStoreStorage(t *testing.T) {
3131
// store protocol state and auxiliary info
3232
err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
3333
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
34-
err := store.BatchStore(lctx, rw, stateID, expected)
35-
require.NoError(t, err)
34+
err := store.BatchStore(rw, stateID, expected)
35+
if err != nil {
36+
return err
37+
}
3638
return store.BatchIndex(lctx, rw, blockID, stateID)
3739
})
3840
})
@@ -50,13 +52,11 @@ func TestKeyValueStoreStorage(t *testing.T) {
5052
})
5153
}
5254

53-
// TestProtocolKVStore_StoreTx tests that StoreTx handles storage request correctly, when a snapshot with
54-
// the given id has already been stored:
55-
// - if the KV-store snapshot is exactly the same as the one already stored (incl. the version), `BatchStore` should return without an error
56-
// - if we request to store a _different_ KV-store snapshot, an `storage.ErrDataMismatch` should be returned.
55+
// TestProtocolKVStore_StoreTx tests that StoreTx handles storage request correctly.
56+
// Since BatchStore is now idempotent and doesn't return errors for duplicate data,
57+
// we test that it can be called multiple times without issues.
5758
func TestProtocolKVStore_StoreTx(t *testing.T) {
5859
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
59-
lockManager := storage.NewTestingLockManager()
6060
metrics := metrics.NewNoopCollector()
6161
store := NewProtocolKVStore(metrics, db, DefaultProtocolKVStoreCacheSize, DefaultProtocolKVStoreByBlockIDCacheSize)
6262

@@ -67,46 +67,21 @@ func TestProtocolKVStore_StoreTx(t *testing.T) {
6767
}
6868

6969
// Store initial data
70-
err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
71-
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
72-
return store.BatchStore(lctx, rw, stateID, expected)
73-
})
70+
err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
71+
return store.BatchStore(rw, stateID, expected)
7472
})
7573
require.NoError(t, err)
7674

77-
// Store same data again - should error
78-
err = unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
79-
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
80-
return store.BatchStore(lctx, rw, stateID, expected)
81-
})
75+
// Store same data again - should succeed (idempotent)
76+
err = db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
77+
return store.BatchStore(rw, stateID, expected)
8278
})
83-
require.ErrorIs(t, err, storage.ErrAlreadyExists)
84-
85-
// Attempt to store different data with the same stateID
86-
dataDifferent := &flow.PSKeyValueStoreData{
87-
Version: 2,
88-
Data: unittest.RandomBytes(32),
89-
}
90-
91-
err = unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
92-
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
93-
return store.BatchStore(lctx, rw, stateID, dataDifferent)
94-
})
95-
})
96-
require.ErrorIs(t, err, storage.ErrAlreadyExists)
97-
98-
// Attempt to store different version with the same stateID
99-
versionDifferent := &flow.PSKeyValueStoreData{
100-
Version: 3,
101-
Data: expected.Data,
102-
}
79+
require.NoError(t, err)
10380

104-
err = unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
105-
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
106-
return store.BatchStore(lctx, rw, stateID, versionDifferent)
107-
})
108-
})
109-
require.ErrorIs(t, err, storage.ErrAlreadyExists)
81+
// Verify the data can still be retrieved
82+
actual, err := store.ByID(stateID)
83+
require.NoError(t, err)
84+
assert.Equal(t, expected, actual)
11085
})
11186
}
11287

0 commit comments

Comments
 (0)