Skip to content

Commit 75c573a

Browse files
committed
fix tests
1 parent 36804fe commit 75c573a

File tree

8 files changed

+44
-12
lines changed

8 files changed

+44
-12
lines changed

cmd/consensus/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ func main() {
396396
multipleReceiptsFilterMempool,
397397
consensusMempools.LogForkAndCrash(node.Logger),
398398
node.ProtocolDB,
399+
node.StorageLockMgr,
399400
node.Logger,
400401
)
401402
if err != nil {

module/mempool/consensus/exec_fork_suppressor.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"sync"
77

8+
"github.com/jordanschalm/lockctx"
89
"github.com/rs/zerolog"
910
"github.com/rs/zerolog/log"
1011
"go.uber.org/atomic"
@@ -368,9 +369,13 @@ func (s *ExecForkSuppressor) filterConflictingSeals(sealsByBlockID map[flow.Iden
368369
s.execForkDetected.Store(true)
369370
s.Clear()
370371
conflictingSeals = append(sealsList{candidateSeal}, conflictingSeals...)
371-
err := s.execForkEvidenceStore.StoreIfNotExists(conflictingSeals)
372+
373+
// Acquire lock and store execution fork evidence
374+
err := storage.WithLock(s.lockManager, storage.LockInsertExecutionForkEvidence, func(lctx lockctx.Context) error {
375+
return s.execForkEvidenceStore.StoreIfNotExists(lctx, conflictingSeals)
376+
})
372377
if err != nil {
373-
panic("failed to store execution fork evidence")
378+
s.log.Fatal().Msg("failed to store execution fork evidence")
374379
}
375380
s.onExecFork(conflictingSeals)
376381
return nil

module/mempool/consensus/exec_fork_suppressor_test.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -255,11 +255,12 @@ func Test_ForkDetectionPersisted(t *testing.T) {
255255

256256
// This function stores conflicting seals to the underlying database.
257257
func(t *testing.T, db storage.DB) {
258+
lockManager := storage.NewTestingLockManager()
258259

259260
// initialize ExecForkSuppressor
260261
wrappedMempool := &poolmock.IncorporatedResultSeals{}
261262
execForkActor := &actormock.ExecForkActor{}
262-
wrapper, _ := NewExecStateForkSuppressor(wrappedMempool, execForkActor.OnExecFork, db, zerolog.New(os.Stderr))
263+
wrapper, _ := NewExecStateForkSuppressor(wrappedMempool, execForkActor.OnExecFork, db, lockManager, zerolog.New(os.Stderr))
263264

264265
// add seal
265266
wrappedMempool.On("Add", sealA).Return(true, nil).Once()
@@ -285,14 +286,16 @@ func Test_ForkDetectionPersisted(t *testing.T) {
285286

286287
// This function retrieves conflicting seals from the same underlying database with a new instance of storage.DB.
287288
func(t *testing.T, db storage.DB) {
289+
lockManager2 := storage.NewTestingLockManager()
290+
288291
wrappedMempool2 := &poolmock.IncorporatedResultSeals{}
289292
execForkActor2 := &actormock.ExecForkActor{}
290293
execForkActor2.On("OnExecFork", mock.Anything).
291294
Run(func(args mock.Arguments) {
292295
conflictingSeals := args.Get(0).([]*flow.IncorporatedResultSeal)
293296
require.ElementsMatch(t, []*flow.IncorporatedResultSeal{sealA, sealB}, conflictingSeals)
294297
}).Return().Once()
295-
wrapper2, _ := NewExecStateForkSuppressor(wrappedMempool2, execForkActor2.OnExecFork, db, zerolog.New(os.Stderr))
298+
wrapper2, _ := NewExecStateForkSuppressor(wrappedMempool2, execForkActor2.OnExecFork, db, lockManager2, zerolog.New(os.Stderr))
296299

297300
// add another (non-conflicting) seal to ExecForkSuppressor
298301
// fail test if seal is added to wrapped mempool
@@ -318,8 +321,10 @@ func Test_AddRemove_SmokeTest(t *testing.T) {
318321
require.Fail(t, "no call to onExecFork expected ")
319322
}
320323
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
324+
lockManager := storage.NewTestingLockManager()
325+
321326
wrappedMempool := stdmap.NewIncorporatedResultSeals(100)
322-
wrapper, err := NewExecStateForkSuppressor(wrappedMempool, onExecFork, db, zerolog.New(os.Stderr))
327+
wrapper, err := NewExecStateForkSuppressor(wrappedMempool, onExecFork, db, lockManager, zerolog.New(os.Stderr))
323328
require.NoError(t, err)
324329
require.NotNil(t, wrapper)
325330

@@ -355,6 +360,8 @@ func Test_AddRemove_SmokeTest(t *testing.T) {
355360
// Test adding conflicting seals with different number of matching receipts.
356361
func Test_ConflictingSeal_SmokeTest(t *testing.T) {
357362
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
363+
lockManager := storage.NewTestingLockManager()
364+
358365
executingForkDetected := atomic.NewBool(false)
359366
onExecFork := func([]*flow.IncorporatedResultSeal) {
360367
executingForkDetected.Store(true)
@@ -363,7 +370,7 @@ func Test_ConflictingSeal_SmokeTest(t *testing.T) {
363370
rawMempool := stdmap.NewIncorporatedResultSeals(100)
364371
receiptsDB := mockstorage.NewExecutionReceipts(t)
365372
wrappedMempool := NewIncorporatedResultSeals(rawMempool, receiptsDB)
366-
wrapper, err := NewExecStateForkSuppressor(wrappedMempool, onExecFork, db, zerolog.New(os.Stderr))
373+
wrapper, err := NewExecStateForkSuppressor(wrappedMempool, onExecFork, db, lockManager, zerolog.New(os.Stderr))
367374
require.NoError(t, err)
368375
require.NotNil(t, wrapper)
369376

@@ -426,9 +433,11 @@ func Test_ConflictingSeal_SmokeTest(t *testing.T) {
426433
// 4. executes the `testLogic`
427434
func WithExecStateForkSuppressor(t *testing.T, testLogic func(wrapper *ExecForkSuppressor, wrappedMempool *poolmock.IncorporatedResultSeals, execForkActor *actormock.ExecForkActor)) {
428435
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
436+
lockManager := storage.NewTestingLockManager()
437+
429438
wrappedMempool := &poolmock.IncorporatedResultSeals{}
430439
execForkActor := &actormock.ExecForkActor{}
431-
wrapper, err := NewExecStateForkSuppressor(wrappedMempool, execForkActor.OnExecFork, db, zerolog.New(os.Stderr))
440+
wrapper, err := NewExecStateForkSuppressor(wrappedMempool, execForkActor.OnExecFork, db, lockManager, zerolog.New(os.Stderr))
432441
require.NoError(t, err)
433442
require.NotNil(t, wrapper)
434443
testLogic(wrapper, wrappedMempool, execForkActor)

storage/execution_fork_evidence.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package storage
22

33
import (
44
"github.com/jordanschalm/lockctx"
5+
56
"github.com/onflow/flow-go/model/flow"
67
)
78

storage/operation/execution_fork_evidence.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55

66
"github.com/jordanschalm/lockctx"
7+
78
"github.com/onflow/flow-go/model/flow"
89
"github.com/onflow/flow-go/storage"
910
)

storage/operation/execution_fork_evidence_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package operation_test
33
import (
44
"testing"
55

6+
"github.com/jordanschalm/lockctx"
7+
68
"github.com/onflow/flow-go/model/flow"
79
"github.com/onflow/flow-go/storage"
810
"github.com/onflow/flow-go/storage/operation"
@@ -27,6 +29,7 @@ func Test_ExecutionForkEvidenceOperations(t *testing.T) {
2729

2830
t.Run("Write evidence and retrieve", func(t *testing.T) {
2931
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
32+
lockManager := storage.NewTestingLockManager()
3033
exists, err := operation.HasExecutionForkEvidence(db.Reader())
3134
require.NoError(t, err)
3235
require.False(t, exists)
@@ -41,8 +44,10 @@ func Test_ExecutionForkEvidenceOperations(t *testing.T) {
4144
unittest.WithBlock(block))))
4245
}
4346

44-
err = db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
45-
return operation.InsertExecutionForkEvidence(rw.Writer(), conflictingSeals)
47+
err = unittest.WithLock(t, lockManager, storage.LockInsertExecutionForkEvidence, func(lctx lockctx.Context) error {
48+
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
49+
return operation.InsertExecutionForkEvidence(lctx, rw, conflictingSeals)
50+
})
4651
})
4752
require.NoError(t, err)
4853

storage/store/execution_fork_evidence.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66

77
"github.com/jordanschalm/lockctx"
8+
89
"github.com/onflow/flow-go/model/flow"
910
"github.com/onflow/flow-go/storage"
1011
"github.com/onflow/flow-go/storage/operation"

storage/store/execution_fork_evidence_test.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package store_test
33
import (
44
"testing"
55

6+
"github.com/jordanschalm/lockctx"
67
"github.com/stretchr/testify/require"
78

89
"github.com/onflow/flow-go/model/flow"
@@ -25,6 +26,7 @@ func TestExecutionForkEvidenceStoreAndRetrieve(t *testing.T) {
2526

2627
t.Run("Store and read evidence", func(t *testing.T) {
2728
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
29+
lockManager := storage.NewTestingLockManager()
2830
block := unittest.BlockFixture()
2931

3032
conflictingSeals := make([]*flow.IncorporatedResultSeal, 2)
@@ -37,7 +39,9 @@ func TestExecutionForkEvidenceStoreAndRetrieve(t *testing.T) {
3739

3840
evidenceStore := store.NewExecutionForkEvidence(db)
3941

40-
err := evidenceStore.StoreIfNotExists(conflictingSeals)
42+
err := unittest.WithLock(t, lockManager, storage.LockInsertExecutionForkEvidence, func(lctx lockctx.Context) error {
43+
return evidenceStore.StoreIfNotExists(lctx, conflictingSeals)
44+
})
4145
require.NoError(t, err)
4246

4347
retrievedConflictingSeals, err := evidenceStore.Retrieve()
@@ -48,6 +52,7 @@ func TestExecutionForkEvidenceStoreAndRetrieve(t *testing.T) {
4852

4953
t.Run("Don't overwrite evidence", func(t *testing.T) {
5054
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
55+
lockManager := storage.NewTestingLockManager()
5156
block := unittest.BlockFixture()
5257

5358
conflictingSeals := make([]*flow.IncorporatedResultSeal, 2)
@@ -70,7 +75,9 @@ func TestExecutionForkEvidenceStoreAndRetrieve(t *testing.T) {
7075

7176
// Store and read evidence.
7277
{
73-
err := evidenceStore.StoreIfNotExists(conflictingSeals)
78+
err := unittest.WithLock(t, lockManager, storage.LockInsertExecutionForkEvidence, func(lctx lockctx.Context) error {
79+
return evidenceStore.StoreIfNotExists(lctx, conflictingSeals)
80+
})
7481
require.NoError(t, err)
7582

7683
retrievedConflictingSeals, err := evidenceStore.Retrieve()
@@ -80,7 +87,9 @@ func TestExecutionForkEvidenceStoreAndRetrieve(t *testing.T) {
8087

8188
// Overwriting existing evidence is no-op.
8289
{
83-
err := evidenceStore.StoreIfNotExists(conflictingSeals2)
90+
err := unittest.WithLock(t, lockManager, storage.LockInsertExecutionForkEvidence, func(lctx lockctx.Context) error {
91+
return evidenceStore.StoreIfNotExists(lctx, conflictingSeals2)
92+
})
8493
require.NoError(t, err)
8594

8695
retrievedConflictingSeals, err := evidenceStore.Retrieve()

0 commit comments

Comments
 (0)