Skip to content

Commit 5776877

Browse files
authored
fix(store): stop long receding/advancing on Stop (#326)
Ensures Store stops receding/advancing if the take long time. Apparently, badger, pebble, and any other major datastore implementations do not handle contexts, so we need to do it ourselves. Also adds better debug logging coverage
1 parent 27b8dfe commit 5776877

File tree

2 files changed

+46
-8
lines changed

2 files changed

+46
-8
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ require (
1717
go.opentelemetry.io/otel v1.36.0
1818
go.opentelemetry.io/otel/metric v1.36.0
1919
go.opentelemetry.io/otel/trace v1.36.0
20+
go.uber.org/zap v1.27.0
2021
golang.org/x/crypto v0.39.0
2122
)
2223

@@ -101,7 +102,6 @@ require (
101102
go.uber.org/automaxprocs v1.6.0 // indirect
102103
go.uber.org/mock v0.5.2 // indirect
103104
go.uber.org/multierr v1.11.0 // indirect
104-
go.uber.org/zap v1.27.0 // indirect
105105
golang.org/x/exp v0.0.0-20250606033433-dcc06ee1d476 // indirect
106106
golang.org/x/mod v0.25.0 // indirect
107107
golang.org/x/net v0.41.0 // indirect

store/store.go

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/ipfs/go-datastore"
1313
"github.com/ipfs/go-datastore/namespace"
1414
logging "github.com/ipfs/go-log/v2"
15+
"go.uber.org/zap/zapcore"
1516

1617
"github.com/celestiaorg/go-header"
1718
)
@@ -58,6 +59,7 @@ type Store[H header.Header[H]] struct {
5859
pending *batch[H]
5960
// syncCh is a channel used to synchronize writes
6061
syncCh chan chan struct{}
62+
cancel context.CancelFunc
6163

6264
onDeleteMu sync.Mutex
6365
onDelete []func(context.Context, uint64) error
@@ -132,7 +134,9 @@ func (s *Store[H]) Start(ctx context.Context) error {
132134
return err
133135
}
134136

135-
go s.flushLoop()
137+
ctx, cancel := context.WithCancel(context.Background())
138+
s.cancel = cancel
139+
go s.flushLoop(ctx)
136140
return nil
137141
}
138142

@@ -430,9 +434,8 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
430434
// This way writes are controlled and manageable from one place allowing
431435
// (1) Appends not to be blocked on long disk IO writes and underlying DB compactions
432436
// (2) Batching header writes
433-
func (s *Store[H]) flushLoop() {
437+
func (s *Store[H]) flushLoop(ctx context.Context) {
434438
defer close(s.writesDn)
435-
ctx := context.Background()
436439

437440
flush := func(headers []H) {
438441
s.ensureInit(headers)
@@ -608,37 +611,72 @@ func (s *Store[H]) recedeTail(ctx context.Context) {
608611
func (s *Store[H]) nextHead(ctx context.Context) (head H, changed bool) {
609612
head, err := s.Head(ctx)
610613
if err != nil {
611-
log.Errorw("cannot load head", "err", err)
614+
log.Errorw("cannot load head while advancing", "err", err)
612615
return head, false
613616
}
614617

615-
for {
618+
for ctx.Err() == nil {
616619
h, err := s.getByHeight(ctx, head.Height()+1)
617620
if err != nil {
621+
log.Debugw("next head error", "current", head.Height(), "err", err)
618622
return head, changed
619623
}
624+
625+
if !changed && log.Level() == zapcore.DebugLevel {
626+
now := time.Now()
627+
log.Debugw("advancing head", "start_height", head.Height())
628+
defer func() {
629+
log.Debugw(
630+
"finished advancing head",
631+
"end_height",
632+
head.Height(),
633+
"took(s)",
634+
time.Since(now),
635+
)
636+
}()
637+
}
638+
620639
head = h
621640
changed = true
622641
}
642+
643+
return head, changed
623644
}
624645

625646
// nextTail finds the new contiguous Tail by iterating the current Tail down until the older height Tail is found.
626647
// Returns true if the older one was found.
627648
func (s *Store[H]) nextTail(ctx context.Context) (tail H, changed bool) {
628649
tail, err := s.Tail(ctx)
629650
if err != nil {
630-
log.Errorw("cannot load tail", "err", err)
651+
log.Errorw("cannot load tail while receding", "err", err)
631652
return tail, false
632653
}
633654

634-
for {
655+
for ctx.Err() == nil {
635656
h, err := s.getByHeight(ctx, tail.Height()-1)
636657
if err != nil {
637658
return tail, changed
638659
}
660+
661+
if !changed && log.Level() == zapcore.DebugLevel {
662+
now := time.Now()
663+
log.Debugw("receding tail", "start_height", tail.Height())
664+
defer func() {
665+
log.Debugw(
666+
"finished receding tail",
667+
"end_height",
668+
tail.Height(),
669+
"took(s)",
670+
time.Since(now),
671+
)
672+
}()
673+
}
674+
639675
tail = h
640676
changed = true
641677
}
678+
679+
return tail, changed
642680
}
643681

644682
func (s *Store[H]) loadHeadAndTail(ctx context.Context) error {

0 commit comments

Comments
 (0)