Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions block/internal/cache/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package cache

import (
"context"
"errors"
"io"
"math/rand/v2"
"testing"

Expand Down Expand Up @@ -98,6 +100,94 @@ func BenchmarkManager_GetPendingData(b *testing.B) {
}
}

func BenchmarkPendingData_Iterator(b *testing.B) {
for _, n := range []int{1_000, 10_000} {
b.Run(benchName(n), func(b *testing.B) {
st := benchSetupStore(b, n, 2, "bench-data-iter")
pd, err := NewPendingData(st, zerolog.Nop())
if err != nil {
b.Fatal(err)
}
ctx := context.Background()
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
iter, err := pd.Iterator(ctx)
if err != nil {
b.Fatal(err)
}
count := 0
for {
val, ok, err := iter.Next()
if errors.Is(err, io.EOF) {
if ok {
b.Fatal("expected ok=false on EOF")
}
break
}
if err != nil {
b.Fatal(err)
}
if !ok {
b.Fatal("expected ok=true before EOF")
}
if len(val.Txs) == 0 {
continue
}
count++
}
if count == 0 {
b.Fatal("unexpected empty iteration result")
}
}
})
}
}

func BenchmarkPendingHeaders_Iterator(b *testing.B) {
for _, n := range []int{1_000, 10_000} {
b.Run(benchName(n), func(b *testing.B) {
st := benchSetupStore(b, n, 1, "bench-headers-iter")
ph, err := NewPendingHeaders(st, zerolog.Nop())
if err != nil {
b.Fatal(err)
}
ctx := context.Background()
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
iter, err := ph.Iterator(ctx)
if err != nil {
b.Fatal(err)
}
count := 0
for {
val, ok, err := iter.Next()
if errors.Is(err, io.EOF) {
if ok {
b.Fatal("expected ok=false on EOF")
}
break
}
if err != nil {
b.Fatal(err)
}
if !ok {
b.Fatal("expected ok=true before EOF")
}
if val == nil {
b.Fatal("unexpected nil header")
}
count++
}
if count == 0 {
b.Fatal("unexpected empty iteration result")
}
}
})
}
}

func BenchmarkManager_PendingEventsSnapshot(b *testing.B) {
st := benchSetupStore(b, 1_000, 1, "bench-events")
m := benchNewManager(b, st)
Expand Down
42 changes: 42 additions & 0 deletions block/internal/cache/iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package cache

import (
"context"
"io"
)

type Iterator[T any] struct {
base *pendingBase[T]
ctx context.Context
cursor uint64
limit uint64
exhausted bool
}

func (it *Iterator[T]) Next() (T, bool, error) {
var zero T

if it == nil || it.base == nil {
return zero, false, io.EOF
}
if it.exhausted {
return zero, false, io.EOF
}
if it.cursor > it.limit {
it.exhausted = true
return zero, false, io.EOF
}

val, err := it.base.fetch(it.ctx, it.base.store, it.cursor)
if err != nil {
it.exhausted = true
return zero, false, err
}

it.cursor++
if it.cursor > it.limit {
it.exhausted = true
}

return val, true, nil
}
42 changes: 32 additions & 10 deletions block/internal/cache/pending_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import (
// that need to be published to the DA layer in order. It handles persistence
// of the last submitted height and provides methods for retrieving pending items.
type pendingBase[T any] struct {
logger zerolog.Logger
store store.Store
metaKey string
fetch func(ctx context.Context, store store.Store, height uint64) (T, error)
lastHeight atomic.Uint64
logger zerolog.Logger
store store.Store
metaKey string
fetch func(ctx context.Context, store store.Store, height uint64) (T, error)
lastSubmittedHeight atomic.Uint64
}

// newPendingBase constructs a new pendingBase for a given type.
Expand All @@ -40,7 +40,7 @@ func newPendingBase[T any](store store.Store, logger zerolog.Logger, metaKey str

// getPending returns a sorted slice of pending items of type T.
func (pb *pendingBase[T]) getPending(ctx context.Context) ([]T, error) {
lastSubmitted := pb.lastHeight.Load()
lastSubmitted := pb.lastSubmittedHeight.Load()
height, err := pb.store.Height(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -68,12 +68,12 @@ func (pb *pendingBase[T]) numPending() uint64 {
pb.logger.Error().Err(err).Msg("failed to get height in numPending")
return 0
}
return height - pb.lastHeight.Load()
return height - pb.lastSubmittedHeight.Load()
}

func (pb *pendingBase[T]) setLastSubmittedHeight(ctx context.Context, newLastSubmittedHeight uint64) {
lsh := pb.lastHeight.Load()
if newLastSubmittedHeight > lsh && pb.lastHeight.CompareAndSwap(lsh, newLastSubmittedHeight) {
lsh := pb.lastSubmittedHeight.Load()
if newLastSubmittedHeight > lsh && pb.lastSubmittedHeight.CompareAndSwap(lsh, newLastSubmittedHeight) {
bz := make([]byte, 8)
binary.LittleEndian.PutUint64(bz, newLastSubmittedHeight)
err := pb.store.SetMetadata(ctx, pb.metaKey, bz)
Expand All @@ -98,6 +98,28 @@ func (pb *pendingBase[T]) init() error {
if lsh == 0 {
return nil
}
pb.lastHeight.CompareAndSwap(0, lsh)
pb.lastSubmittedHeight.CompareAndSwap(0, lsh)
return nil
}

func (pb *pendingBase[T]) iterator(ctx context.Context) (Iterator[T], error) {
lastSubmitted := pb.lastSubmittedHeight.Load()
height, err := pb.store.Height(ctx)
if err != nil {
return Iterator[T]{}, err
}
if lastSubmitted == height {
return Iterator[T]{}, nil
}
if lastSubmitted > height {
return Iterator[T]{}, fmt.Errorf("height of last submitted "+
"item (%d) is greater than height of last item (%d)", lastSubmitted, height)
Comment on lines +115 to +116
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency with a similar error message in getPending (line 52) and for improved readability, it's better to have this error string on a single line.

return Iterator[T]{}, fmt.Errorf("height of last submitted item (%d) is greater than height of last item (%d)", lastSubmitted, height)

}

return Iterator[T]{
base: pb,
ctx: ctx,
cursor: lastSubmitted + 1,
limit: height,
}, nil
}
Loading
Loading