Skip to content

Commit ad137b1

Browse files
committed
fix(store): allow partial initialization
1 parent 62b7b99 commit ad137b1

File tree

2 files changed

+60
-20
lines changed

2 files changed

+60
-20
lines changed

store/store.go

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ func (s *Store[H]) Start(ctx context.Context) error {
129129
default:
130130
}
131131

132-
if err := s.loadHeadAndTail(ctx); err != nil && !errors.Is(err, header.ErrNotFound) {
133-
return err
132+
if err := s.init(ctx); err != nil {
133+
return fmt.Errorf("header/store: initializing: %w", err)
134134
}
135135

136136
go s.flushLoop()
@@ -436,6 +436,7 @@ func (s *Store[H]) flushLoop() {
436436
ctx := context.Background()
437437

438438
flush := func(headers []H) {
439+
log.Debug("flush request", len(headers))
439440
s.ensureInit(headers)
440441
// add headers to the pending and ensure they are accessible
441442
s.pending.Append(headers...)
@@ -560,10 +561,21 @@ func (s *Store[H]) readByKey(ctx context.Context, key datastore.Key) (H, error)
560561

561562
var h header.Hash
562563
if err := h.UnmarshalJSON(b); err != nil {
564+
return zero, fmt.Errorf("unmarshaling header hash at %s key: %w", key, err)
565+
}
566+
567+
hdr, err := s.Get(ctx, h)
568+
if err != nil {
569+
if errors.Is(err, header.ErrNotFound) {
570+
derr := s.ds.Delete(ctx, key)
571+
if derr != nil {
572+
err = errors.Join(err, fmt.Errorf("deleting key %s, header for which was not found: %w", key, derr))
573+
}
574+
}
563575
return zero, err
564576
}
565577

566-
return s.Get(ctx, h)
578+
return hdr, nil
567579
}
568580

569581
func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
@@ -664,39 +676,53 @@ func (s *Store[H]) nextTail(ctx context.Context) (tail H, changed bool) {
664676
return tail, changed
665677
}
666678

667-
func (s *Store[H]) loadHeadAndTail(ctx context.Context) error {
679+
// init loads the head and tail headers and sets them on the store.
680+
// allows partial initialization of either tail or head if one of the is not found.
681+
func (s *Store[H]) init(ctx context.Context) error {
668682
head, err := s.readByKey(ctx, headKey)
669-
if err != nil {
670-
return fmt.Errorf("header/store: cannot load headKey: %w", err)
683+
if err != nil && !errors.Is(err, header.ErrNotFound) {
684+
return fmt.Errorf("reading headKey: %w", err)
685+
}
686+
if !head.IsZero() {
687+
s.contiguousHead.Store(&head)
688+
s.heightSub.Init(head.Height())
689+
log.Debugw("initialized head", "height", head.Height())
671690
}
672691

673692
tail, err := s.readByKey(ctx, tailKey)
674-
if err != nil {
675-
return fmt.Errorf("header/store: cannot load tailKey: %w", err)
693+
if err != nil && !errors.Is(err, header.ErrNotFound) {
694+
return fmt.Errorf("reading tailKey: %w", err)
695+
}
696+
if !tail.IsZero() {
697+
s.tailHeader.Store(&tail)
698+
log.Debugw("initialized tail", "height", tail.Height())
676699
}
677700

678-
s.init(head, tail)
679701
return nil
680702
}
681703

704+
// ensureInit initializes the store with the given headers if it is not already initialized.
682705
func (s *Store[H]) ensureInit(headers []H) {
683-
headExist, tailExist := s.contiguousHead.Load() != nil, s.tailHeader.Load() != nil
684-
if len(headers) == 0 || (tailExist && headExist) {
706+
if len(headers) == 0 {
685707
return
686-
} else if tailExist || headExist {
687-
panic("header/store: head and tail must be both present or absent")
688708
}
689709

690-
tail, head := headers[0], headers[len(headers)-1]
691-
s.init(head, tail)
692-
}
710+
if headPtr := s.contiguousHead.Load(); headPtr == nil {
711+
head := headers[len(headers)-1]
712+
if s.contiguousHead.CompareAndSwap(headPtr, &head) {
713+
s.heightSub.Init(head.Height())
714+
log.Debugw("initialized head", "height", head.Height())
715+
}
716+
}
693717

694-
func (s *Store[H]) init(head, tail H) {
695-
s.contiguousHead.Store(&head)
696-
s.heightSub.Init(head.Height())
697-
s.tailHeader.Store(&tail)
718+
if tailPtr := s.tailHeader.Load(); tailPtr == nil {
719+
tail := headers[0]
720+
s.tailHeader.CompareAndSwap(tailPtr, &tail)
721+
log.Debugw("initialized tail", "height", tail.Height())
722+
}
698723
}
699724

725+
// deinit deinitializes the store.
700726
func (s *Store[H]) deinit() {
701727
s.cache.Purge()
702728
s.heightIndex.cache.Purge()

sync/syncer_tail.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@ func (s *Syncer[H]) renewTail(ctx context.Context, oldTail, head H) (newTail H,
6161

6262
newTail, err = s.store.Get(ctx, tailHash)
6363
if err == nil {
64+
if oldTail.IsZero() {
65+
// old tail is zero while the requested tail was found locally?
66+
// something may go wrong with store, try to recover it with an append
67+
if err := s.store.Append(ctx, newTail); err != nil {
68+
return newTail, fmt.Errorf("appending tail header %d: %w", newTail.Height(), err)
69+
}
70+
}
6471
return newTail, nil
6572
}
6673
if !errors.Is(err, header.ErrNotFound) {
@@ -86,6 +93,13 @@ func (s *Syncer[H]) renewTail(ctx context.Context, oldTail, head H) (newTail H,
8693
// check if the new tail is below the current head to avoid heightSub blocking
8794
newTail, err = s.store.GetByHeight(ctx, tailHeight)
8895
if err == nil {
96+
if oldTail.IsZero() {
97+
// old tail is zero while the requested tail was found locally?
98+
// something may go wrong with store, try to recover it with an append
99+
if err := s.store.Append(ctx, newTail); err != nil {
100+
return newTail, fmt.Errorf("appending tail header %d: %w", newTail.Height(), err)
101+
}
102+
}
89103
return newTail, nil
90104
}
91105
if !errors.Is(err, header.ErrNotFound) {

0 commit comments

Comments
 (0)