|
| 1 | +package syncing |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "math/rand/v2" |
| 6 | + "testing" |
| 7 | + "time" |
| 8 | + |
| 9 | + "github.com/evstack/ev-node/block/internal/cache" |
| 10 | + "github.com/evstack/ev-node/block/internal/common" |
| 11 | + "github.com/evstack/ev-node/pkg/config" |
| 12 | + "github.com/evstack/ev-node/pkg/genesis" |
| 13 | + "github.com/evstack/ev-node/pkg/store" |
| 14 | + testmocks "github.com/evstack/ev-node/test/mocks" |
| 15 | + mocks "github.com/evstack/ev-node/test/mocks/external" |
| 16 | + "github.com/evstack/ev-node/types" |
| 17 | + "github.com/ipfs/go-datastore" |
| 18 | + dssync "github.com/ipfs/go-datastore/sync" |
| 19 | + "github.com/rs/zerolog" |
| 20 | + "github.com/stretchr/testify/assert" |
| 21 | + "github.com/stretchr/testify/mock" |
| 22 | + "github.com/stretchr/testify/require" |
| 23 | +) |
| 24 | + |
| 25 | +const daHeightOffset = 100 |
| 26 | + |
| 27 | +func BenchmarkSyncerIO(b *testing.B) { |
| 28 | + cases := map[string]struct { |
| 29 | + heights uint64 |
| 30 | + shuffledTx bool |
| 31 | + daDelay time.Duration |
| 32 | + execDelay time.Duration |
| 33 | + p2pEnabled bool |
| 34 | + p2pDelay time.Duration |
| 35 | + }{ |
| 36 | + "slow producer": {heights: 100, daDelay: 200 * time.Microsecond, execDelay: 0, p2pDelay: 0, p2pEnabled: false}, |
| 37 | + "slow consumer": {heights: 100, daDelay: 0, execDelay: 200 * time.Microsecond, p2pDelay: 0, p2pEnabled: false}, |
| 38 | + } |
| 39 | + for name, spec := range cases { |
| 40 | + b.Run(name, func(b *testing.B) { |
| 41 | + b.ResetTimer() |
| 42 | + for i := 0; i < b.N; i++ { |
| 43 | + fixt := newBenchFixture(b, spec.heights, spec.shuffledTx, spec.daDelay, spec.execDelay, true) |
| 44 | + |
| 45 | + // run both loops |
| 46 | + go fixt.s.processLoop() |
| 47 | + go fixt.s.syncLoop() |
| 48 | + |
| 49 | + require.Eventually(b, func() bool { |
| 50 | + processedHeight, _ := fixt.s.store.Height(b.Context()) |
| 51 | + return processedHeight == spec.heights |
| 52 | + }, 5*time.Second, 50*time.Microsecond) |
| 53 | + fixt.s.cancel() |
| 54 | + |
| 55 | + // Ensure clean end-state per iteration |
| 56 | + require.Len(b, fixt.s.cache.GetPendingEvents(), 0) |
| 57 | + require.Len(b, fixt.s.heightInCh, 0) |
| 58 | + |
| 59 | + assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daHeight) |
| 60 | + gotStoreHeight, err := fixt.s.store.Height(b.Context()) |
| 61 | + require.NoError(b, err) |
| 62 | + assert.Equal(b, spec.heights, gotStoreHeight) |
| 63 | + } |
| 64 | + }) |
| 65 | + } |
| 66 | +} |
| 67 | + |
| 68 | +type benchFixture struct { |
| 69 | + s *Syncer |
| 70 | + st store.Store |
| 71 | + cm cache.Manager |
| 72 | + cancel context.CancelFunc |
| 73 | +} |
| 74 | + |
| 75 | +func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay, execDelay time.Duration, includeP2P bool) *benchFixture { |
| 76 | + b.Helper() |
| 77 | + ctx, cancel := context.WithCancel(b.Context()) |
| 78 | + |
| 79 | + ds := dssync.MutexWrap(datastore.NewMapDatastore()) |
| 80 | + st := store.New(ds) |
| 81 | + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) |
| 82 | + require.NoError(b, err) |
| 83 | + |
| 84 | + addr, pub, signer := buildSyncTestSigner(b) |
| 85 | + cfg := config.DefaultConfig() |
| 86 | + // keep P2P ticker dormant unless we manually inject P2P events |
| 87 | + cfg.Node.BlockTime = config.DurationWrapper{Duration: 1} |
| 88 | + cfg.DA.StartHeight = daHeightOffset |
| 89 | + gen := genesis.Genesis{ChainID: "bchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} |
| 90 | + |
| 91 | + mockExec := testmocks.NewMockExecutor(b) |
| 92 | + // if execDelay > 0, sleep on ExecuteTxs to simulate slow consumer |
| 93 | + mockExec.On("ExecuteTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). |
| 94 | + Run(func(args mock.Arguments) { |
| 95 | + if execDelay > 0 { |
| 96 | + time.Sleep(execDelay) |
| 97 | + } |
| 98 | + }). |
| 99 | + Return([]byte("app"), uint64(1024), nil).Maybe() |
| 100 | + |
| 101 | + // Build syncer with mocks |
| 102 | + s := NewSyncer( |
| 103 | + st, |
| 104 | + mockExec, |
| 105 | + nil, // DA injected via mock retriever below |
| 106 | + cm, |
| 107 | + common.NopMetrics(), |
| 108 | + cfg, |
| 109 | + gen, |
| 110 | + nil, // headerStore not used; we inject P2P directly to channel when needed |
| 111 | + nil, // dataStore not used |
| 112 | + zerolog.Nop(), |
| 113 | + common.DefaultBlockOptions(), |
| 114 | + make(chan error, 1), |
| 115 | + ) |
| 116 | + require.NoError(b, s.initializeState()) |
| 117 | + s.ctx, s.cancel = ctx, cancel |
| 118 | + |
| 119 | + // prepare height events to emit |
| 120 | + heightEvents := make([]common.DAHeightEvent, totalHeights) |
| 121 | + for i := uint64(0); i < totalHeights; i++ { |
| 122 | + blockHeight, daHeight := i+gen.InitialHeight, i+daHeightOffset |
| 123 | + _, sh := makeSignedHeaderBytes(b, gen.ChainID, blockHeight, addr, pub, signer, nil, nil) |
| 124 | + d := &types.Data{Metadata: &types.Metadata{ChainID: gen.ChainID, Height: blockHeight, Time: uint64(time.Now().UnixNano())}} |
| 125 | + heightEvents[i] = common.DAHeightEvent{Header: sh, Data: d, DaHeight: daHeight, HeaderDaIncludedHeight: daHeight} |
| 126 | + } |
| 127 | + if shuffledTx { |
| 128 | + rand.New(rand.NewPCG(1, 2)).Shuffle(len(heightEvents), func(i, j int) { //nolint:gosec // false positive |
| 129 | + heightEvents[i], heightEvents[j] = heightEvents[j], heightEvents[i] |
| 130 | + }) |
| 131 | + } |
| 132 | + |
| 133 | + // Mock DA retriever to emit exactly totalHeights events, then HFF and cancel |
| 134 | + daR := newMockdaRetriever(b) |
| 135 | + for i := uint64(0); i < totalHeights; i++ { |
| 136 | + daHeight := i + daHeightOffset |
| 137 | + daR.On("RetrieveFromDA", mock.Anything, daHeight). |
| 138 | + Run(func(_ mock.Arguments) { |
| 139 | + if daDelay > 0 { |
| 140 | + time.Sleep(daDelay) |
| 141 | + } |
| 142 | + }). |
| 143 | + Return([]common.DAHeightEvent{heightEvents[i]}, nil).Once() |
| 144 | + } |
| 145 | + // after last, return height-from-future and stop when queue drains |
| 146 | + daR.On("RetrieveFromDA", mock.Anything, totalHeights+daHeightOffset). |
| 147 | + Return(nil, common.ErrHeightFromFutureStr).Maybe() |
| 148 | + |
| 149 | + // Attach mocks |
| 150 | + s.daRetriever = daR |
| 151 | + s.p2pHandler = newMockp2pHandler(b) // not used directly in this benchmark path |
| 152 | + headerP2PStore := mocks.NewMockStore[*types.SignedHeader](b) |
| 153 | + headerP2PStore.On("Height").Return(uint64(0)).Maybe() |
| 154 | + s.headerStore = headerP2PStore |
| 155 | + dataP2PStore := mocks.NewMockStore[*types.Data](b) |
| 156 | + dataP2PStore.On("Height").Return(uint64(0)).Maybe() |
| 157 | + s.dataStore = dataP2PStore |
| 158 | + return &benchFixture{s: s, st: st, cm: cm, cancel: cancel} |
| 159 | +} |
0 commit comments