Skip to content

Commit f69494b

Browse files
authored
Merge pull request #7964 from onflow/leo/refactor-epoch-protocol-state
[Storage] Refactor epoch protocol state
2 parents 1043cd7 + 1599b94 commit f69494b

File tree

11 files changed

+264
-131
lines changed

11 files changed

+264
-131
lines changed

state/protocol/badger/state.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ func bootstrapProtocolState(
256256
for _, proposal := range segment.AllBlocks() {
257257
blockID := proposal.Block.ID()
258258
protocolStateEntryWrapper := segment.ProtocolStateEntries[proposal.Block.Payload.ProtocolStateID]
259-
err := epochProtocolStateSnapshots.BatchIndex(rw, blockID, protocolStateEntryWrapper.EpochEntry.ID())
259+
err := epochProtocolStateSnapshots.BatchIndex(lctx, rw, blockID, protocolStateEntryWrapper.EpochEntry.ID())
260260
if err != nil {
261261
return fmt.Errorf("could not index root protocol state: %w", err)
262262
}

state/protocol/protocol_state/epochs/statemachine.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,8 @@ func NewEpochStateMachine(
242242
func (e *EpochStateMachine) Build() (*deferred.DeferredBlockPersist, error) {
243243
updatedEpochState, updatedStateID, hasChanges := e.activeStateMachine.Build()
244244

245-
e.pendingDBUpdates.AddNextOperation(func(_ lockctx.Proof, blockID flow.Identifier, rw storage.ReaderBatchWriter) error {
246-
return e.epochProtocolStateDB.BatchIndex(rw, blockID, updatedStateID)
245+
e.pendingDBUpdates.AddNextOperation(func(lctx lockctx.Proof, blockID flow.Identifier, rw storage.ReaderBatchWriter) error {
246+
return e.epochProtocolStateDB.BatchIndex(lctx, rw, blockID, updatedStateID)
247247
})
248248

249249
if hasChanges {

state/protocol/protocol_state/epochs/statemachine_test.go

Lines changed: 55 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"testing"
66

7+
"github.com/jordanschalm/lockctx"
78
"github.com/stretchr/testify/assert"
89
mocks "github.com/stretchr/testify/mock"
910
"github.com/stretchr/testify/require"
@@ -16,6 +17,7 @@ import (
1617
"github.com/onflow/flow-go/state/protocol/protocol_state/epochs"
1718
"github.com/onflow/flow-go/state/protocol/protocol_state/epochs/mock"
1819
protocol_statemock "github.com/onflow/flow-go/state/protocol/protocol_state/mock"
20+
"github.com/onflow/flow-go/storage"
1921
storagemock "github.com/onflow/flow-go/storage/mock"
2022
"github.com/onflow/flow-go/utils/unittest"
2123
)
@@ -40,6 +42,7 @@ type EpochStateMachineSuite struct {
4042
happyPathStateMachineFactory *mock.StateMachineFactoryMethod
4143
fallbackPathStateMachineFactory *mock.StateMachineFactoryMethod
4244
candidate *flow.Header
45+
lockManager lockctx.Manager
4346

4447
stateMachine *epochs.EpochStateMachine
4548
}
@@ -56,6 +59,7 @@ func (s *EpochStateMachineSuite) SetupTest() {
5659
s.happyPathStateMachine = mock.NewStateMachine(s.T())
5760
s.happyPathStateMachineFactory = mock.NewStateMachineFactoryMethod(s.T())
5861
s.fallbackPathStateMachineFactory = mock.NewStateMachineFactoryMethod(s.T())
62+
s.lockManager = storage.NewTestingLockManager()
5963

6064
s.epochStateDB.On("ByBlockID", mocks.Anything).Return(func(_ flow.Identifier) *flow.RichEpochStateEntry {
6165
return s.parentEpochState
@@ -97,17 +101,21 @@ func (s *EpochStateMachineSuite) TestBuild_NoChanges() {
97101

98102
rw := storagemock.NewReaderBatchWriter(s.T())
99103

100-
s.epochStateDB.On("BatchIndex", rw, s.candidate.ID(), s.parentEpochState.ID()).Return(nil).Once()
101-
s.mutator.On("SetEpochStateID", s.parentEpochState.ID()).Return(nil).Once()
104+
// Create a proper lock context proof for the BatchIndex operation
105+
err = unittest.WithLock(s.T(), s.lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
106+
s.epochStateDB.On("BatchIndex", lctx, rw, s.candidate.ID(), s.parentEpochState.ID()).Return(nil).Once()
107+
s.mutator.On("SetEpochStateID", s.parentEpochState.ID()).Return(nil).Once()
102108

103-
dbUpdates, err := s.stateMachine.Build()
104-
require.NoError(s.T(), err)
109+
dbUpdates, err := s.stateMachine.Build()
110+
require.NoError(s.T(), err)
105111

106-
// Provide the blockID and execute the resulting `dbUpdates`. Thereby, the expected mock methods should be called,
107-
// which is asserted by the testify framework. Passing nil lockctx proof because no operations require lock;
108-
// operations are deferred only because block ID is not known yet.
109-
blockID := s.candidate.ID()
110-
err = dbUpdates.Execute(nil, blockID, rw)
112+
// Storage operations are deferred, because block ID is not known when the block is newly constructed. Only at the
113+
// end after the block is fully constructed, its ID can be computed. We emulate this step here to verify that the
114+
// deferred `dbUpdates` have been correctly constructed. Thereby, the expected mock methods should be called,
115+
// which is asserted by the testify framework.
116+
blockID := s.candidate.ID()
117+
return dbUpdates.Execute(lctx, blockID, rw)
118+
})
111119
require.NoError(s.T(), err)
112120
}
113121

@@ -139,19 +147,22 @@ func (s *EpochStateMachineSuite) TestBuild_HappyPath() {
139147
err := s.stateMachine.EvolveState([]flow.ServiceEvent{epochSetup.ServiceEvent(), epochCommit.ServiceEvent()})
140148
require.NoError(s.T(), err)
141149

142-
// prepare a DB update for epoch state
143-
s.epochStateDB.On("BatchIndex", rw, s.candidate.ID(), updatedStateID).Return(nil).Once()
144-
s.epochStateDB.On("BatchStore", w, updatedStateID, updatedState.MinEpochStateEntry).Return(nil).Once()
145-
s.mutator.On("SetEpochStateID", updatedStateID).Return(nil).Once()
150+
// Create a proper lock context proof for the BatchIndex operation
151+
err = unittest.WithLock(s.T(), s.lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
152+
// prepare a DB update for epoch state
153+
s.epochStateDB.On("BatchIndex", lctx, rw, s.candidate.ID(), updatedStateID).Return(nil).Once()
154+
s.epochStateDB.On("BatchStore", w, updatedStateID, updatedState.MinEpochStateEntry).Return(nil).Once()
155+
s.mutator.On("SetEpochStateID", updatedStateID).Return(nil).Once()
146156

147-
dbUpdates, err := s.stateMachine.Build()
148-
require.NoError(s.T(), err)
157+
dbUpdates, err := s.stateMachine.Build()
158+
require.NoError(s.T(), err)
149159

150-
// Provide the blockID and execute the resulting `dbUpdates`. Thereby, the expected mock methods should be called,
151-
// which is asserted by the testify framework. Passing nil lockctx proof because no operations require lock;
152-
// operations are deferred only because block ID is not known yet.
153-
blockID := s.candidate.ID()
154-
err = dbUpdates.Execute(nil, blockID, rw)
160+
// Provide the blockID and execute the resulting `dbUpdates`. Thereby, the expected mock methods should be called,
161+
// which is asserted by the testify framework. The lock context proof is passed to verify that the BatchIndex
162+
// operation receives the proper lock context as required by the storage layer.
163+
blockID := s.candidate.ID()
164+
return dbUpdates.Execute(lctx, blockID, rw)
165+
})
155166
require.NoError(s.T(), err)
156167
}
157168

@@ -532,29 +543,34 @@ func (s *EpochStateMachineSuite) TestEvolveStateTransitionToNextEpoch_WithInvali
532543
err = stateMachine.EvolveState([]flow.ServiceEvent{invalidServiceEvent.ServiceEvent()})
533544
require.NoError(s.T(), err)
534545

535-
s.epochStateDB.On("BatchIndex", mocks.Anything, s.candidate.ID(), mocks.Anything).Return(nil).Once()
546+
// Create a proper lock context proof for the BatchIndex operation
547+
err = unittest.WithLock(s.T(), s.lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
548+
s.epochStateDB.On("BatchIndex", lctx, mocks.Anything, s.candidate.ID(), mocks.Anything).Return(nil).Once()
536549

537-
expectedEpochState := &flow.MinEpochStateEntry{
538-
PreviousEpoch: s.parentEpochState.CurrentEpoch.Copy(),
539-
CurrentEpoch: *s.parentEpochState.NextEpoch.Copy(),
540-
NextEpoch: nil,
541-
EpochFallbackTriggered: true,
542-
}
550+
expectedEpochState := &flow.MinEpochStateEntry{
551+
PreviousEpoch: s.parentEpochState.CurrentEpoch.Copy(),
552+
CurrentEpoch: *s.parentEpochState.NextEpoch.Copy(),
553+
NextEpoch: nil,
554+
EpochFallbackTriggered: true,
555+
}
543556

544-
s.epochStateDB.On("BatchStore", mocks.Anything, expectedEpochState.ID(), expectedEpochState).Return(nil).Once()
545-
s.mutator.On("SetEpochStateID", expectedEpochState.ID()).Return().Once()
557+
s.epochStateDB.On("BatchStore", mocks.Anything, expectedEpochState.ID(), expectedEpochState).Return(nil).Once()
558+
s.mutator.On("SetEpochStateID", expectedEpochState.ID()).Return().Once()
546559

547-
dbOps, err := stateMachine.Build()
548-
require.NoError(s.T(), err)
560+
dbOps, err := stateMachine.Build()
561+
require.NoError(s.T(), err)
549562

550-
w := storagemock.NewWriter(s.T())
551-
rw := storagemock.NewReaderBatchWriter(s.T())
552-
rw.On("Writer").Return(w).Once() // called by epochStateDB.BatchStore
563+
w := storagemock.NewWriter(s.T())
564+
rw := storagemock.NewReaderBatchWriter(s.T())
565+
rw.On("Writer").Return(w).Once() // called by epochStateDB.BatchStore
553566

554-
// Provide the blockID and execute the resulting `dbUpdates`. Thereby, the expected mock methods should be called,
555-
// which is asserted by the testify framework. Passing nil lockctx proof because no operations require lock;
556-
// operations are deferred only because block ID is not known yet
557-
blockID := s.candidate.ID()
558-
err = dbOps.Execute(nil, blockID, rw)
567+
// Storage operations are deferred, because block ID is not known when the block is newly constructed. Only at the
568+
// end after the block is fully constructed, its ID can be computed. We emulate this step here to verify that the
569+
// deferred `dbOps` have been correctly constructed. Thereby, the expected mock methods should be called,
570+
// which is asserted by the testify framework.
571+
blockID := s.candidate.ID()
572+
return dbOps.Execute(lctx, blockID, rw)
573+
})
559574
require.NoError(s.T(), err)
575+
560576
}

storage/epoch_protocol_state.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,23 @@
11
package storage
22

33
import (
4+
"github.com/jordanschalm/lockctx"
5+
46
"github.com/onflow/flow-go/model/flow"
57
)
68

79
// EpochProtocolStateEntries represents persistent, fork-aware storage for the Epoch-related
810
// sub-state of the overall Protocol State (KV Store).
911
type EpochProtocolStateEntries interface {
12+
1013
// BatchStore persists the given epoch protocol state entry as part of a DB batch. Per convention, the identities in
11-
// the flow.MinEpochStateEntry must be in canonical order for the current and next epoch (if present),
12-
// otherwise an exception is returned.
14+
// the flow.MinEpochStateEntry must be in canonical order for the current and next epoch (if present), otherwise an
15+
// exception is returned.
16+
//
17+
// CAUTION: The caller must ensure `epochProtocolStateID` is a collision-resistant hash of the provided
18+
// `epochProtocolStateEntry`! This method silently overrides existing data, which is safe only if for the same
19+
// key, we always write the same value.
20+
//
1321
// No errors are expected during normal operation.
1422
BatchStore(w Writer, epochProtocolStateID flow.Identifier, epochProtocolStateEntry *flow.MinEpochStateEntry) error
1523

@@ -21,11 +29,22 @@ type EpochProtocolStateEntries interface {
2129
// the protocol state changes if we seal some execution results emitting service events.
2230
// - For the key `blockID`, we use the identity of block B which _proposes_ this Protocol State. As value,
2331
// the hash of the resulting protocol state at the end of processing B is to be used.
24-
// - CAUTION: The protocol state requires confirmation by a QC and will only become active at the child block,
32+
// - IMPORTANT: The protocol state requires confirmation by a QC and will only become active at the child block,
2533
// _after_ validating the QC.
2634
//
27-
// No errors are expected during normal operation.
28-
BatchIndex(rw ReaderBatchWriter, blockID flow.Identifier, epochProtocolStateID flow.Identifier) error
35+
// CAUTION:
36+
// - The caller must acquire the lock [storage.LockInsertBlock] and hold it until the database write has been committed.
37+
// - OVERWRITES existing data (potential for data corruption):
38+
// This method silently overrides existing data without any sanity checks whether data for the same key already exists.
39+
// Note that the Flow protocol mandates that for a previously persisted key, the data is never changed to a different
40+
// value. Changing data could cause the node to publish inconsistent data and to be slashed, or the protocol to be
41+
// compromised as a whole. This method does not contain any safeguards to prevent such data corruption. The lock proof
42+
// serves as a reminder that the CALLER is responsible to ensure that the DEDUPLICATION CHECK is done elsewhere
43+
// ATOMICALLY with this write operation.
44+
//
45+
// No errors are expected during normal operation. In particular, existing data is overwritten without
46+
// comparison, so this method does not report [storage.ErrDataMismatch] for a conflicting entry.
47+
BatchIndex(lctx lockctx.Proof, rw ReaderBatchWriter, blockID flow.Identifier, epochProtocolStateID flow.Identifier) error
2948

3049
// ByID returns the flow.RichEpochStateEntry by its ID.
3150
// Expected errors during normal operations:

storage/mock/epoch_protocol_state_entries.go

Lines changed: 7 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

storage/operation/children_test.go

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -247,24 +247,31 @@ func TestDirectChildren(t *testing.T) {
247247
// TestChildrenWrongLockIsRejected verifies that operations fail when called with the wrong lock type.
248248
// This ensures that IndexNewBlock requires LockInsertBlock and IndexNewClusterBlock requires LockInsertOrFinalizeClusterBlock.
249249
func TestChildrenWrongLockIsRejected(t *testing.T) {
250-
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
251-
lockManager := storage.NewTestingLockManager()
252-
253-
parentID := unittest.IdentifierFixture()
254-
childID := unittest.IdentifierFixture()
250+
for _, opPair := range getTestOperationPairs() {
251+
t.Run(opPair.name, func(t *testing.T) {
252+
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
253+
lockManager := storage.NewTestingLockManager()
255254

256-
err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
257-
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
258-
return operation.IndexNewClusterBlock(lctx, rw, childID, parentID)
259-
})
260-
})
261-
require.Error(t, err)
255+
parentID := unittest.IdentifierFixture()
256+
childID := unittest.IdentifierFixture()
262257

263-
err = unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
264-
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
265-
return operation.IndexNewBlock(lctx, rw, childID, parentID)
258+
// Use the wrong lock type for each operation
259+
var wrongLockType string
260+
if opPair.lockType == storage.LockInsertBlock {
261+
// For IndexNewBlock, use the cluster block lock (wrong)
262+
wrongLockType = storage.LockInsertOrFinalizeClusterBlock
263+
} else {
264+
// For IndexNewClusterBlock, use the regular block lock (wrong)
265+
wrongLockType = storage.LockInsertBlock
266+
}
267+
268+
err := unittest.WithLock(t, lockManager, wrongLockType, func(lctx lockctx.Context) error {
269+
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
270+
return opPair.indexFunc(lctx, rw, childID, parentID)
271+
})
272+
})
273+
require.Error(t, err)
266274
})
267275
})
268-
require.Error(t, err)
269-
})
276+
}
270277
}

storage/operation/epoch_protocol_state.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package operation
22

33
import (
4+
"fmt"
5+
6+
"github.com/jordanschalm/lockctx"
7+
48
"github.com/onflow/flow-go/model/flow"
59
"github.com/onflow/flow-go/storage"
610
)
@@ -15,22 +19,34 @@ func InsertEpochProtocolState(w storage.Writer, entryID flow.Identifier, entry *
1519
// RetrieveEpochProtocolState retrieves an epoch protocol state entry by ID.
1620
// Error returns:
1721
// - storage.ErrNotFound if the key does not exist in the database
18-
// - generic error in case of unexpected failure from the database layer
1922
func RetrieveEpochProtocolState(r storage.Reader, entryID flow.Identifier, entry *flow.MinEpochStateEntry) error {
2023
return RetrieveByKey(r, MakePrefix(codeEpochProtocolState, entryID), entry)
2124
}
2225

2326
// IndexEpochProtocolState indexes an epoch protocol state entry by block ID.
24-
// Error returns:
25-
// - generic error in case of unexpected failure from the database layer or encoding failure.
26-
func IndexEpochProtocolState(w storage.Writer, blockID flow.Identifier, epochProtocolStateEntryID flow.Identifier) error {
27+
//
28+
// CAUTION:
29+
// - The caller must acquire the lock [storage.LockInsertBlock] and hold it until the database write has been committed.
30+
// - OVERWRITES existing data (potential for data corruption):
31+
// This method silently overrides existing data without any sanity checks whether data for the same key already exists.
32+
// Note that the Flow protocol mandates that for a previously persisted key, the data is never changed to a different
33+
// value. Changing data could cause the node to publish inconsistent data and to be slashed, or the protocol to be
34+
// compromised as a whole. This method does not contain any safeguards to prevent such data corruption. The lock proof
35+
// serves as a reminder that the CALLER is responsible to ensure that the DEDUPLICATION CHECK is done elsewhere
36+
// ATOMICALLY with this write operation.
37+
//
38+
// No error returns are expected during normal operation.
39+
func IndexEpochProtocolState(lctx lockctx.Proof, w storage.Writer, blockID flow.Identifier, epochProtocolStateEntryID flow.Identifier) error {
40+
if !lctx.HoldsLock(storage.LockInsertBlock) {
41+
return fmt.Errorf("missing required lock: %s", storage.LockInsertBlock)
42+
}
43+
2744
return UpsertByKey(w, MakePrefix(codeEpochProtocolStateByBlockID, blockID), epochProtocolStateEntryID)
2845
}
2946

3047
// LookupEpochProtocolState finds an epoch protocol state entry ID by block ID.
3148
// Error returns:
3249
// - storage.ErrNotFound if the key does not exist in the database
33-
// - generic error in case of unexpected failure from the database layer
3450
func LookupEpochProtocolState(r storage.Reader, blockID flow.Identifier, epochProtocolStateEntryID *flow.Identifier) error {
3551
return RetrieveByKey(r, MakePrefix(codeEpochProtocolStateByBlockID, blockID), epochProtocolStateEntryID)
3652
}

0 commit comments

Comments
 (0)