Skip to content

Commit 9394773

Browse files
committed
fix(store): allow partial initialization
1 parent 2f2991f commit 9394773

File tree

2 files changed

+60
-20
lines changed

2 files changed

+60
-20
lines changed

store/store.go

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ func (s *Store[H]) Start(ctx context.Context) error {
129129
default:
130130
}
131131

132-
if err := s.loadHeadAndTail(ctx); err != nil && !errors.Is(err, header.ErrNotFound) {
133-
return err
132+
if err := s.init(ctx); err != nil {
133+
return fmt.Errorf("header/store: initializing: %w", err)
134134
}
135135

136136
go s.flushLoop()
@@ -433,6 +433,7 @@ func (s *Store[H]) flushLoop() {
433433
ctx := context.Background()
434434

435435
flush := func(headers []H) {
436+
log.Debug("flush request", len(headers))
436437
s.ensureInit(headers)
437438
// add headers to the pending and ensure they are accessible
438439
s.pending.Append(headers...)
@@ -557,10 +558,21 @@ func (s *Store[H]) readByKey(ctx context.Context, key datastore.Key) (H, error)
557558

558559
var h header.Hash
559560
if err := h.UnmarshalJSON(b); err != nil {
561+
return zero, fmt.Errorf("unmarshaling header hash at %s key: %w", key, err)
562+
}
563+
564+
hdr, err := s.Get(ctx, h)
565+
if err != nil {
566+
if errors.Is(err, header.ErrNotFound) {
567+
derr := s.ds.Delete(ctx, key)
568+
if derr != nil {
569+
err = errors.Join(err, fmt.Errorf("deleting key %s, header for which was not found: %w", key, derr))
570+
}
571+
}
560572
return zero, err
561573
}
562574

563-
return s.Get(ctx, h)
575+
return hdr, nil
564576
}
565577

566578
func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
@@ -661,39 +673,53 @@ func (s *Store[H]) nextTail(ctx context.Context) (tail H, changed bool) {
661673
return tail, changed
662674
}
663675

664-
func (s *Store[H]) loadHeadAndTail(ctx context.Context) error {
676+
// init loads the head and tail headers and sets them on the store.
677+
// allows partial initialization of either tail or head if one of the is not found.
678+
func (s *Store[H]) init(ctx context.Context) error {
665679
head, err := s.readByKey(ctx, headKey)
666-
if err != nil {
667-
return fmt.Errorf("header/store: cannot load headKey: %w", err)
680+
if err != nil && !errors.Is(err, header.ErrNotFound) {
681+
return fmt.Errorf("reading headKey: %w", err)
682+
}
683+
if !head.IsZero() {
684+
s.contiguousHead.Store(&head)
685+
s.heightSub.Init(head.Height())
686+
log.Debugw("initialized head", "height", head.Height())
668687
}
669688

670689
tail, err := s.readByKey(ctx, tailKey)
671-
if err != nil {
672-
return fmt.Errorf("header/store: cannot load tailKey: %w", err)
690+
if err != nil && !errors.Is(err, header.ErrNotFound) {
691+
return fmt.Errorf("reading tailKey: %w", err)
692+
}
693+
if !tail.IsZero() {
694+
s.tailHeader.Store(&tail)
695+
log.Debugw("initialized tail", "height", tail.Height())
673696
}
674697

675-
s.init(head, tail)
676698
return nil
677699
}
678700

701+
// ensureInit initializes the store with the given headers if it is not already initialized.
679702
func (s *Store[H]) ensureInit(headers []H) {
680-
headExist, tailExist := s.contiguousHead.Load() != nil, s.tailHeader.Load() != nil
681-
if len(headers) == 0 || (tailExist && headExist) {
703+
if len(headers) == 0 {
682704
return
683-
} else if tailExist || headExist {
684-
panic("header/store: head and tail must be both present or absent")
685705
}
686706

687-
tail, head := headers[0], headers[len(headers)-1]
688-
s.init(head, tail)
689-
}
707+
if headPtr := s.contiguousHead.Load(); headPtr == nil {
708+
head := headers[len(headers)-1]
709+
if s.contiguousHead.CompareAndSwap(headPtr, &head) {
710+
s.heightSub.Init(head.Height())
711+
log.Debugw("initialized head", "height", head.Height())
712+
}
713+
}
690714

691-
func (s *Store[H]) init(head, tail H) {
692-
s.contiguousHead.Store(&head)
693-
s.heightSub.Init(head.Height())
694-
s.tailHeader.Store(&tail)
715+
if tailPtr := s.tailHeader.Load(); tailPtr == nil {
716+
tail := headers[0]
717+
s.tailHeader.CompareAndSwap(tailPtr, &tail)
718+
log.Debugw("initialized tail", "height", tail.Height())
719+
}
695720
}
696721

722+
// deinit deinitializes the store.
697723
func (s *Store[H]) deinit() {
698724
s.cache.Purge()
699725
s.heightIndex.cache.Purge()

sync/syncer_tail.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@ func (s *Syncer[H]) renewTail(ctx context.Context, oldTail, head H) (newTail H,
6161

6262
newTail, err = s.store.Get(ctx, tailHash)
6363
if err == nil {
64+
if oldTail.IsZero() {
65+
// old tail is zero while the requested tail was found locally?
66+
// something may go wrong with store, try to recover it with an append
67+
if err := s.store.Append(ctx, newTail); err != nil {
68+
return newTail, fmt.Errorf("appending tail header %d: %w", newTail.Height(), err)
69+
}
70+
}
6471
return newTail, nil
6572
}
6673
if !errors.Is(err, header.ErrNotFound) {
@@ -86,6 +93,13 @@ func (s *Syncer[H]) renewTail(ctx context.Context, oldTail, head H) (newTail H,
8693
// check if the new tail is below the current head to avoid heightSub blocking
8794
newTail, err = s.store.GetByHeight(ctx, tailHeight)
8895
if err == nil {
96+
if oldTail.IsZero() {
97+
// old tail is zero while the requested tail was found locally?
98+
// something may go wrong with store, try to recover it with an append
99+
if err := s.store.Append(ctx, newTail); err != nil {
100+
return newTail, fmt.Errorf("appending tail header %d: %w", newTail.Height(), err)
101+
}
102+
}
89103
return newTail, nil
90104
}
91105
if !errors.Is(err, header.ErrNotFound) {

0 commit comments

Comments
 (0)