Skip to content

Commit 4aed9fd

Browse files
authored
refactor(block): speed-up cache by retrieving only next pending event (#2702)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. NOTE: PR titles should follow semantic commits: https://www.conventionalcommits.org/en/v1.0.0/ --> ## Overview Speed-up cache by retrieving only next pending event <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> -->
1 parent 2ccc6e6 commit 4aed9fd

File tree

8 files changed

+101
-107
lines changed

8 files changed

+101
-107
lines changed

block/internal/cache/bench_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cache
22

33
import (
44
"context"
5+
"math/rand/v2"
56
"testing"
67

78
"github.com/rs/zerolog"
@@ -107,10 +108,9 @@ func BenchmarkManager_PendingEventsSnapshot(b *testing.B) {
107108
b.ReportAllocs()
108109
b.ResetTimer()
109110
for b.Loop() {
110-
ev := m.GetPendingEvents()
111-
if len(ev) == 0 {
112-
b.Fatal("unexpected empty events")
113-
}
111+
// Test getting next pending event at various heights
112+
height := rand.N(uint64(50_000)) + 1 //nolint:gosec // this is a benchmark test
113+
_ = m.GetNextPendingEvent(height)
114114
}
115115
}
116116

block/internal/cache/generic_cache.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,6 @@ func (c *Cache[T]) setItem(height uint64, item *T) {
4646
c.itemsByHeight.Store(height, item)
4747
}
4848

49-
// deleteItem deletes an item from the cache by height
50-
func (c *Cache[T]) deleteItem(height uint64) {
51-
c.itemsByHeight.Delete(height)
52-
}
53-
5449
// rangeByHeight iterates over items keyed by height in an unspecified order and calls fn for each.
5550
// If fn returns false, iteration stops early.
5651
func (c *Cache[T]) rangeByHeight(fn func(height uint64, item *T) bool) {
@@ -67,6 +62,20 @@ func (c *Cache[T]) rangeByHeight(fn func(height uint64, item *T) bool) {
6762
})
6863
}
6964

65+
// getNextItem returns the item at the specified height and removes it from cache if found.
66+
// Returns nil if not found.
67+
func (c *Cache[T]) getNextItem(height uint64) *T {
68+
item, loaded := c.itemsByHeight.LoadAndDelete(height)
69+
if !loaded {
70+
return nil
71+
}
72+
val, ok := item.(*T)
73+
if !ok {
74+
return nil
75+
}
76+
return val
77+
}
78+
7079
// isSeen returns true if the hash has been seen
7180
func (c *Cache[T]) isSeen(hash string) bool {
7281
seen, ok := c.hashes.Load(hash)

block/internal/cache/generic_cache_test.go

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"os"
77
"path/filepath"
8-
"sync"
98
"testing"
109
)
1110

@@ -15,32 +14,6 @@ func init() {
1514
gob.Register(&testItem{})
1615
}
1716

18-
// TestCache_ConcurrentOperations exercises concurrent Set, Delete, and Range operations.
19-
func TestCache_ConcurrentOperations(t *testing.T) {
20-
c := NewCache[testItem]()
21-
22-
const N = 2000
23-
var wg sync.WaitGroup
24-
25-
// writers
26-
writer := func(start int) {
27-
defer wg.Done()
28-
for i := start; i < N; i += 2 {
29-
v := &testItem{V: i}
30-
c.setItem(uint64(i), v)
31-
if i%10 == 0 {
32-
// randomly delete some keys
33-
c.deleteItem(uint64(i))
34-
}
35-
}
36-
}
37-
38-
wg.Add(2)
39-
go writer(0)
40-
go writer(1)
41-
wg.Wait()
42-
}
43-
4417
// TestCache_TypeSafety ensures methods gracefully handle invalid underlying types.
4518
func TestCache_TypeSafety(t *testing.T) {
4619
c := NewCache[testItem]()
@@ -120,6 +93,6 @@ func TestCache_LargeDataset(t *testing.T) {
12093
}
12194
// Delete a range in the middle
12295
for i := 5000; i < 10000; i += 2 {
123-
c.deleteItem(uint64(i))
96+
c.getNextItem(uint64(i))
12497
}
12598
}

block/internal/cache/manager.go

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,8 @@ type Manager interface {
5858
NumPendingData() uint64
5959

6060
// Pending events syncing coordination
61+
GetNextPendingEvent(height uint64) *common.DAHeightEvent
6162
SetPendingEvent(height uint64, event *common.DAHeightEvent)
62-
GetPendingEvents() map[uint64]*common.DAHeightEvent
63-
DeletePendingEvent(height uint64)
6463

6564
// Cleanup operations
6665
SaveToDisk() error
@@ -194,23 +193,15 @@ func (m *implementation) NumPendingData() uint64 {
194193
return m.pendingData.NumPendingData()
195194
}
196195

197-
// Pending events operations
196+
// SetPendingEvent sets the event at the specified height.
198197
func (m *implementation) SetPendingEvent(height uint64, event *common.DAHeightEvent) {
199198
m.pendingEventsCache.setItem(height, event)
200199
}
201200

202-
func (m *implementation) GetPendingEvents() map[uint64]*common.DAHeightEvent {
203-
204-
events := make(map[uint64]*common.DAHeightEvent)
205-
m.pendingEventsCache.rangeByHeight(func(height uint64, event *common.DAHeightEvent) bool {
206-
events[height] = event
207-
return true
208-
})
209-
return events
210-
}
211-
212-
func (m *implementation) DeletePendingEvent(height uint64) {
213-
m.pendingEventsCache.deleteItem(height)
201+
// GetNextPendingEvent efficiently retrieves and removes the event at the specified height.
202+
// Returns nil if no event exists at that height.
203+
func (m *implementation) GetNextPendingEvent(height uint64) *common.DAHeightEvent {
204+
return m.pendingEventsCache.getNextItem(height)
214205
}
215206

216207
func (m *implementation) SaveToDisk() error {

block/internal/cache/manager_test.go

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -68,18 +68,22 @@ func TestManager_PendingEventsCRUD(t *testing.T) {
6868
m.SetPendingEvent(1, evt1)
6969
m.SetPendingEvent(3, evt3)
7070

71-
got := m.GetPendingEvents()
72-
require.Len(t, got, 3)
73-
assert.Equal(t, evt1, got[1])
74-
assert.Equal(t, evt3, got[3])
75-
assert.Equal(t, evt5, got[5])
76-
77-
// delete and re-check
78-
m.DeletePendingEvent(3)
79-
got = m.GetPendingEvents()
80-
require.Len(t, got, 2)
81-
_, ok := got[3]
82-
assert.False(t, ok)
71+
// Test getting specific events
72+
got1 := m.GetNextPendingEvent(1)
73+
require.NotNil(t, got1)
74+
assert.Equal(t, evt1.DaHeight, got1.DaHeight)
75+
76+
got3 := m.GetNextPendingEvent(3)
77+
require.NotNil(t, got3)
78+
assert.Equal(t, evt3.DaHeight, got3.DaHeight)
79+
80+
got5 := m.GetNextPendingEvent(5)
81+
require.NotNil(t, got5)
82+
assert.Equal(t, evt5.DaHeight, got5.DaHeight)
83+
84+
// Events should be removed after GetNextPendingEvent
85+
got1Again := m.GetNextPendingEvent(1)
86+
assert.Nil(t, got1Again)
8387
}
8488

8589
func TestManager_SaveAndLoadFromDisk(t *testing.T) {
@@ -120,9 +124,10 @@ func TestManager_SaveAndLoadFromDisk(t *testing.T) {
120124
_, ok2 := m2.GetDataDAIncluded("D2")
121125
assert.True(t, ok2)
122126

123-
events := m2.GetPendingEvents()
124-
require.Len(t, events, 1)
125-
assert.Equal(t, uint64(2), events[2].Header.Height())
127+
// Verify pending event was loaded
128+
loadedEvent := m2.GetNextPendingEvent(2)
129+
require.NotNil(t, loadedEvent)
130+
assert.Equal(t, uint64(2), loadedEvent.Header.Height())
126131

127132
// directories exist under cfg.RootDir/data/cache/...
128133
base := filepath.Join(cfg.RootDir, "data", "cache")
@@ -131,6 +136,19 @@ func TestManager_SaveAndLoadFromDisk(t *testing.T) {
131136
assert.DirExists(t, filepath.Join(base, "pending_da_events"))
132137
}
133138

139+
func TestManager_GetNextPendingEvent_NonExistent(t *testing.T) {
140+
t.Parallel()
141+
cfg := tempConfig(t)
142+
st := memStore(t)
143+
144+
m, err := NewManager(cfg, st, zerolog.Nop())
145+
require.NoError(t, err)
146+
147+
// Try to get non-existent event
148+
event := m.GetNextPendingEvent(999)
149+
assert.Nil(t, event)
150+
}
151+
134152
func TestPendingHeadersAndData_Flow(t *testing.T) {
135153
t.Parallel()
136154
st := memStore(t)

block/internal/syncing/syncer.go

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -371,25 +371,25 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
371371
// If this is not the next block in sequence, store as pending event
372372
// This check is crucial as trySyncNextBlock simply attempts to sync the next block
373373
if height != currentHeight+1 {
374-
// Create a DAHeightEvent that matches the cache interface
375-
pendingEvent := &common.DAHeightEvent{
376-
Header: event.Header,
377-
Data: event.Data,
378-
DaHeight: event.DaHeight,
379-
HeaderDaIncludedHeight: event.HeaderDaIncludedHeight,
380-
}
381-
s.cache.SetPendingEvent(height, pendingEvent)
374+
s.cache.SetPendingEvent(height, event)
382375
s.logger.Debug().Uint64("height", height).Uint64("current_height", currentHeight).Msg("stored as pending event")
383376
return
384377
}
385378

386379
// Try to sync the next block
387380
if err := s.trySyncNextBlock(event); err != nil {
388381
s.logger.Error().Err(err).Msg("failed to sync next block")
382+
// If the error is not due to an validation error, re-store the event as pending
383+
if !errors.Is(err, errInvalidBlock) {
384+
s.cache.SetPendingEvent(height, event)
385+
}
389386
return
390387
}
391388
}
392389

390+
// errInvalidBlock is returned when a block is failing validation
391+
var errInvalidBlock = errors.New("invalid block")
392+
393393
// trySyncNextBlock attempts to sync the next available block
394394
// the event is always the next block in sequence as processHeightEvent ensures it.
395395
func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
@@ -410,7 +410,7 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
410410
// here only the previous block needs to be applied to proceed to the verification.
411411
// The header validation must be done before applying the block to avoid executing gibberish
412412
if err := s.validateBlock(currentState, header, data); err != nil {
413-
return fmt.Errorf("failed to validate block: %w", err)
413+
return errors.Join(errInvalidBlock, fmt.Errorf("failed to validate block: %w", err))
414414
}
415415

416416
// Mark as DA included
@@ -551,34 +551,30 @@ func (s *Syncer) isHeightFromFutureError(err error) bool {
551551

552552
// processPendingEvents fetches and processes pending events from cache
553553
func (s *Syncer) processPendingEvents() {
554-
pendingEvents := s.cache.GetPendingEvents()
554+
currentHeight, err := s.store.Height(s.ctx)
555+
if err != nil {
556+
s.logger.Error().Err(err).Msg("failed to get current height for pending events")
557+
return
558+
}
555559

556-
for height, event := range pendingEvents {
557-
currentHeight, err := s.store.Height(s.ctx)
558-
if err != nil {
559-
s.logger.Error().Err(err).Msg("failed to get current height for pending events")
560-
continue
560+
// Try to get the next processable event (currentHeight + 1)
561+
nextHeight := currentHeight + 1
562+
if event := s.cache.GetNextPendingEvent(nextHeight); event != nil {
563+
heightEvent := common.DAHeightEvent{
564+
Header: event.Header,
565+
Data: event.Data,
566+
DaHeight: event.DaHeight,
567+
HeaderDaIncludedHeight: event.HeaderDaIncludedHeight,
561568
}
562569

563-
// Only process events for blocks we haven't synced yet
564-
if height > currentHeight {
565-
heightEvent := common.DAHeightEvent{
566-
Header: event.Header,
567-
Data: event.Data,
568-
DaHeight: event.DaHeight,
569-
HeaderDaIncludedHeight: event.HeaderDaIncludedHeight,
570-
}
571-
572-
select {
573-
case s.heightInCh <- heightEvent:
574-
// Remove from pending events once sent
575-
s.cache.DeletePendingEvent(height)
576-
case <-s.ctx.Done():
577-
return
578-
}
579-
} else {
580-
// Clean up events for blocks we've already processed
581-
s.cache.DeletePendingEvent(height)
570+
select {
571+
case s.heightInCh <- heightEvent:
572+
// Event was successfully sent and already removed by GetNextPendingEvent
573+
s.logger.Debug().Uint64("height", nextHeight).Msg("sent pending event to processing")
574+
case <-s.ctx.Done():
575+
s.cache.SetPendingEvent(nextHeight, event)
576+
default:
577+
s.cache.SetPendingEvent(nextHeight, event)
582578
}
583579
}
584580
}

block/internal/syncing/syncer_benchmark_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,11 @@ func BenchmarkSyncerIO(b *testing.B) {
5252
}, 5*time.Second, 50*time.Microsecond)
5353
fixt.s.cancel()
5454

55-
// Ensure clean end-state per iteration
56-
require.Len(b, fixt.s.cache.GetPendingEvents(), 0)
55+
// Ensure clean end-state per iteration - verify no pending events remain
56+
for i := uint64(1); i <= spec.heights; i++ {
57+
event := fixt.s.cache.GetNextPendingEvent(i)
58+
require.Nil(b, event, "expected no pending event at height %d", i)
59+
}
5760
require.Len(b, fixt.s.heightInCh, 0)
5861

5962
assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daHeight)

block/internal/syncing/syncer_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,9 @@ func TestSyncer_processPendingEvents(t *testing.T) {
144144
t.Fatal("expected a forwarded pending event")
145145
}
146146

147-
remaining := cm.GetPendingEvents()
148-
assert.Len(t, remaining, 0)
147+
// Verify the event was removed by trying to get it again
148+
remaining := cm.GetNextPendingEvent(2)
149+
assert.Nil(t, remaining)
149150
}
150151

151152
func TestSyncLoopPersistState(t *testing.T) {
@@ -230,8 +231,11 @@ func TestSyncLoopPersistState(t *testing.T) {
230231
daRtrMock.AssertExpectations(t)
231232
p2pHndlMock.AssertExpectations(t)
232233

233-
// and all processed
234-
assert.Len(t, syncerInst1.cache.GetPendingEvents(), 0)
234+
// and all processed - verify no events remain at heights we tested
235+
event1 := syncerInst1.cache.GetNextPendingEvent(1)
236+
assert.Nil(t, event1)
237+
event2 := syncerInst1.cache.GetNextPendingEvent(2)
238+
assert.Nil(t, event2)
235239
assert.Len(t, syncerInst1.heightInCh, 0)
236240

237241
// and when new instance is up on restart

0 commit comments

Comments
 (0)