diff --git a/block/internal/cache/bench_test.go b/block/internal/cache/bench_test.go index 22d56c6cd..bfc7036ad 100644 --- a/block/internal/cache/bench_test.go +++ b/block/internal/cache/bench_test.go @@ -2,6 +2,8 @@ package cache import ( "context" + "errors" + "io" "math/rand/v2" "testing" @@ -17,14 +19,18 @@ import ( goos: darwin goarch: arm64 pkg: github.com/evstack/ev-node/block/internal/cache -cpu: Apple M1 Pro -BenchmarkManager_GetPendingHeaders/N=1000-10 278 3922717 ns/op 5064666 B/op 70818 allocs/op -BenchmarkManager_GetPendingHeaders/N=10000-10 28 40704543 ns/op 50639803 B/op 709864 allocs/op -BenchmarkManager_GetPendingData/N=1000-10 279 4258291 ns/op 5869716 B/op 73824 allocs/op -BenchmarkManager_GetPendingData/N=10000-10 26 45428974 ns/op 58719067 B/op 739926 allocs/op -BenchmarkManager_PendingEventsSnapshot-10 336 3251530 ns/op 2365497 B/op 285 allocs/op +cpu: Apple M4 +BenchmarkManager_GetPendingHeaders/N=1000-10 194 5763894 ns/op 3552719 B/op 46916 allocs/op +BenchmarkManager_GetPendingHeaders/N=10000-10 21 53211540 ns/op 35524178 B/op 469941 allocs/op +BenchmarkManager_GetPendingData/N=1000-10 100 10699778 ns/op 5868643 B/op 73823 allocs/op +BenchmarkManager_GetPendingData/N=10000-10 13 138766532 ns/op 58667709 B/op 739950 allocs/op +BenchmarkPendingData_Iterator/N=1000-10 91 12462366 ns/op 5849299 B/op 72828 allocs/op +BenchmarkPendingData_Iterator/N=10000-10 12 111749198 ns/op 58643530 B/op 729975 allocs/op +BenchmarkPendingHeaders_Iterator/N=1000-10 204 6132184 ns/op 3648970 B/op 46924 allocs/op +BenchmarkPendingHeaders_Iterator/N=10000-10 25 53847862 ns/op 36485675 B/op 470017 allocs/op +BenchmarkManager_PendingEventsSnapshot-10 30719360 35.37 ns/op 0 B/op 0 allocs/op PASS -ok github.com/evstack/ev-node/block/internal/cache 25.834s +ok github.com/evstack/ev-node/block/internal/cache 23.693s */ func benchSetupStore(b *testing.B, n int, txsPer int, chainID string) store.Store { @@ -98,6 +104,94 @@ func BenchmarkManager_GetPendingData(b *testing.B) { } } +func BenchmarkPendingData_Iterator(b *testing.B) { + for _, n := range []int{1_000, 10_000} { + b.Run(benchName(n), func(b *testing.B) { + st := benchSetupStore(b, n, 2, "bench-data-iter") + pd, err := NewPendingData(st, zerolog.Nop()) + if err != nil { + b.Fatal(err) + } + ctx := context.Background() + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + iter, err := pd.Iterator(ctx) + if err != nil { + b.Fatal(err) + } + count := 0 + for { + val, ok, err := iter.Next() + if errors.Is(err, io.EOF) { + if ok { + b.Fatal("expected ok=false on EOF") + } + break + } + if err != nil { + b.Fatal(err) + } + if !ok { + b.Fatal("expected ok=true before EOF") + } + if len(val.Txs) == 0 { + continue + } + count++ + } + if count == 0 { + b.Fatal("unexpected empty iteration result") + } + } + }) + } +} + +func BenchmarkPendingHeaders_Iterator(b *testing.B) { + for _, n := range []int{1_000, 10_000} { + b.Run(benchName(n), func(b *testing.B) { + st := benchSetupStore(b, n, 1, "bench-headers-iter") + ph, err := NewPendingHeaders(st, zerolog.Nop()) + if err != nil { + b.Fatal(err) + } + ctx := context.Background() + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + iter, err := ph.Iterator(ctx) + if err != nil { + b.Fatal(err) + } + count := 0 + for { + val, ok, err := iter.Next() + if errors.Is(err, io.EOF) { + if ok { + b.Fatal("expected ok=false on EOF") + } + break + } + if err != nil { + b.Fatal(err) + } + if !ok { + b.Fatal("expected ok=true before EOF") + } + if val == nil { + b.Fatal("unexpected nil header") + } + count++ + } + if count == 0 { + b.Fatal("unexpected empty iteration result") + } + } + }) + } +} + func BenchmarkManager_PendingEventsSnapshot(b *testing.B) { st := benchSetupStore(b, 1_000, 1, "bench-events") m := benchNewManager(b, st) diff --git a/block/internal/cache/iterator.go b/block/internal/cache/iterator.go new file mode 100644 index 000000000..0cd61d1b1 --- /dev/null +++ b/block/internal/cache/iterator.go @@ -0,0 +1,42 @@ +package cache + +import ( + "context" + "io" +) + +type Iterator[T any] struct { + base *pendingBase[T] + ctx context.Context + cursor uint64 + limit uint64 + exhausted bool +} + +func (it *Iterator[T]) Next() (T, bool, error) { + var zero T + + if it == nil || it.base == nil { + return zero, false, io.EOF + } + if it.exhausted { + return zero, false, io.EOF + } + if it.cursor > it.limit { + it.exhausted = true + return zero, false, io.EOF + } + + val, err := it.base.fetch(it.ctx, it.base.store, it.cursor) + if err != nil { + it.exhausted = true + return zero, false, err + } + + it.cursor++ + if it.cursor > it.limit { + it.exhausted = true + } + + return val, true, nil +} diff --git a/block/internal/cache/pending_base.go b/block/internal/cache/pending_base.go index ca36e999f..b58c7dc47 100644 --- a/block/internal/cache/pending_base.go +++ b/block/internal/cache/pending_base.go @@ -17,11 +17,11 @@ import ( // that need to be published to the DA layer in order. It handles persistence // of the last submitted height and provides methods for retrieving pending items. type pendingBase[T any] struct { - logger zerolog.Logger - store store.Store - metaKey string - fetch func(ctx context.Context, store store.Store, height uint64) (T, error) - lastHeight atomic.Uint64 + logger zerolog.Logger + store store.Store + metaKey string + fetch func(ctx context.Context, store store.Store, height uint64) (T, error) + lastSubmittedHeight atomic.Uint64 } // newPendingBase constructs a new pendingBase for a given type. @@ -40,7 +40,7 @@ func newPendingBase[T any](store store.Store, logger zerolog.Logger, metaKey str // getPending returns a sorted slice of pending items of type T. func (pb *pendingBase[T]) getPending(ctx context.Context) ([]T, error) { - lastSubmitted := pb.lastHeight.Load() + lastSubmitted := pb.lastSubmittedHeight.Load() height, err := pb.store.Height(ctx) if err != nil { return nil, err @@ -68,12 +68,12 @@ func (pb *pendingBase[T]) numPending() uint64 { pb.logger.Error().Err(err).Msg("failed to get height in numPending") return 0 } - return height - pb.lastHeight.Load() + return height - pb.lastSubmittedHeight.Load() } func (pb *pendingBase[T]) setLastSubmittedHeight(ctx context.Context, newLastSubmittedHeight uint64) { - lsh := pb.lastHeight.Load() - if newLastSubmittedHeight > lsh && pb.lastHeight.CompareAndSwap(lsh, newLastSubmittedHeight) { + lsh := pb.lastSubmittedHeight.Load() + if newLastSubmittedHeight > lsh && pb.lastSubmittedHeight.CompareAndSwap(lsh, newLastSubmittedHeight) { bz := make([]byte, 8) binary.LittleEndian.PutUint64(bz, newLastSubmittedHeight) err := pb.store.SetMetadata(ctx, pb.metaKey, bz) @@ -98,6 +98,28 @@ func (pb *pendingBase[T]) init() error { if lsh == 0 { return nil } - pb.lastHeight.CompareAndSwap(0, lsh) + pb.lastSubmittedHeight.CompareAndSwap(0, lsh) return nil } + +func (pb *pendingBase[T]) iterator(ctx context.Context) (Iterator[T], error) { + lastSubmitted := pb.lastSubmittedHeight.Load() + height, err := pb.store.Height(ctx) + if err != nil { + return Iterator[T]{}, err + } + if lastSubmitted == height { + return Iterator[T]{}, nil + } + if lastSubmitted > height { + return Iterator[T]{}, fmt.Errorf("height of last submitted "+ + "item (%d) is greater than height of last item (%d)", lastSubmitted, height) + } + + return Iterator[T]{ + base: pb, + ctx: ctx, + cursor: lastSubmitted + 1, + limit: height, + }, nil +} diff --git a/block/internal/cache/pending_base_test.go b/block/internal/cache/pending_base_test.go index e73e1110e..e8d7c2b54 100644 --- a/block/internal/cache/pending_base_test.go +++ b/block/internal/cache/pending_base_test.go @@ -3,6 +3,8 @@ package cache import ( "context" "encoding/binary" + "errors" + "io" "testing" "github.com/rs/zerolog" @@ -10,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/evstack/ev-node/pkg/store" + "github.com/evstack/ev-node/types" ) func TestPendingBase_ErrorConditions(t *testing.T) { @@ -75,3 +78,176 @@ func TestPendingBase_PersistLastSubmitted(t *testing.T) { require.NoError(t, err) assert.Equal(t, raw, raw2) } + +type someValue struct { + height uint64 +} + +func TestPendingBaseIterator_EmitsAllItemsInOrder(t *testing.T) { + t.Parallel() + ctx := context.Background() + dsKV, err := store.NewDefaultInMemoryKVStore() + require.NoError(t, err) + st := store.New(dsKV) + require.NoError(t, st.SetHeight(ctx, 4)) + + items := map[uint64]*someValue{ + 2: {height: 2}, + 3: {height: 3}, + 4: {height: 4}, + } + + pb := &pendingBase[*someValue]{ + logger: zerolog.Nop(), + store: st, + metaKey: "Iterator-order", + fetch: func(ctx context.Context, _ store.Store, height uint64) (*someValue, error) { + if height > 4 { + return nil, assert.AnError + } + + return items[height], nil + }, + } + pb.lastSubmittedHeight.Store(1) + + iter, err := pb.iterator(ctx) + require.NoError(t, err) + + var heights []uint64 + for { + val, ok, err := iter.Next() + if errors.Is(err, io.EOF) { + require.False(t, ok) + break + } + require.NoError(t, err) + require.True(t, ok) + heights = append(heights, val.height) + } + + assert.Equal(t, []uint64{2, 3, 4}, heights) +} + +func TestPendingBaseIterator_StopsOnFetchError(t *testing.T) { + t.Parallel() + ctx := context.Background() + dsKV, err := store.NewDefaultInMemoryKVStore() + require.NoError(t, err) + st := store.New(dsKV) + require.NoError(t, st.SetHeight(ctx, 3)) + + pb := &pendingBase[*someValue]{ + logger: zerolog.Nop(), + store: st, + metaKey: "Iterator-error", + fetch: func(ctx context.Context, _ store.Store, height uint64) (*someValue, error) { + if height == 3 { + return nil, assert.AnError + } + return &someValue{height: height}, nil + }, + } + pb.lastSubmittedHeight.Store(1) + + iter, err := pb.iterator(ctx) + require.NoError(t, err) + + first, ok, err := iter.Next() + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, uint64(2), first.height) + + _, _, err = iter.Next() + require.ErrorIs(t, err, assert.AnError) + + val, ok, err := iter.Next() + require.ErrorIs(t, err, io.EOF) + require.False(t, ok) + assert.Nil(t, val) +} + +func TestPendingHeadersIterator(t *testing.T) { + t.Parallel() + ctx := context.Background() + dsKV, err := store.NewDefaultInMemoryKVStore() + require.NoError(t, err) + st := store.New(dsKV) + require.NoError(t, st.SetHeight(ctx, 4)) + + items := map[uint64]*types.SignedHeader{ + 2: {}, + 3: {}, + 4: {}, + } + + base := &pendingBase[*types.SignedHeader]{ + logger: zerolog.Nop(), + store: st, + metaKey: "pending-headers-iterator", + fetch: func(_ context.Context, _ store.Store, height uint64) (*types.SignedHeader, error) { + return items[height], nil + }, + } + base.lastSubmittedHeight.Store(1) + + ph := &PendingHeaders{base: base} + iter, err := ph.Iterator(ctx) + require.NoError(t, err) + + var got []*types.SignedHeader + for { + val, ok, err := iter.Next() + if errors.Is(err, io.EOF) { + require.False(t, ok) + break + } + require.NoError(t, err) + require.True(t, ok) + got = append(got, val) + } + + assert.Equal(t, []*types.SignedHeader{items[2], items[3], items[4]}, got) +} + +func TestPendingDataIterator(t *testing.T) { + t.Parallel() + ctx := context.Background() + dsKV, err := store.NewDefaultInMemoryKVStore() + require.NoError(t, err) + st := store.New(dsKV) + require.NoError(t, st.SetHeight(ctx, 3)) + + items := map[uint64]*types.Data{ + 2: {Metadata: &types.Metadata{Height: 2}}, + 3: {Metadata: &types.Metadata{Height: 3}}, + } + + base := &pendingBase[*types.Data]{ + logger: zerolog.Nop(), + store: st, + metaKey: "pending-data-iterator", + fetch: func(_ context.Context, _ store.Store, height uint64) (*types.Data, error) { + return items[height], nil + }, + } + base.lastSubmittedHeight.Store(1) + + pd := &PendingData{base: base} + iter, err := pd.Iterator(ctx) + require.NoError(t, err) + + var got []*types.Data + for { + val, ok, err := iter.Next() + if errors.Is(err, io.EOF) { + require.False(t, ok) + break + } + require.NoError(t, err) + require.True(t, ok) + got = append(got, val) + } + + assert.Equal(t, []*types.Data{items[2], items[3]}, got) +} diff --git a/block/internal/cache/pending_data.go b/block/internal/cache/pending_data.go index 6f75eb54a..8bb654af9 100644 --- a/block/internal/cache/pending_data.go +++ b/block/internal/cache/pending_data.go @@ -42,10 +42,6 @@ func NewPendingData(store store.Store, logger zerolog.Logger) (*PendingData, err return &PendingData{base: base}, nil } -func (pd *PendingData) init() error { - return pd.base.init() -} - // GetPendingData returns a sorted slice of pending Data. func (pd *PendingData) GetPendingData(ctx context.Context) ([]*types.Data, error) { return pd.base.getPending(ctx) @@ -58,3 +54,8 @@ func (pd *PendingData) NumPendingData() uint64 { func (pd *PendingData) SetLastSubmittedDataHeight(ctx context.Context, newLastSubmittedDataHeight uint64) { pd.base.setLastSubmittedHeight(ctx, newLastSubmittedDataHeight) } + +// Iterator returns an Iterator that walks pending data in ascending height order. +func (pd *PendingData) Iterator(ctx context.Context) (Iterator[*types.Data], error) { + return pd.base.iterator(ctx) +} diff --git a/block/internal/cache/pending_headers.go b/block/internal/cache/pending_headers.go index 446efa42d..e6a52eff5 100644 --- a/block/internal/cache/pending_headers.go +++ b/block/internal/cache/pending_headers.go @@ -52,6 +52,7 @@ func (ph *PendingHeaders) SetLastSubmittedHeaderHeight(ctx context.Context, newL ph.base.setLastSubmittedHeight(ctx, newLastSubmittedHeaderHeight) } -func (ph *PendingHeaders) init() error { - return ph.base.init() +// Iterator returns an iterator that walks pending headers in ascending height order. +func (ph *PendingHeaders) Iterator(ctx context.Context) (Iterator[*types.SignedHeader], error) { + return ph.base.iterator(ctx) }