Skip to content

Commit 36804fe

Browse files
committed
refactor inserting execution fork evidence
1 parent 7a3c1ee commit 36804fe

File tree

5 files changed

+38
-25
lines changed

5 files changed

+38
-25
lines changed

module/mempool/consensus/exec_fork_suppressor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type ExecForkSuppressor struct {
4646
execForkDetected atomic.Bool
4747
onExecFork ExecForkActor
4848
execForkEvidenceStore storage.ExecutionForkEvidence
49+
lockManager storage.LockManager
4950
log zerolog.Logger
5051
}
5152

@@ -61,6 +62,7 @@ func NewExecStateForkSuppressor(
6162
seals mempool.IncorporatedResultSeals,
6263
onExecFork ExecForkActor,
6364
db storage.DB,
65+
lockManager storage.LockManager,
6466
log zerolog.Logger,
6567
) (*ExecForkSuppressor, error) {
6668
executionForkEvidenceStore := store.NewExecutionForkEvidence(db)
@@ -83,6 +85,7 @@ func NewExecStateForkSuppressor(
8385
execForkDetected: *atomic.NewBool(execForkDetectedFlag),
8486
onExecFork: onExecFork,
8587
execForkEvidenceStore: executionForkEvidenceStore,
88+
lockManager: lockManager,
8689
log: log.With().Str("mempool", "ExecForkSuppressor").Logger(),
8790
}
8891

storage/execution_fork_evidence.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package storage
22

3-
import "github.com/onflow/flow-go/model/flow"
3+
import (
4+
"github.com/jordanschalm/lockctx"
5+
"github.com/onflow/flow-go/model/flow"
6+
)
47

58
// ExecutionForkEvidence represents persistent storage for execution fork evidence.
69
// CAUTION: Not safe for concurrent use by multiple goroutines.
@@ -9,8 +12,9 @@ type ExecutionForkEvidence interface {
912
// if no execution fork evidence is currently stored in the database.
1013
// This function is a no-op if evidence is already stored, because
1114
// only one execution fork evidence can be stored at a time.
15+
// The caller must hold the [storage.LockInsertExecutionForkEvidence] lock.
1216
// No errors are expected during normal operations.
13-
StoreIfNotExists(conflictingSeals []*flow.IncorporatedResultSeal) error
17+
StoreIfNotExists(lctx lockctx.Proof, conflictingSeals []*flow.IncorporatedResultSeal) error
1418

1519
// Retrieve reads conflicting seals from the database.
1620
// No error is returned if database record doesn't exist.

storage/locks.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ const (
2929
LockBootstrapping = "lock_bootstrapping"
3030
// LockInsertChunkDataPack protects the insertion of chunk data packs (not yet used anywhere
3131
LockInsertChunkDataPack = "lock_insert_chunk_data_pack"
32+
// LockInsertExecutionForkEvidence protects the insertion of execution fork evidence
33+
LockInsertExecutionForkEvidence = "lock_insert_execution_fork_evidence"
3234
)
3335

3436
// Locks returns a list of all named locks used by the storage layer.
@@ -42,6 +44,7 @@ func Locks() []string {
4244
LockInsertCollection,
4345
LockBootstrapping,
4446
LockInsertChunkDataPack,
47+
LockInsertExecutionForkEvidence,
4548
}
4649
}
4750

storage/operation/execution_fork_evidence.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package operation
22

33
import (
4+
"fmt"
5+
6+
"github.com/jordanschalm/lockctx"
47
"github.com/onflow/flow-go/model/flow"
58
"github.com/onflow/flow-go/storage"
69
)
@@ -28,8 +31,24 @@ func RemoveExecutionForkEvidence(w storage.Writer) error {
2831
}
2932

3033
// InsertExecutionForkEvidence upserts conflicting seals to the database.
31-
// If a record already exists, it is overwritten; otherwise a new record is created.
34+
// If a record already exists, it is NOT overwritten, the new record is ignored.
35+
// The caller must hold the [storage.LockInsertExecutionForkEvidence] lock.
3236
// No errors are expected during normal operations.
33-
func InsertExecutionForkEvidence(w storage.Writer, conflictingSeals []*flow.IncorporatedResultSeal) error {
34-
return UpsertByKey(w, MakePrefix(codeExecutionFork), conflictingSeals)
37+
func InsertExecutionForkEvidence(lctx lockctx.Proof, rw storage.ReaderBatchWriter, conflictingSeals []*flow.IncorporatedResultSeal) error {
38+
if !lctx.HoldsLock(storage.LockInsertExecutionForkEvidence) {
39+
return fmt.Errorf("InsertExecutionForkEvidence requires LockInsertBlock to be held")
40+
}
41+
key := MakePrefix(codeExecutionFork)
42+
exist, err := KeyExists(rw.GlobalReader(), MakePrefix(codeExecutionFork))
43+
if err != nil {
44+
return fmt.Errorf("failed to check if execution fork evidence exists: %w", err)
45+
}
46+
47+
if exist {
48+
// Some evidence about execution fork already stored;
49+
// We only keep the first evidence => nothing more to do
50+
return nil
51+
}
52+
53+
return UpsertByKey(rw.Writer(), key, conflictingSeals)
3554
}

storage/store/execution_fork_evidence.go

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"fmt"
66

7+
"github.com/jordanschalm/lockctx"
78
"github.com/onflow/flow-go/model/flow"
89
"github.com/onflow/flow-go/storage"
910
"github.com/onflow/flow-go/storage/operation"
@@ -23,28 +24,11 @@ func NewExecutionForkEvidence(db storage.DB) *ExecutionForkEvidence {
2324

2425
// StoreIfNotExists stores the given conflictingSeals to the database.
2526
// This is a no-op if there is already a record in the database with the same key.
27+
// The caller must hold the [storage.LockInsertExecutionForkEvidence] lock.
2628
// No errors are expected during normal operations.
27-
// CAUTION: This function is not safe for concurrent use by multiple goroutines. For safety,
28-
// we rely on the fact that Execution Fork Evidence has a very small surface area of use
29-
// in the Execution Fork Suppressor. The Execution Fork Suppressor is responsible for
30-
// ensuring mutually exclusive access to this function.
31-
func (efe *ExecutionForkEvidence) StoreIfNotExists(conflictingSeals []*flow.IncorporatedResultSeal) error {
29+
func (efe *ExecutionForkEvidence) StoreIfNotExists(lctx lockctx.Proof, conflictingSeals []*flow.IncorporatedResultSeal) error {
3230
return efe.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
33-
found, err := operation.HasExecutionForkEvidence(rw.GlobalReader())
34-
if err != nil {
35-
return fmt.Errorf("failed to check if evidence about execution fork exists: %w", err)
36-
}
37-
if found {
38-
// Some evidence about execution fork already stored;
39-
// We only keep the first evidence => nothing more to do
40-
return nil
41-
}
42-
43-
err = operation.InsertExecutionForkEvidence(rw.Writer(), conflictingSeals)
44-
if err != nil {
45-
return fmt.Errorf("failed to store evidence about execution fork: %w", err)
46-
}
47-
return nil
31+
return operation.InsertExecutionForkEvidence(lctx, rw, conflictingSeals)
4832
})
4933
}
5034

0 commit comments

Comments
 (0)