Skip to content

Commit 78c4a13

Browse files
authored
Merge pull request #8017 from onflow/leo/refactor-execution-fork-evidence
[Storage] Refactor execution fork evidence
2 parents da3c9e1 + c26fc13 commit 78c4a13

15 files changed

+112
-70
lines changed

cmd/consensus/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ func main() {
396396
multipleReceiptsFilterMempool,
397397
consensusMempools.LogForkAndCrash(node.Logger),
398398
node.ProtocolDB,
399+
node.StorageLockMgr,
399400
node.Logger,
400401
)
401402
if err != nil {

module/mempool/consensus/exec_fork_suppressor.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"sync"
77

8+
"github.com/jordanschalm/lockctx"
89
"github.com/rs/zerolog"
910
"github.com/rs/zerolog/log"
1011
"go.uber.org/atomic"
@@ -46,6 +47,7 @@ type ExecForkSuppressor struct {
4647
execForkDetected atomic.Bool
4748
onExecFork ExecForkActor
4849
execForkEvidenceStore storage.ExecutionForkEvidence
50+
lockManager storage.LockManager
4951
log zerolog.Logger
5052
}
5153

@@ -61,6 +63,7 @@ func NewExecStateForkSuppressor(
6163
seals mempool.IncorporatedResultSeals,
6264
onExecFork ExecForkActor,
6365
db storage.DB,
66+
lockManager storage.LockManager,
6467
log zerolog.Logger,
6568
) (*ExecForkSuppressor, error) {
6669
executionForkEvidenceStore := store.NewExecutionForkEvidence(db)
@@ -83,6 +86,7 @@ func NewExecStateForkSuppressor(
8386
execForkDetected: *atomic.NewBool(execForkDetectedFlag),
8487
onExecFork: onExecFork,
8588
execForkEvidenceStore: executionForkEvidenceStore,
89+
lockManager: lockManager,
8690
log: log.With().Str("mempool", "ExecForkSuppressor").Logger(),
8791
}
8892

@@ -365,9 +369,13 @@ func (s *ExecForkSuppressor) filterConflictingSeals(sealsByBlockID map[flow.Iden
365369
s.execForkDetected.Store(true)
366370
s.Clear()
367371
conflictingSeals = append(sealsList{candidateSeal}, conflictingSeals...)
368-
err := s.execForkEvidenceStore.StoreIfNotExists(conflictingSeals)
372+
373+
// Acquire lock and store execution fork evidence
374+
err := storage.WithLock(s.lockManager, storage.LockInsertExecutionForkEvidence, func(lctx lockctx.Context) error {
375+
return s.execForkEvidenceStore.StoreIfNotExists(lctx, conflictingSeals)
376+
})
369377
if err != nil {
370-
panic("failed to store execution fork evidence")
378+
s.log.Fatal().Msg("failed to store execution fork evidence")
371379
}
372380
s.onExecFork(conflictingSeals)
373381
return nil

module/mempool/consensus/exec_fork_suppressor_test.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -255,11 +255,12 @@ func Test_ForkDetectionPersisted(t *testing.T) {
255255

256256
// This function stores conflicting seals to the underlying database.
257257
func(t *testing.T, db storage.DB) {
258+
lockManager := storage.NewTestingLockManager()
258259

259260
// initialize ExecForkSuppressor
260261
wrappedMempool := &poolmock.IncorporatedResultSeals{}
261262
execForkActor := &actormock.ExecForkActor{}
262-
wrapper, _ := NewExecStateForkSuppressor(wrappedMempool, execForkActor.OnExecFork, db, zerolog.New(os.Stderr))
263+
wrapper, _ := NewExecStateForkSuppressor(wrappedMempool, execForkActor.OnExecFork, db, lockManager, zerolog.New(os.Stderr))
263264

264265
// add seal
265266
wrappedMempool.On("Add", sealA).Return(true, nil).Once()
@@ -285,14 +286,16 @@ func Test_ForkDetectionPersisted(t *testing.T) {
285286

286287
// This function retrieves conflicting seals from the same underlying database with a new instance of storage.DB.
287288
func(t *testing.T, db storage.DB) {
289+
lockManager := storage.NewTestingLockManager()
290+
288291
wrappedMempool2 := &poolmock.IncorporatedResultSeals{}
289292
execForkActor2 := &actormock.ExecForkActor{}
290293
execForkActor2.On("OnExecFork", mock.Anything).
291294
Run(func(args mock.Arguments) {
292295
conflictingSeals := args.Get(0).([]*flow.IncorporatedResultSeal)
293296
require.ElementsMatch(t, []*flow.IncorporatedResultSeal{sealA, sealB}, conflictingSeals)
294297
}).Return().Once()
295-
wrapper2, _ := NewExecStateForkSuppressor(wrappedMempool2, execForkActor2.OnExecFork, db, zerolog.New(os.Stderr))
298+
wrapper2, _ := NewExecStateForkSuppressor(wrappedMempool2, execForkActor2.OnExecFork, db, lockManager, zerolog.New(os.Stderr))
296299

297300
// add another (non-conflicting) seal to ExecForkSuppressor
298301
// fail test if seal is added to wrapped mempool
@@ -318,8 +321,10 @@ func Test_AddRemove_SmokeTest(t *testing.T) {
318321
require.Fail(t, "no call to onExecFork expected ")
319322
}
320323
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
324+
lockManager := storage.NewTestingLockManager()
325+
321326
wrappedMempool := stdmap.NewIncorporatedResultSeals(100)
322-
wrapper, err := NewExecStateForkSuppressor(wrappedMempool, onExecFork, db, zerolog.New(os.Stderr))
327+
wrapper, err := NewExecStateForkSuppressor(wrappedMempool, onExecFork, db, lockManager, zerolog.New(os.Stderr))
323328
require.NoError(t, err)
324329
require.NotNil(t, wrapper)
325330

@@ -355,6 +360,8 @@ func Test_AddRemove_SmokeTest(t *testing.T) {
355360
// Test adding conflicting seals with different number of matching receipts.
356361
func Test_ConflictingSeal_SmokeTest(t *testing.T) {
357362
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
363+
lockManager := storage.NewTestingLockManager()
364+
358365
executingForkDetected := atomic.NewBool(false)
359366
onExecFork := func([]*flow.IncorporatedResultSeal) {
360367
executingForkDetected.Store(true)
@@ -363,7 +370,7 @@ func Test_ConflictingSeal_SmokeTest(t *testing.T) {
363370
rawMempool := stdmap.NewIncorporatedResultSeals(100)
364371
receiptsDB := mockstorage.NewExecutionReceipts(t)
365372
wrappedMempool := NewIncorporatedResultSeals(rawMempool, receiptsDB)
366-
wrapper, err := NewExecStateForkSuppressor(wrappedMempool, onExecFork, db, zerolog.New(os.Stderr))
373+
wrapper, err := NewExecStateForkSuppressor(wrappedMempool, onExecFork, db, lockManager, zerolog.New(os.Stderr))
367374
require.NoError(t, err)
368375
require.NotNil(t, wrapper)
369376

@@ -426,9 +433,11 @@ func Test_ConflictingSeal_SmokeTest(t *testing.T) {
426433
// 4. executes the `testLogic`
427434
func WithExecStateForkSuppressor(t *testing.T, testLogic func(wrapper *ExecForkSuppressor, wrappedMempool *poolmock.IncorporatedResultSeals, execForkActor *actormock.ExecForkActor)) {
428435
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
436+
lockManager := storage.NewTestingLockManager()
437+
429438
wrappedMempool := &poolmock.IncorporatedResultSeals{}
430439
execForkActor := &actormock.ExecForkActor{}
431-
wrapper, err := NewExecStateForkSuppressor(wrappedMempool, execForkActor.OnExecFork, db, zerolog.New(os.Stderr))
440+
wrapper, err := NewExecStateForkSuppressor(wrappedMempool, execForkActor.OnExecFork, db, lockManager, zerolog.New(os.Stderr))
432441
require.NoError(t, err)
433442
require.NotNil(t, wrapper)
434443
testLogic(wrapper, wrappedMempool, execForkActor)

storage/epoch_protocol_state.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,12 @@ type EpochProtocolStateEntries interface {
3535
// CAUTION:
3636
// - The caller must acquire the lock [storage.LockInsertBlock] and hold it until the database write has been committed.
3737
// - OVERWRITES existing data (potential for data corruption):
38-
// This method silently overrides existing data without any sanity checks whether data for the same key already exits.
39-
// Note that the Flow protocol mandates that for a previously persisted key, the data is never changed to a different
40-
// value. Changing data could cause the node to publish inconsistent data and to be slashed, or the protocol to be
41-
// compromised as a whole. This method does not contain any safeguards to prevent such data corruption. The lock proof
42-
// serves as a reminder that the CALLER is responsible to ensure that the DEDUPLICATION CHECK is done elsewhere
43-
// ATOMICALLY with this write operation.
38+
// The lock proof serves as a reminder that the CALLER is responsible to ensure that the DEDUPLICATION CHECK is done elsewhere
39+
// ATOMICALLY within this write operation. Currently it's done by operation.InsertHeader where it performs a check
40+
// to ensure the blockID is new, therefore any data indexed by this blockID is new as well.
4441
//
4542
// Expected errors during normal operations:
46-
// - [storage.ErrDataMismatch] if a _different_ KV store for the given stateID has already been persisted
43+
// No expected errors during normal operations.
4744
BatchIndex(lctx lockctx.Proof, rw ReaderBatchWriter, blockID flow.Identifier, epochProtocolStateID flow.Identifier) error
4845

4946
// ByID returns the flow.RichEpochStateEntry by its ID.

storage/execution_fork_evidence.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package storage
22

3-
import "github.com/onflow/flow-go/model/flow"
3+
import (
4+
"github.com/jordanschalm/lockctx"
5+
6+
"github.com/onflow/flow-go/model/flow"
7+
)
48

59
// ExecutionForkEvidence represents persistent storage for execution fork evidence.
610
// CAUTION: Not safe for concurrent use by multiple goroutines.
@@ -9,8 +13,9 @@ type ExecutionForkEvidence interface {
913
// if no execution fork evidence is currently stored in the database.
1014
// This function is a no-op if evidence is already stored, because
1115
// only one execution fork evidence can be stored at a time.
16+
// The caller must hold the [storage.LockInsertExecutionForkEvidence] lock.
1217
// No errors are expected during normal operations.
13-
StoreIfNotExists(conflictingSeals []*flow.IncorporatedResultSeal) error
18+
StoreIfNotExists(lctx lockctx.Proof, conflictingSeals []*flow.IncorporatedResultSeal) error
1419

1520
// Retrieve reads conflicting seals from the database.
1621
// No error is returned if database record doesn't exist.

storage/locks.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ const (
2929
LockBootstrapping = "lock_bootstrapping"
3030
// LockInsertChunkDataPack protects the insertion of chunk data packs (not yet used anywhere
3131
LockInsertChunkDataPack = "lock_insert_chunk_data_pack"
32-
LockInsertSafetyData = "lock_insert_safety_data"
33-
LockInsertLivenessData = "lock_insert_liveness_data"
32+
// LockInsertExecutionForkEvidence protects the insertion of execution fork evidence
33+
LockInsertExecutionForkEvidence = "lock_insert_execution_fork_evidence"
34+
LockInsertSafetyData = "lock_insert_safety_data"
35+
LockInsertLivenessData = "lock_insert_liveness_data"
3436
)
3537

3638
// Locks returns a list of all named locks used by the storage layer.
@@ -44,6 +46,7 @@ func Locks() []string {
4446
LockInsertCollection,
4547
LockBootstrapping,
4648
LockInsertChunkDataPack,
49+
LockInsertExecutionForkEvidence,
4750
LockInsertSafetyData,
4851
LockInsertLivenessData,
4952
}

storage/mock/execution_fork_evidence.go

Lines changed: 7 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

storage/operation/epoch_protocol_state.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,9 @@ func RetrieveEpochProtocolState(r storage.Reader, entryID flow.Identifier, entry
2828
// CAUTION:
2929
// - The caller must acquire the lock [storage.LockInsertBlock] and hold it until the database write has been committed.
3030
// - OVERWRITES existing data (potential for data corruption):
31-
// This method silently overrides existing data without any sanity checks whether data for the same key already exits.
32-
// Note that the Flow protocol mandates that for a previously persisted key, the data is never changed to a different
33-
// value. Changing data could cause the node to publish inconsistent data and to be slashed, or the protocol to be
34-
// compromised as a whole. This method does not contain any safeguards to prevent such data corruption. The lock proof
35-
// serves as a reminder that the CALLER is responsible to ensure that the DEDUPLICATION CHECK is done elsewhere
36-
// ATOMICALLY with this write operation.
31+
// The lock proof serves as a reminder that the CALLER is responsible to ensure that the DEDUPLICATION CHECK is done elsewhere
32+
// ATOMICALLY within this write operation. Currently it's done by operation.InsertHeader where it performs a check
33+
// to ensure the blockID is new, therefore any data indexed by this blockID is new as well.
3734
//
3835
// No error returns are expected during normal operation.
3936
func IndexEpochProtocolState(lctx lockctx.Proof, w storage.Writer, blockID flow.Identifier, epochProtocolStateEntryID flow.Identifier) error {

storage/operation/execution_fork_evidence.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package operation
22

33
import (
4+
"fmt"
5+
6+
"github.com/jordanschalm/lockctx"
7+
48
"github.com/onflow/flow-go/model/flow"
59
"github.com/onflow/flow-go/storage"
610
)
@@ -28,8 +32,24 @@ func RemoveExecutionForkEvidence(w storage.Writer) error {
2832
}
2933

3034
// InsertExecutionForkEvidence upserts conflicting seals to the database.
31-
// If a record already exists, it is overwritten; otherwise a new record is created.
35+
// If a record already exists, it is NOT overwritten, the new record is ignored.
36+
// The caller must hold the [storage.LockInsertExecutionForkEvidence] lock.
3237
// No errors are expected during normal operations.
33-
func InsertExecutionForkEvidence(w storage.Writer, conflictingSeals []*flow.IncorporatedResultSeal) error {
34-
return UpsertByKey(w, MakePrefix(codeExecutionFork), conflictingSeals)
38+
func InsertExecutionForkEvidence(lctx lockctx.Proof, rw storage.ReaderBatchWriter, conflictingSeals []*flow.IncorporatedResultSeal) error {
39+
if !lctx.HoldsLock(storage.LockInsertExecutionForkEvidence) {
40+
return fmt.Errorf("InsertExecutionForkEvidence requires LockInsertExecutionForkEvidence to be held")
41+
}
42+
key := MakePrefix(codeExecutionFork)
43+
exist, err := KeyExists(rw.GlobalReader(), key)
44+
if err != nil {
45+
return fmt.Errorf("failed to check if execution fork evidence exists: %w", err)
46+
}
47+
48+
if exist {
49+
// Some evidence about execution fork already stored;
50+
// We only keep the first evidence => nothing more to do
51+
return nil
52+
}
53+
54+
return UpsertByKey(rw.Writer(), key, conflictingSeals)
3555
}

storage/operation/execution_fork_evidence_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package operation_test
33
import (
44
"testing"
55

6+
"github.com/jordanschalm/lockctx"
7+
68
"github.com/onflow/flow-go/model/flow"
79
"github.com/onflow/flow-go/storage"
810
"github.com/onflow/flow-go/storage/operation"
@@ -27,6 +29,7 @@ func Test_ExecutionForkEvidenceOperations(t *testing.T) {
2729

2830
t.Run("Write evidence and retrieve", func(t *testing.T) {
2931
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
32+
lockManager := storage.NewTestingLockManager()
3033
exists, err := operation.HasExecutionForkEvidence(db.Reader())
3134
require.NoError(t, err)
3235
require.False(t, exists)
@@ -41,8 +44,10 @@ func Test_ExecutionForkEvidenceOperations(t *testing.T) {
4144
unittest.WithBlock(block))))
4245
}
4346

44-
err = db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
45-
return operation.InsertExecutionForkEvidence(rw.Writer(), conflictingSeals)
47+
err = unittest.WithLock(t, lockManager, storage.LockInsertExecutionForkEvidence, func(lctx lockctx.Context) error {
48+
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
49+
return operation.InsertExecutionForkEvidence(lctx, rw, conflictingSeals)
50+
})
4651
})
4752
require.NoError(t, err)
4853

0 commit comments

Comments
 (0)