Skip to content

Commit b914cfe

Browse files
feat(store)!: do better init (#274)
Fixes #243 - Delete `Init` method and `Init` helpers func - Load tail and head on `store.Start` in memory - On `store.Append` check if tail and head are loaded. - If not, initialize store with those pointers --------- Co-authored-by: Hlib Kanunnikov <[email protected]>
1 parent 6b9671f commit b914cfe

File tree

8 files changed

+127
-204
lines changed

8 files changed

+127
-204
lines changed

interface.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ var (
5050
// ErrNotFound is returned when there is no requested header.
5151
ErrNotFound = errors.New("header: not found")
5252

53-
// ErrNoHead is returned when Store is empty (does not contain any known header).
54-
ErrNoHead = errors.New("header/store: no chain head")
53+
// ErrEmptyStore is returned when Store is empty (does not contain any known header).
54+
ErrEmptyStore = errors.New("header/store: store is empty")
5555

5656
// ErrHeadersLimitExceeded is returned when ExchangeServer receives header request for more
5757
// than maxRequestSize headers.
@@ -67,9 +67,6 @@ type Store[H Header[H]] interface {
6767
// Getter encompasses all getter methods for headers.
6868
Getter[H]
6969

70-
// Init initializes Store with the given head, meaning it is initialized with the genesis header.
71-
Init(context.Context, H) error
72-
7370
// Tail returns the oldest known header.
7471
Tail(context.Context) (H, error)
7572

p2p/exchange_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,5 +677,5 @@ func (t *timedOutStore) Head(
677677
...header.HeadOption[*headertest.DummyHeader],
678678
) (*headertest.DummyHeader, error) {
679679
time.Sleep(t.timeout)
680-
return nil, header.ErrNoHead
680+
return nil, header.ErrEmptyStore
681681
}

p2p/server_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ func TestExchangeServer_handleRequestTimeout(t *testing.T) {
1919
require.NoError(t, err)
2020
head := headertest.RandDummyHeader(t)
2121
head.HeightI %= 1000 // make it a bit lower
22-
err = s.Init(context.Background(), head)
22+
err = s.Append(context.Background(), head)
2323
require.NoError(t, err)
24+
time.Sleep(100 * time.Millisecond)
2425
server, err := NewExchangeServer[*headertest.DummyHeader](
2526
peer[0],
2627
s,

store/init.go

Lines changed: 0 additions & 30 deletions
This file was deleted.

store/init_test.go

Lines changed: 0 additions & 56 deletions
This file was deleted.

store/store.go

Lines changed: 78 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -113,27 +113,6 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
113113
}, nil
114114
}
115115

116-
func (s *Store[H]) Init(ctx context.Context, initial H) error {
117-
if s.heightSub.Height() != 0 {
118-
return errors.New("store already initialized")
119-
}
120-
121-
// initialize with the initial head before first flush.
122-
s.contiguousHead.Store(&initial)
123-
s.heightSub.Init(initial.Height())
124-
125-
// trust the given header as the initial head
126-
err := s.flush(ctx, initial)
127-
if err != nil {
128-
return err
129-
}
130-
131-
s.tailHeader.Store(&initial)
132-
133-
log.Infow("initialized head", "height", initial.Height(), "hash", initial.Hash())
134-
return nil
135-
}
136-
137116
func (s *Store[H]) Start(ctx context.Context) error {
138117
// closed s.writesDn means that store was stopped before, recreate chan.
139118
select {
@@ -142,11 +121,8 @@ func (s *Store[H]) Start(ctx context.Context) error {
142121
default:
143122
}
144123

145-
if err := s.loadContiguousHead(ctx); err != nil {
146-
// we might start on an empty datastore, no key is okay.
147-
if !errors.Is(err, header.ErrNotFound) {
148-
return fmt.Errorf("header/store: cannot load headKey: %w", err)
149-
}
124+
if err := s.loadHeadAndTail(ctx); err != nil && !errors.Is(err, header.ErrNotFound) {
125+
return err
150126
}
151127

152128
go s.flushLoop()
@@ -175,52 +151,34 @@ func (s *Store[H]) Stop(ctx context.Context) error {
175151
// cleanup caches
176152
s.cache.Purge()
177153
s.heightIndex.cache.Purge()
154+
s.contiguousHead.Store(nil)
155+
s.tailHeader.Store(nil)
178156
return s.metrics.Close()
179157
}
180158

181159
func (s *Store[H]) Height() uint64 {
182160
return s.heightSub.Height()
183161
}
184162

185-
func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) {
186-
if head := s.contiguousHead.Load(); head != nil {
187-
return *head, nil
188-
}
189-
190-
head, err := s.GetByHeight(ctx, s.heightSub.Height())
191-
if err == nil {
192-
return head, nil
163+
func (s *Store[H]) Head(_ context.Context, _ ...header.HeadOption[H]) (H, error) {
164+
head := s.contiguousHead.Load()
165+
if head == nil {
166+
var zero H
167+
return zero, header.ErrEmptyStore
193168
}
194169

195-
var zero H
196-
head, err = s.readHead(ctx)
197-
switch {
198-
default:
199-
return zero, err
200-
case errors.Is(err, datastore.ErrNotFound), errors.Is(err, header.ErrNotFound):
201-
return zero, header.ErrNoHead
202-
case err == nil:
203-
s.heightSub.SetHeight(head.Height())
204-
log.Infow("loaded head", "height", head.Height(), "hash", head.Hash())
205-
return head, nil
206-
}
170+
return *head, nil
207171
}
208172

209173
// Tail implements [header.Store] interface.
210-
func (s *Store[H]) Tail(ctx context.Context) (H, error) {
211-
tailPtr := s.tailHeader.Load()
212-
if tailPtr != nil {
213-
return *tailPtr, nil
214-
}
215-
216-
tail, err := s.readTail(ctx)
217-
if err != nil {
174+
func (s *Store[H]) Tail(_ context.Context) (H, error) {
175+
tail := s.tailHeader.Load()
176+
if tail == nil {
218177
var zero H
219-
return zero, err
178+
return zero, header.ErrEmptyStore
220179
}
221180

222-
s.tailHeader.Store(&tail)
223-
return tail, nil
181+
return *tail, nil
224182
}
225183

226184
func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
@@ -239,8 +197,7 @@ func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
239197
}
240198

241199
h := header.New[H]()
242-
err = h.UnmarshalBinary(b)
243-
if err != nil {
200+
if err := h.UnmarshalBinary(b); err != nil {
244201
return zero, err
245202
}
246203

@@ -481,6 +438,7 @@ func (s *Store[H]) flushLoop() {
481438
defer close(s.writesDn)
482439
ctx := context.Background()
483440
for headers := range s.writes {
441+
s.ensureInit(headers)
484442
// add headers to the pending and ensure they are accessible
485443
s.pending.Append(headers...)
486444
// always inform heightSub about new headers seen.
@@ -547,15 +505,13 @@ func (s *Store[H]) flush(ctx context.Context, headers ...H) error {
547505
}
548506
}
549507

550-
// marshal and add to batch reference to the new head
508+
// marshal and add to batch reference to the new head and tail
551509
head := *s.contiguousHead.Load()
552-
b, err := head.Hash().MarshalJSON()
553-
if err != nil {
510+
if err := writeHeaderHashTo(ctx, batch, head, headKey); err != nil {
554511
return err
555512
}
556-
557-
err = batch.Put(ctx, headKey, b)
558-
if err != nil {
513+
tail := *s.tailHeader.Load()
514+
if err := writeHeaderHashTo(ctx, batch, tail, tailKey); err != nil {
559515
return err
560516
}
561517

@@ -568,55 +524,23 @@ func (s *Store[H]) flush(ctx context.Context, headers ...H) error {
568524
return batch.Commit(ctx)
569525
}
570526

571-
// loadContiguousHead from the disk and sets contiguousHead and heightSub.
572-
func (s *Store[H]) loadContiguousHead(ctx context.Context) error {
573-
h, err := s.readHead(ctx)
574-
if err != nil {
575-
return err
576-
}
577-
578-
s.contiguousHead.Store(&h)
579-
s.heightSub.SetHeight(h.Height())
580-
return nil
581-
}
582-
583-
// readHead loads the head from the datastore.
584-
func (s *Store[H]) readHead(ctx context.Context) (H, error) {
585-
var zero H
586-
b, err := s.ds.Get(ctx, headKey)
587-
if err != nil {
588-
if errors.Is(err, datastore.ErrNotFound) {
589-
return zero, header.ErrNotFound
590-
}
591-
return zero, err
592-
}
593-
594-
var head header.Hash
595-
err = head.UnmarshalJSON(b)
596-
if err != nil {
597-
return zero, err
598-
}
599-
600-
return s.Get(ctx, head)
601-
}
602-
603-
// readTail loads the tail from the datastore.
604-
func (s *Store[H]) readTail(ctx context.Context) (H, error) {
527+
// readByKey the hash under the given key from datastore and fetch the header by hash.
528+
func (s *Store[H]) readByKey(ctx context.Context, key datastore.Key) (H, error) {
605529
var zero H
606-
b, err := s.ds.Get(ctx, tailKey)
530+
b, err := s.ds.Get(ctx, key)
607531
if err != nil {
608532
if errors.Is(err, datastore.ErrNotFound) {
609533
return zero, header.ErrNotFound
610534
}
611535
return zero, err
612536
}
613537

614-
var tail header.Hash
615-
if err := tail.UnmarshalJSON(b); err != nil {
538+
var h header.Hash
539+
if err := h.UnmarshalJSON(b); err != nil {
616540
return zero, err
617541
}
618542

619-
return s.Get(ctx, tail)
543+
return s.Get(ctx, h)
620544
}
621545

622546
func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
@@ -663,6 +587,57 @@ func (s *Store[H]) nextContiguousHead(ctx context.Context, height uint64) H {
663587
return newHead
664588
}
665589

590+
func (s *Store[H]) loadHeadAndTail(ctx context.Context) error {
591+
head, err := s.readByKey(ctx, headKey)
592+
if err != nil {
593+
return fmt.Errorf("header/store: cannot load headKey: %w", err)
594+
}
595+
596+
tail, err := s.readByKey(ctx, tailKey)
597+
if err != nil {
598+
return fmt.Errorf("header/store: cannot load tailKey: %w", err)
599+
}
600+
601+
s.init(head, tail)
602+
return nil
603+
}
604+
605+
func (s *Store[H]) ensureInit(headers []H) {
606+
headExist, tailExist := s.contiguousHead.Load() != nil, s.tailHeader.Load() != nil
607+
if tailExist && headExist {
608+
return
609+
} else if tailExist || headExist {
610+
panic("header/store: head and tail must be both present or absent")
611+
}
612+
613+
tail, head := headers[0], headers[len(headers)-1]
614+
s.init(head, tail)
615+
}
616+
617+
func (s *Store[H]) init(head, tail H) {
618+
s.contiguousHead.Store(&head)
619+
s.heightSub.Init(head.Height())
620+
s.tailHeader.Store(&tail)
621+
}
622+
623+
func writeHeaderHashTo[H header.Header[H]](
624+
ctx context.Context,
625+
batch datastore.Batch,
626+
h H,
627+
key datastore.Key,
628+
) error {
629+
hashBytes, err := h.Hash().MarshalJSON()
630+
if err != nil {
631+
return err
632+
}
633+
634+
if err := batch.Put(ctx, key, hashBytes); err != nil {
635+
return err
636+
}
637+
638+
return nil
639+
}
640+
666641
// indexTo saves mapping between header Height and Hash to the given batch.
667642
func indexTo[H header.Header[H]](ctx context.Context, batch datastore.Batch, headers ...H) error {
668643
for _, h := range headers {

0 commit comments

Comments
 (0)