Skip to content

Commit 1ff30ea

Browse files
authored
feat(store)!: add batching for atomicity (#2746)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. NOTE: PR titles should follow semantic commits: https://www.conventionalcommits.org/en/v1.0.0/ --> ## Overview <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> --> closes #2274
1 parent 05124cc commit 1ff30ea

19 files changed

+508
-392
lines changed

block/internal/cache/bench_test.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,19 @@ func benchSetupStore(b *testing.B, n int, txsPer int, chainID string) store.Stor
3636
ctx := context.Background()
3737
for i := 1; i <= n; i++ {
3838
h, d := types.GetRandomBlock(uint64(i), txsPer, chainID)
39-
if err := st.SaveBlockData(ctx, h, d, &types.Signature{}); err != nil {
39+
batch, err := st.NewBatch(ctx)
40+
if err != nil {
41+
b.Fatal(err)
42+
}
43+
if err := batch.SaveBlockData(h, d, &types.Signature{}); err != nil {
44+
b.Fatal(err)
45+
}
46+
if err := batch.SetHeight(uint64(i)); err != nil {
47+
b.Fatal(err)
48+
}
49+
if err := batch.Commit(); err != nil {
4050
b.Fatal(err)
4151
}
42-
}
43-
if err := st.SetHeight(ctx, uint64(n)); err != nil {
44-
b.Fatal(err)
4552
}
4653
return st
4754
}

block/internal/cache/manager_test.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,15 +161,20 @@ func TestPendingHeadersAndData_Flow(t *testing.T) {
161161
h2, d2 := types.GetRandomBlock(2, 1, chainID)
162162
h3, d3 := types.GetRandomBlock(3, 2, chainID)
163163

164-
// persist in store and set height
165-
for _, pair := range []struct {
164+
// persist in store and set height using batch
165+
for i, pair := range []struct {
166166
h *types.SignedHeader
167167
d *types.Data
168168
}{{h1, d1}, {h2, d2}, {h3, d3}} {
169-
err := st.SaveBlockData(ctx, pair.h, pair.d, &types.Signature{})
169+
batch, err := st.NewBatch(ctx)
170+
require.NoError(t, err)
171+
err = batch.SaveBlockData(pair.h, pair.d, &types.Signature{})
172+
require.NoError(t, err)
173+
err = batch.SetHeight(uint64(i + 1))
174+
require.NoError(t, err)
175+
err = batch.Commit()
170176
require.NoError(t, err)
171177
}
172-
require.NoError(t, st.SetHeight(ctx, 3))
173178

174179
// construct manager which brings up pending managers
175180
cfg := tempConfig(t)

block/internal/cache/pending_base_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@ func TestPendingBase_PersistLastSubmitted(t *testing.T) {
5858
require.NoError(t, err)
5959

6060
// store height 3 to make numPending meaningful
61-
require.NoError(t, st.SetHeight(ctx, 3))
61+
batch, err := st.NewBatch(ctx)
62+
require.NoError(t, err)
63+
require.NoError(t, batch.SetHeight(3))
64+
require.NoError(t, batch.Commit())
6265
assert.Equal(t, uint64(3), ph.NumPendingHeaders())
6366

6467
// set last submitted higher and ensure metadata is written

block/internal/cache/pending_data_test.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,16 @@ func TestPendingData_BasicFlow(t *testing.T) {
2323
h2, d2 := types.GetRandomBlock(2, 1, chainID)
2424
h3, d3 := types.GetRandomBlock(3, 2, chainID)
2525

26-
for _, p := range []struct {
26+
for i, p := range []struct {
2727
h *types.SignedHeader
2828
d *types.Data
2929
}{{h1, d1}, {h2, d2}, {h3, d3}} {
30-
require.NoError(t, store.SaveBlockData(ctx, p.h, p.d, &types.Signature{}))
30+
batch, err := store.NewBatch(ctx)
31+
require.NoError(t, err)
32+
require.NoError(t, batch.SaveBlockData(p.h, p.d, &types.Signature{}))
33+
require.NoError(t, batch.SetHeight(uint64(i+1)))
34+
require.NoError(t, batch.Commit())
3135
}
32-
require.NoError(t, store.SetHeight(ctx, 3))
3336

3437
pendingData, err := NewPendingData(store, zerolog.Nop())
3538
require.NoError(t, err)
@@ -69,7 +72,10 @@ func TestPendingData_InitFromMetadata(t *testing.T) {
6972
require.NoError(t, store.SetMetadata(ctx, LastSubmittedDataHeightKey, bz))
7073

7174
// store height is 3
72-
require.NoError(t, store.SetHeight(ctx, 3))
75+
batch, err := store.NewBatch(ctx)
76+
require.NoError(t, err)
77+
require.NoError(t, batch.SetHeight(3))
78+
require.NoError(t, batch.Commit())
7379

7480
pendingData, err := NewPendingData(store, zerolog.Nop())
7581
require.NoError(t, err)
@@ -82,7 +88,10 @@ func TestPendingData_GetPending_PropagatesFetchError(t *testing.T) {
8288
store := memStore(t)
8389

8490
// Set height to 1 but do not save any block data
85-
require.NoError(t, store.SetHeight(ctx, 1))
91+
batch, err := store.NewBatch(ctx)
92+
require.NoError(t, err)
93+
require.NoError(t, batch.SetHeight(1))
94+
require.NoError(t, batch.Commit())
8695

8796
pendingData, err := NewPendingData(store, zerolog.Nop())
8897
require.NoError(t, err)

block/internal/cache/pending_headers_test.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,16 @@ func TestPendingHeaders_BasicFlow(t *testing.T) {
2323
h2, d2 := types.GetRandomBlock(2, 1, chainID)
2424
h3, d3 := types.GetRandomBlock(3, 2, chainID)
2525

26-
for _, p := range []struct {
26+
for i, p := range []struct {
2727
h *types.SignedHeader
2828
d *types.Data
2929
}{{h1, d1}, {h2, d2}, {h3, d3}} {
30-
require.NoError(t, store.SaveBlockData(ctx, p.h, p.d, &types.Signature{}))
30+
batch, err := store.NewBatch(ctx)
31+
require.NoError(t, err)
32+
require.NoError(t, batch.SaveBlockData(p.h, p.d, &types.Signature{}))
33+
require.NoError(t, batch.SetHeight(uint64(i+1)))
34+
require.NoError(t, batch.Commit())
3135
}
32-
require.NoError(t, store.SetHeight(ctx, 3))
3336

3437
pendingHeaders, err := NewPendingHeaders(store, zerolog.Nop())
3538
require.NoError(t, err)
@@ -67,8 +70,11 @@ func TestPendingHeaders_EmptyWhenUpToDate(t *testing.T) {
6770
store := memStore(t)
6871

6972
h, d := types.GetRandomBlock(1, 1, "ph-up")
70-
require.NoError(t, store.SaveBlockData(ctx, h, d, &types.Signature{}))
71-
require.NoError(t, store.SetHeight(ctx, 1))
73+
batch, err := store.NewBatch(ctx)
74+
require.NoError(t, err)
75+
require.NoError(t, batch.SaveBlockData(h, d, &types.Signature{}))
76+
require.NoError(t, batch.SetHeight(1))
77+
require.NoError(t, batch.Commit())
7278

7379
pendingHeaders, err := NewPendingHeaders(store, zerolog.Nop())
7480
require.NoError(t, err)

block/internal/executing/executor.go

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -211,10 +211,17 @@ func (e *Executor) initializeState() error {
211211

212212
e.setLastState(state)
213213

214-
// Set store height
215-
if err := e.store.SetHeight(e.ctx, state.LastBlockHeight); err != nil {
214+
// Initialize store height using batch for atomicity
215+
batch, err := e.store.NewBatch(e.ctx)
216+
if err != nil {
217+
return fmt.Errorf("failed to create batch: %w", err)
218+
}
219+
if err := batch.SetHeight(state.LastBlockHeight); err != nil {
216220
return fmt.Errorf("failed to set store height: %w", err)
217221
}
222+
if err := batch.Commit(); err != nil {
223+
return fmt.Errorf("failed to commit batch: %w", err)
224+
}
218225

219226
e.logger.Info().Uint64("height", state.LastBlockHeight).
220227
Str("chain_id", state.ChainID).Msg("initialized state")
@@ -354,8 +361,15 @@ func (e *Executor) produceBlock() error {
354361
}
355362

356363
// saved early for crash recovery, will be overwritten later with the final signature
357-
if err = e.store.SaveBlockData(e.ctx, header, data, &types.Signature{}); err != nil {
358-
return fmt.Errorf("failed to save block: %w", err)
364+
batch, err := e.store.NewBatch(e.ctx)
365+
if err != nil {
366+
return fmt.Errorf("failed to create batch for early save: %w", err)
367+
}
368+
if err = batch.SaveBlockData(header, data, &types.Signature{}); err != nil {
369+
return fmt.Errorf("failed to save block data: %w", err)
370+
}
371+
if err = batch.Commit(); err != nil {
372+
return fmt.Errorf("failed to commit early save batch: %w", err)
359373
}
360374
}
361375

@@ -377,20 +391,31 @@ func (e *Executor) produceBlock() error {
377391
return fmt.Errorf("failed to validate block: %w", err)
378392
}
379393

380-
if err := e.store.SaveBlockData(e.ctx, header, data, &signature); err != nil {
394+
batch, err := e.store.NewBatch(e.ctx)
395+
if err != nil {
396+
return fmt.Errorf("failed to create batch: %w", err)
397+
}
398+
399+
if err := batch.SaveBlockData(header, data, &signature); err != nil {
381400
return fmt.Errorf("failed to save block: %w", err)
382401
}
383402

384-
// Once the SaveBlockData has been saved we must update the height and the state.
385-
// context.TODO() should be reverted to the real context (e.ctx) once https://github.com/evstack/ev-node/issues/2274 has been implemented, this prevents context cancellation
386-
if err := e.store.SetHeight(context.TODO(), newHeight); err != nil {
403+
if err := batch.SetHeight(newHeight); err != nil {
387404
return fmt.Errorf("failed to update store height: %w", err)
388405
}
389406

390-
if err := e.updateState(context.TODO(), newState); err != nil {
407+
if err := batch.UpdateState(newState); err != nil {
391408
return fmt.Errorf("failed to update state: %w", err)
392409
}
393410

411+
if err := batch.Commit(); err != nil {
412+
return fmt.Errorf("failed to commit batch: %w", err)
413+
}
414+
415+
// Update in-memory state after successful commit
416+
e.setLastState(newState)
417+
e.metrics.Height.Set(float64(newState.LastBlockHeight))
418+
394419
// broadcast header and data to P2P network
395420
g, ctx := errgroup.WithContext(e.ctx)
396421
g.Go(func() error { return e.headerBroadcaster.WriteToStoreAndBroadcast(ctx, header) })
@@ -608,18 +633,6 @@ func (e *Executor) validateBlock(lastState types.State, header *types.SignedHead
608633
return nil
609634
}
610635

611-
// updateState saves the new state
612-
func (e *Executor) updateState(ctx context.Context, newState types.State) error {
613-
if err := e.store.UpdateState(ctx, newState); err != nil {
614-
return err
615-
}
616-
617-
e.setLastState(newState)
618-
e.metrics.Height.Set(float64(newState.LastBlockHeight))
619-
620-
return nil
621-
}
622-
623636
// sendCriticalError sends a critical error to the error channel without blocking
624637
func (e *Executor) sendCriticalError(err error) {
625638
if e.errorCh != nil {

block/internal/executing/executor_restart_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,11 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
152152
pendingHeader.DataHash = pendingData.DACommitment()
153153

154154
// Save pending block data (this is what would happen during a crash)
155-
err = memStore.SaveBlockData(context.Background(), pendingHeader, pendingData, &types.Signature{})
155+
batch, err := memStore.NewBatch(context.Background())
156+
require.NoError(t, err)
157+
err = batch.SaveBlockData(pendingHeader, pendingData, &types.Signature{})
158+
require.NoError(t, err)
159+
err = batch.Commit()
156160
require.NoError(t, err)
157161

158162
// Stop first executor (simulating crash/restart)

block/internal/submitting/da_submitter_integration_test.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,20 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted(
6767
// persist to store
6868
sig1t := types.Signature(sig1)
6969
sig2t := types.Signature(sig2)
70-
require.NoError(t, st.SaveBlockData(context.Background(), hdr1, data1, &sig1t))
71-
require.NoError(t, st.SaveBlockData(context.Background(), hdr2, data2, &sig2t))
72-
require.NoError(t, st.SetHeight(context.Background(), 2))
70+
71+
// Save block 1
72+
batch1, err := st.NewBatch(context.Background())
73+
require.NoError(t, err)
74+
require.NoError(t, batch1.SaveBlockData(hdr1, data1, &sig1t))
75+
require.NoError(t, batch1.SetHeight(1))
76+
require.NoError(t, batch1.Commit())
77+
78+
// Save block 2
79+
batch2, err := st.NewBatch(context.Background())
80+
require.NoError(t, err)
81+
require.NoError(t, batch2.SaveBlockData(hdr2, data2, &sig2t))
82+
require.NoError(t, batch2.SetHeight(2))
83+
require.NoError(t, batch2.Commit())
7384

7485
// Dummy DA
7586
dummyDA := coreda.NewDummyDA(10_000_000, 0, 0, 10*time.Millisecond)

block/internal/submitting/da_submitter_test.go

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -167,12 +167,23 @@ func TestDASubmitter_SubmitHeaders_Success(t *testing.T) {
167167
// Save to store to make them pending
168168
sig1 := header1.Signature
169169
sig2 := header2.Signature
170-
require.NoError(t, st.SaveBlockData(ctx, header1, data1, &sig1))
171-
require.NoError(t, st.SaveBlockData(ctx, header2, data2, &sig2))
172-
require.NoError(t, st.SetHeight(ctx, 2))
170+
171+
// Save block 1
172+
batch1, err := st.NewBatch(ctx)
173+
require.NoError(t, err)
174+
require.NoError(t, batch1.SaveBlockData(header1, data1, &sig1))
175+
require.NoError(t, batch1.SetHeight(1))
176+
require.NoError(t, batch1.Commit())
177+
178+
// Save block 2
179+
batch2, err := st.NewBatch(ctx)
180+
require.NoError(t, err)
181+
require.NoError(t, batch2.SaveBlockData(header2, data2, &sig2))
182+
require.NoError(t, batch2.SetHeight(2))
183+
require.NoError(t, batch2.Commit())
173184

174185
// Submit headers
175-
err := submitter.SubmitHeaders(ctx, cm)
186+
err = submitter.SubmitHeaders(ctx, cm)
176187
require.NoError(t, err)
177188

178189
// Verify headers are marked as DA included
@@ -253,12 +264,23 @@ func TestDASubmitter_SubmitData_Success(t *testing.T) {
253264
// Save to store to make them pending
254265
sig1 := types.Signature([]byte("sig1"))
255266
sig2 := types.Signature([]byte("sig2"))
256-
require.NoError(t, st.SaveBlockData(ctx, header1, data1, &sig1))
257-
require.NoError(t, st.SaveBlockData(ctx, header2, data2, &sig2))
258-
require.NoError(t, st.SetHeight(ctx, 2))
267+
268+
// Save block 1
269+
batch1, err := st.NewBatch(ctx)
270+
require.NoError(t, err)
271+
require.NoError(t, batch1.SaveBlockData(header1, data1, &sig1))
272+
require.NoError(t, batch1.SetHeight(1))
273+
require.NoError(t, batch1.Commit())
274+
275+
// Save block 2
276+
batch2, err := st.NewBatch(ctx)
277+
require.NoError(t, err)
278+
require.NoError(t, batch2.SaveBlockData(header2, data2, &sig2))
279+
require.NoError(t, batch2.SetHeight(2))
280+
require.NoError(t, batch2.Commit())
259281

260282
// Submit data
261-
err := submitter.SubmitData(ctx, cm, signer, gen)
283+
err = submitter.SubmitData(ctx, cm, signer, gen)
262284
require.NoError(t, err)
263285

264286
// Verify data is marked as DA included
@@ -302,11 +324,14 @@ func TestDASubmitter_SubmitData_SkipsEmptyData(t *testing.T) {
302324

303325
// Save to store
304326
sig := types.Signature([]byte("sig"))
305-
require.NoError(t, st.SaveBlockData(ctx, header, emptyData, &sig))
306-
require.NoError(t, st.SetHeight(ctx, 1))
327+
batch, err := st.NewBatch(ctx)
328+
require.NoError(t, err)
329+
require.NoError(t, batch.SaveBlockData(header, emptyData, &sig))
330+
require.NoError(t, batch.SetHeight(1))
331+
require.NoError(t, batch.Commit())
307332

308333
// Submit data - should succeed but skip empty data
309-
err := submitter.SubmitData(ctx, cm, signer, gen)
334+
err = submitter.SubmitData(ctx, cm, signer, gen)
310335
require.NoError(t, err)
311336

312337
// Empty data should not be marked as DA included (it's implicitly included)
@@ -353,11 +378,14 @@ func TestDASubmitter_SubmitData_NilSigner(t *testing.T) {
353378

354379
// Save to store
355380
sig := types.Signature([]byte("sig"))
356-
require.NoError(t, st.SaveBlockData(ctx, header, data, &sig))
357-
require.NoError(t, st.SetHeight(ctx, 1))
381+
batch, err := st.NewBatch(ctx)
382+
require.NoError(t, err)
383+
require.NoError(t, batch.SaveBlockData(header, data, &sig))
384+
require.NoError(t, batch.SetHeight(1))
385+
require.NoError(t, batch.Commit())
358386

359387
// Submit data with nil signer - should fail
360-
err := submitter.SubmitData(ctx, cm, nil, gen)
388+
err = submitter.SubmitData(ctx, cm, nil, gen)
361389
require.Error(t, err)
362390
assert.Contains(t, err.Error(), "signer is nil")
363391
}

0 commit comments

Comments
 (0)