Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 101 additions & 7 deletions block/internal/cache/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package cache

import (
"context"
"errors"
"io"
"math/rand/v2"
"testing"

Expand All @@ -17,14 +19,18 @@ import (
goos: darwin
goarch: arm64
pkg: github.com/evstack/ev-node/block/internal/cache
cpu: Apple M1 Pro
BenchmarkManager_GetPendingHeaders/N=1000-10 278 3922717 ns/op 5064666 B/op 70818 allocs/op
BenchmarkManager_GetPendingHeaders/N=10000-10 28 40704543 ns/op 50639803 B/op 709864 allocs/op
BenchmarkManager_GetPendingData/N=1000-10 279 4258291 ns/op 5869716 B/op 73824 allocs/op
BenchmarkManager_GetPendingData/N=10000-10 26 45428974 ns/op 58719067 B/op 739926 allocs/op
BenchmarkManager_PendingEventsSnapshot-10 336 3251530 ns/op 2365497 B/op 285 allocs/op
cpu: Apple M4
BenchmarkManager_GetPendingHeaders/N=1000-10 194 5763894 ns/op 3552719 B/op 46916 allocs/op
BenchmarkManager_GetPendingHeaders/N=10000-10 21 53211540 ns/op 35524178 B/op 469941 allocs/op
BenchmarkManager_GetPendingData/N=1000-10 100 10699778 ns/op 5868643 B/op 73823 allocs/op
BenchmarkManager_GetPendingData/N=10000-10 13 138766532 ns/op 58667709 B/op 739950 allocs/op
BenchmarkPendingData_Iterator/N=1000-10 91 12462366 ns/op 5849299 B/op 72828 allocs/op
BenchmarkPendingData_Iterator/N=10000-10 12 111749198 ns/op 58643530 B/op 729975 allocs/op
BenchmarkPendingHeaders_Iterator/N=1000-10 204 6132184 ns/op 3648970 B/op 46924 allocs/op
BenchmarkPendingHeaders_Iterator/N=10000-10 25 53847862 ns/op 36485675 B/op 470017 allocs/op
BenchmarkManager_PendingEventsSnapshot-10 30719360 35.37 ns/op 0 B/op 0 allocs/op
PASS
ok github.com/evstack/ev-node/block/internal/cache 25.834s
ok github.com/evstack/ev-node/block/internal/cache 23.693s
*/

func benchSetupStore(b *testing.B, n int, txsPer int, chainID string) store.Store {
Expand Down Expand Up @@ -98,6 +104,94 @@ func BenchmarkManager_GetPendingData(b *testing.B) {
}
}

func BenchmarkPendingData_Iterator(b *testing.B) {
for _, n := range []int{1_000, 10_000} {
b.Run(benchName(n), func(b *testing.B) {
st := benchSetupStore(b, n, 2, "bench-data-iter")
pd, err := NewPendingData(st, zerolog.Nop())
if err != nil {
b.Fatal(err)
}
ctx := context.Background()
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
iter, err := pd.Iterator(ctx)
if err != nil {
b.Fatal(err)
}
count := 0
for {
val, ok, err := iter.Next()
if errors.Is(err, io.EOF) {
if ok {
b.Fatal("expected ok=false on EOF")
}
break
}
if err != nil {
b.Fatal(err)
}
if !ok {
b.Fatal("expected ok=true before EOF")
}
if len(val.Txs) == 0 {
continue
}
count++
}
if count == 0 {
b.Fatal("unexpected empty iteration result")
}
}
})
}
}

func BenchmarkPendingHeaders_Iterator(b *testing.B) {
for _, n := range []int{1_000, 10_000} {
b.Run(benchName(n), func(b *testing.B) {
st := benchSetupStore(b, n, 1, "bench-headers-iter")
ph, err := NewPendingHeaders(st, zerolog.Nop())
if err != nil {
b.Fatal(err)
}
ctx := context.Background()
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
iter, err := ph.Iterator(ctx)
if err != nil {
b.Fatal(err)
}
count := 0
for {
val, ok, err := iter.Next()
if errors.Is(err, io.EOF) {
if ok {
b.Fatal("expected ok=false on EOF")
}
break
}
if err != nil {
b.Fatal(err)
}
if !ok {
b.Fatal("expected ok=true before EOF")
}
if val == nil {
b.Fatal("unexpected nil header")
}
count++
}
if count == 0 {
b.Fatal("unexpected empty iteration result")
}
}
})
}
}

func BenchmarkManager_PendingEventsSnapshot(b *testing.B) {
st := benchSetupStore(b, 1_000, 1, "bench-events")
m := benchNewManager(b, st)
Expand Down
42 changes: 42 additions & 0 deletions block/internal/cache/iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package cache

import (
"context"
"io"
)

type Iterator[T any] struct {
base *pendingBase[T]
ctx context.Context
cursor uint64
limit uint64
exhausted bool
}

func (it *Iterator[T]) Next() (T, bool, error) {
var zero T

if it == nil || it.base == nil {
return zero, false, io.EOF
}
if it.exhausted {
return zero, false, io.EOF
}
if it.cursor > it.limit {
it.exhausted = true
return zero, false, io.EOF
}

val, err := it.base.fetch(it.ctx, it.base.store, it.cursor)
if err != nil {
it.exhausted = true
return zero, false, err
}

it.cursor++
if it.cursor > it.limit {
it.exhausted = true
}

return val, true, nil
}
42 changes: 32 additions & 10 deletions block/internal/cache/pending_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import (
// that need to be published to the DA layer in order. It handles persistence
// of the last submitted height and provides methods for retrieving pending items.
type pendingBase[T any] struct {
logger zerolog.Logger
store store.Store
metaKey string
fetch func(ctx context.Context, store store.Store, height uint64) (T, error)
lastHeight atomic.Uint64
logger zerolog.Logger
store store.Store
metaKey string
fetch func(ctx context.Context, store store.Store, height uint64) (T, error)
lastSubmittedHeight atomic.Uint64
}

// newPendingBase constructs a new pendingBase for a given type.
Expand All @@ -40,7 +40,7 @@ func newPendingBase[T any](store store.Store, logger zerolog.Logger, metaKey str

// getPending returns a sorted slice of pending items of type T.
func (pb *pendingBase[T]) getPending(ctx context.Context) ([]T, error) {
lastSubmitted := pb.lastHeight.Load()
lastSubmitted := pb.lastSubmittedHeight.Load()
height, err := pb.store.Height(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -68,12 +68,12 @@ func (pb *pendingBase[T]) numPending() uint64 {
pb.logger.Error().Err(err).Msg("failed to get height in numPending")
return 0
}
return height - pb.lastHeight.Load()
return height - pb.lastSubmittedHeight.Load()
}

func (pb *pendingBase[T]) setLastSubmittedHeight(ctx context.Context, newLastSubmittedHeight uint64) {
lsh := pb.lastHeight.Load()
if newLastSubmittedHeight > lsh && pb.lastHeight.CompareAndSwap(lsh, newLastSubmittedHeight) {
lsh := pb.lastSubmittedHeight.Load()
if newLastSubmittedHeight > lsh && pb.lastSubmittedHeight.CompareAndSwap(lsh, newLastSubmittedHeight) {
bz := make([]byte, 8)
binary.LittleEndian.PutUint64(bz, newLastSubmittedHeight)
err := pb.store.SetMetadata(ctx, pb.metaKey, bz)
Expand All @@ -98,6 +98,28 @@ func (pb *pendingBase[T]) init() error {
if lsh == 0 {
return nil
}
pb.lastHeight.CompareAndSwap(0, lsh)
pb.lastSubmittedHeight.CompareAndSwap(0, lsh)
return nil
}

func (pb *pendingBase[T]) iterator(ctx context.Context) (Iterator[T], error) {
lastSubmitted := pb.lastSubmittedHeight.Load()
height, err := pb.store.Height(ctx)
if err != nil {
return Iterator[T]{}, err
}
if lastSubmitted == height {
return Iterator[T]{}, nil
}
if lastSubmitted > height {
return Iterator[T]{}, fmt.Errorf("height of last submitted "+
"item (%d) is greater than height of last item (%d)", lastSubmitted, height)
Comment on lines +115 to +116
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency with a similar error message in getPending (line 52) and for improved readability, it's better to have this error string on a single line.

return Iterator[T]{}, fmt.Errorf("height of last submitted item (%d) is greater than height of last item (%d)", lastSubmitted, height)

}

return Iterator[T]{
base: pb,
ctx: ctx,
cursor: lastSubmitted + 1,
limit: height,
}, nil
}
Loading
Loading