diff --git a/headertest/store.go b/headertest/store.go index f3705f2b..8ce96213 100644 --- a/headertest/store.go +++ b/headertest/store.go @@ -20,7 +20,7 @@ type Store[H header.Header[H]] struct { TailHeight uint64 onDeleteMu sync.Mutex - onDelete []func(context.Context, []H) error + onDelete []func(context.Context, uint64) error } // NewDummyStore creates a store for DummyHeader. @@ -80,26 +80,26 @@ func (m *Store[H]) GetByHeight(_ context.Context, height uint64) (H, error) { } func (m *Store[H]) DeleteTo(ctx context.Context, to uint64) error { - var deleted []H for h := m.TailHeight; h < to; h++ { - hdr, ok := m.Headers[h] - if ok { - delete(m.Headers, h) - deleted = append(deleted, hdr) + _, ok := m.Headers[h] + if !ok { + continue } - } - m.TailHeight = to - for _, deleteFn := range m.onDelete { - err := deleteFn(ctx, deleted) - if err != nil { - return err + for _, deleteFn := range m.onDelete { + err := deleteFn(ctx, h) + if err != nil { + return err + } } + delete(m.Headers, h) // must be after deleteFn } + + m.TailHeight = to return nil } -func (m *Store[H]) OnDelete(fn func(context.Context, []H) error) { +func (m *Store[H]) OnDelete(fn func(context.Context, uint64) error) { m.onDeleteMu.Lock() defer m.onDeleteMu.Unlock() diff --git a/interface.go b/interface.go index 33e7af2a..1c719fa6 100644 --- a/interface.go +++ b/interface.go @@ -88,8 +88,10 @@ type Store[H Header[H]] interface { // DeleteTo deletes the range [Tail():to). DeleteTo(ctx context.Context, to uint64) error - // OnDelete registers given handler to be called whenever headers are removed from the Store. - OnDelete(func(context.Context, []H) error) + // OnDelete registers given handler to be called whenever a header with the height is being removed. + // OnDelete guarantees that the header is accessible for the handler with GetByHeight and is removed + // only after the handler terminates with nil error. + OnDelete(handler func(ctx context.Context, height uint64) error) } // Getter contains the behavior necessary for a component to retrieve diff --git a/p2p/server_test.go b/p2p/server_test.go index 13796b75..0af96b11 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -197,4 +197,4 @@ func (timeoutStore[H]) DeleteTo(ctx context.Context, _ uint64) error { return ctx.Err() } -func (timeoutStore[H]) OnDelete(fn func(context.Context, []H) error) {} +func (timeoutStore[H]) OnDelete(fn func(context.Context, uint64) error) {} diff --git a/p2p/subscriber.go b/p2p/subscriber.go index a5559242..ba135c88 100644 --- a/p2p/subscriber.go +++ b/p2p/subscriber.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" @@ -85,7 +86,7 @@ func NewSubscriber[H header.Header[H]]( // be called separately to ensure a validator is mounted on the topic. func (s *Subscriber[H]) Start(context.Context) (err error) { log.Debugw("joining topic", "topic ID", s.pubsubTopicID) - err = s.pubsub.RegisterTopicValidator(s.pubsubTopicID, s.verifyMessage) + err = s.pubsub.RegisterTopicValidator(s.pubsubTopicID, s.verifyMessage, pubsub.WithValidatorTimeout(time.Minute)) if err != nil { return err } diff --git a/store/height_indexer.go b/store/height_indexer.go index 755dbf80..1e2c19ae 100644 --- a/store/height_indexer.go +++ b/store/height_indexer.go @@ -34,7 +34,11 @@ func newHeightIndexer[H header.Header[H]]( } // HashByHeight loads a header hash corresponding to the given height. 
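// Illustrative sketch, not part of this diff: how a consumer might adopt the
// new per-height OnDelete contract from interface.go above. Per the updated
// docs, the header is still readable via GetByHeight while the handler runs
// and is removed only after the handler returns nil. The prune callback and
// the registerPruneHook name are hypothetical.
package example

import (
	"context"

	"github.com/celestiaorg/go-header"
)

func registerPruneHook[H header.Header[H]](s header.Store[H], prune func(context.Context, H) error) {
	s.OnDelete(func(ctx context.Context, height uint64) error {
		// the header at this height is guaranteed to still be stored here
		hdr, err := s.GetByHeight(ctx, height)
		if err != nil {
			return err
		}
		// returning a non-nil error aborts removal of this header
		return prune(ctx, hdr)
	})
}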
-func (hi *heightIndexer[H]) HashByHeight(ctx context.Context, h uint64) (header.Hash, error) { +func (hi *heightIndexer[H]) HashByHeight( + ctx context.Context, + h uint64, + cache bool, +) (header.Hash, error) { if v, ok := hi.cache.Get(h); ok { return v, nil } @@ -44,6 +48,8 @@ func (hi *heightIndexer[H]) HashByHeight(ctx context.Context, h uint64) (header. return nil, err } - hi.cache.Add(h, header.Hash(val)) + if cache { + hi.cache.Add(h, header.Hash(val)) + } return val, nil } diff --git a/store/store.go b/store/store.go index 9f0f8f63..146f18fb 100644 --- a/store/store.go +++ b/store/store.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "slices" "sync" "sync/atomic" "time" @@ -13,6 +12,7 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" logging "github.com/ipfs/go-log/v2" + "go.uber.org/zap/zapcore" "github.com/celestiaorg/go-header" ) @@ -59,9 +59,10 @@ type Store[H header.Header[H]] struct { pending *batch[H] // syncCh is a channel used to synchronize writes syncCh chan chan struct{} + cancel context.CancelFunc onDeleteMu sync.Mutex - onDelete []func(context.Context, []H) error + onDelete []func(context.Context, uint64) error Params Parameters } @@ -129,11 +130,14 @@ func (s *Store[H]) Start(ctx context.Context) error { default: } - if err := s.loadHeadAndTail(ctx); err != nil && !errors.Is(err, header.ErrNotFound) { - return err + if err := s.init(ctx); err != nil { + return fmt.Errorf("header/store: initializing: %w", err) } - go s.flushLoop() + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + + go s.flushLoop(ctx) return nil } @@ -146,6 +150,7 @@ func (s *Store[H]) Stop(ctx context.Context) error { // signal to prevent further writes to Store select { case s.writes <- nil: + s.cancel() case <-ctx.Done(): return ctx.Err() } @@ -271,7 +276,7 @@ func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) { return h, nil } - hash, err := s.heightIndex.HashByHeight(ctx, height) + hash, err := s.heightIndex.HashByHeight(ctx, height, true) if err != nil { var zero H if errors.Is(err, datastore.ErrNotFound) { @@ -357,149 +362,7 @@ func (s *Store[H]) HasAt(ctx context.Context, height uint64) bool { return head.Height() >= height && height >= tail.Height() } -func (s *Store[H]) OnDelete(fn func(context.Context, []H) error) { - s.onDeleteMu.Lock() - defer s.onDeleteMu.Unlock() - - s.onDelete = append(s.onDelete, func(ctx context.Context, h []H) (rerr error) { - defer func() { - err := recover() - if err != nil { - rerr = fmt.Errorf("header/store: user provided onDelete panicked with: %s", err) - } - }() - return fn(ctx, h) - }) -} - -// DeleteTo implements [header.Store] interface. 
-func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error { - // ensure all the pending headers are synchronized - err := s.Sync(ctx) - if err != nil { - return err - } - - head, err := s.Head(ctx) - if err != nil { - return fmt.Errorf("header/store: reading head: %w", err) - } - if head.Height()+1 < to { - _, err := s.getByHeight(ctx, to) - if err != nil { - return fmt.Errorf( - "header/store: delete to %d beyond current head(%d)", - to, - head.Height(), - ) - } - - // if `to` is bigger than the current head and is stored - allow delete, making `to` a new head - } - - tail, err := s.Tail(ctx) - if err != nil { - return fmt.Errorf("header/store: reading tail: %w", err) - } - if tail.Height() >= to { - return fmt.Errorf("header/store: delete to %d below current tail(%d)", to, tail.Height()) - } - - if err := s.deleteRange(ctx, tail.Height(), to); err != nil { - return fmt.Errorf("header/store: delete to height %d: %w", to, err) - } - - if head.Height()+1 == to { - // this is the case where we have deleted all the headers - // wipe the store - if err := s.wipe(ctx); err != nil { - return fmt.Errorf("header/store: wipe: %w", err) - } - } - - return nil -} - -var maxHeadersLoadedPerDelete uint64 = 512 - -func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) error { - log.Infow("deleting headers", "from_height", from, "to_height", to) - for from < to { - amount := min(to-from, maxHeadersLoadedPerDelete) - toDelete := from + amount - headers := make([]H, 0, amount) - - for height := from; height < toDelete; height++ { - // take headers individually instead of range - // as getRangeByHeight can't deal with potentially missing headers - h, err := s.getByHeight(ctx, height) - if errors.Is(err, header.ErrNotFound) { - log.Warnf("header/store: attempt to delete header that's not found", height) - continue - } - if err != nil { - return fmt.Errorf("getting header while deleting: %w", err) - } - - headers = append(headers, h) - } - - batch, err := s.ds.Batch(ctx) - if err != nil { - return fmt.Errorf("new batch: %w", err) - } - - s.onDeleteMu.Lock() - onDelete := slices.Clone(s.onDelete) - s.onDeleteMu.Unlock() - for _, deleteFn := range onDelete { - if err := deleteFn(ctx, headers); err != nil { - // abort deletion if onDelete handler fails - // to ensure atomicity between stored headers and user specific data - // TODO(@Wondertan): Batch is not actually atomic and could write some data at this point - // but its fine for now: https://github.com/celestiaorg/go-header/issues/307 - // TODO2(@Wondertan): Once we move to txn, find a way to pass txn through context, - // so that users can use it in their onDelete handlers - // to ensure atomicity between deleted headers and user specific data - return fmt.Errorf("on delete handler: %w", err) - } - } - - for _, h := range headers { - if err := batch.Delete(ctx, hashKey(h.Hash())); err != nil { - return fmt.Errorf("delete hash key (%X): %w", h.Hash(), err) - } - if err := batch.Delete(ctx, heightKey(h.Height())); err != nil { - return fmt.Errorf("delete height key (%d): %w", h.Height(), err) - } - } - - err = s.setTail(ctx, batch, toDelete) - if err != nil { - return fmt.Errorf("setting tail to %d: %w", toDelete, err) - } - - if err := batch.Commit(ctx); err != nil { - return fmt.Errorf("committing delete batch [%d:%d): %w", from, toDelete, err) - } - - // cleanup caches after disk is flushed - for _, h := range headers { - s.cache.Remove(h.Hash().String()) - s.heightIndex.cache.Remove(h.Height()) - } - s.pending.DeleteRange(from, 
toDelete) - - log.Infow("deleted header range", "from_height", from, "to_height", toDelete) - - // move iterator - from = toDelete - } - - return nil -} - -func (s *Store[H]) setTail(ctx context.Context, batch datastore.Batch, to uint64) error { +func (s *Store[H]) setTail(ctx context.Context, write datastore.Write, to uint64) error { newTail, err := s.getByHeight(ctx, to) if errors.Is(err, header.ErrNotFound) { return nil @@ -510,17 +373,16 @@ func (s *Store[H]) setTail(ctx context.Context, batch datastore.Batch, to uint64 // set directly to `to`, avoiding iteration in recedeTail s.tailHeader.Store(&newTail) - if err := writeHeaderHashTo(ctx, batch, newTail, tailKey); err != nil { + if err := writeHeaderHashTo(ctx, write, newTail, tailKey); err != nil { return fmt.Errorf("writing tailKey in batch: %w", err) } + log.Infow("new tail", "height", newTail.Height(), "hash", newTail.Hash()) + s.metrics.newTail(newTail.Height()) // update head as well, if delete went over it - head, err := s.Head(ctx) - if err != nil { - return err - } - if to > head.Height() { - if err := writeHeaderHashTo(ctx, batch, newTail, headKey); err != nil { + head, _ := s.Head(ctx) + if head.IsZero() || to > head.Height() { + if err := writeHeaderHashTo(ctx, write, newTail, headKey); err != nil { return fmt.Errorf("writing headKey in batch: %w", err) } s.contiguousHead.Store(&newTail) @@ -574,9 +436,8 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { // This way writes are controlled and manageable from one place allowing // (1) Appends not to be blocked on long disk IO writes and underlying DB compactions // (2) Batching header writes -func (s *Store[H]) flushLoop() { +func (s *Store[H]) flushLoop(ctx context.Context) { defer close(s.writesDn) - ctx := context.Background() flush := func(headers []H) { s.ensureInit(headers) @@ -703,10 +564,21 @@ func (s *Store[H]) readByKey(ctx context.Context, key datastore.Key) (H, error) var h header.Hash if err := h.UnmarshalJSON(b); err != nil { + return zero, fmt.Errorf("unmarshaling header hash at %s key: %w", key, err) + } + + hdr, err := s.Get(ctx, h) + if err != nil { + if errors.Is(err, header.ErrNotFound) { + derr := s.ds.Delete(ctx, key) + if derr != nil { + err = errors.Join(err, fmt.Errorf("deleting key %s, header for which was not found: %w", key, derr)) + } + } return zero, err } - return s.Get(ctx, h) + return hdr, nil } func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { @@ -756,14 +628,26 @@ func (s *Store[H]) nextHead(ctx context.Context) (head H, changed bool) { return head, false } - for { + for ctx.Err() == nil { h, err := s.getByHeight(ctx, head.Height()+1) if err != nil { + log.Debugw("next head error", "current", head.Height(), "err", err) return head, changed } + + if !changed && log.Level() == zapcore.DebugLevel { + now := time.Now() + log.Debugw("advancing head", "start_height", head.Height()) + defer func() { + log.Debugw("finished advancing head", "end_height", head.Height(), "took(s)", time.Since(now)) + }() + } + head = h changed = true } + + return head, changed } // nextTail finds the new contiguous Tail by iterating the current Tail down until the older height Tail is found. 
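// Illustrative sketch, not part of this diff: the switch of setTail and
// writeHeaderHashTo from datastore.Batch to datastore.Write means they accept
// either a batch or the datastore itself, since both satisfy datastore.Write.
// The persistHash and demo helpers below are assumptions for demonstration.
package example

import (
	"context"

	"github.com/ipfs/go-datastore"
)

func persistHash(ctx context.Context, w datastore.Write, key datastore.Key, hash []byte) error {
	return w.Put(ctx, key, hash)
}

func demo(ctx context.Context, ds datastore.Batching) error {
	key := datastore.NewKey("/tail")

	// direct, unbatched write: the datastore itself satisfies datastore.Write
	if err := persistHash(ctx, ds, key, []byte("hash")); err != nil {
		return err
	}

	// batched write: a datastore.Batch satisfies datastore.Write as well
	b, err := ds.Batch(ctx)
	if err != nil {
		return err
	}
	if err := persistHash(ctx, b, key, []byte("hash")); err != nil {
		return err
	}
	return b.Commit(ctx)
}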
@@ -775,49 +659,76 @@ func (s *Store[H]) nextTail(ctx context.Context) (tail H, changed bool) { return tail, false } - for { + for ctx.Err() == nil { h, err := s.getByHeight(ctx, tail.Height()-1) if err != nil { + log.Debugw("next tail error", "current", tail.Height(), "err", err) return tail, changed } + + if !changed && log.Level() == zapcore.DebugLevel { + now := time.Now() + log.Debugw("receding tail", "start_height", tail.Height()) + defer func() { + log.Debugw("finished receding tail", "end_height", tail.Height(), "took(s)", time.Since(now)) + }() + } + tail = h changed = true } + + log.Debugw("just left next tail", "current", tail.Height(), "err", ctx.Err()) + return tail, changed } -func (s *Store[H]) loadHeadAndTail(ctx context.Context) error { +// init loads the head and tail headers and sets them on the store. +// allows partial initialization of either tail or head if one of the is not found. +func (s *Store[H]) init(ctx context.Context) error { head, err := s.readByKey(ctx, headKey) - if err != nil { - return fmt.Errorf("header/store: cannot load headKey: %w", err) + if err != nil && !errors.Is(err, header.ErrNotFound) { + return fmt.Errorf("reading headKey: %w", err) + } + if !head.IsZero() { + s.contiguousHead.Store(&head) + s.heightSub.Init(head.Height()) + log.Debugw("initialized head", "height", head.Height()) } tail, err := s.readByKey(ctx, tailKey) - if err != nil { - return fmt.Errorf("header/store: cannot load tailKey: %w", err) + if err != nil && !errors.Is(err, header.ErrNotFound) { + return fmt.Errorf("reading tailKey: %w", err) + } + if !tail.IsZero() { + s.tailHeader.Store(&tail) + log.Debugw("initialized tail", "height", tail.Height()) } - s.init(head, tail) return nil } +// ensureInit initializes the store with the given headers if it is not already initialized. func (s *Store[H]) ensureInit(headers []H) { - headExist, tailExist := s.contiguousHead.Load() != nil, s.tailHeader.Load() != nil - if len(headers) == 0 || (tailExist && headExist) { + if len(headers) == 0 { return - } else if tailExist || headExist { - panic("header/store: head and tail must be both present or absent") } - tail, head := headers[0], headers[len(headers)-1] - s.init(head, tail) -} + if headPtr := s.contiguousHead.Load(); headPtr == nil { + head := headers[len(headers)-1] + if s.contiguousHead.CompareAndSwap(headPtr, &head) { + s.heightSub.Init(head.Height()) + log.Debugw("initialized head", "height", head.Height()) + } + } -func (s *Store[H]) init(head, tail H) { - s.contiguousHead.Store(&head) - s.heightSub.Init(head.Height()) - s.tailHeader.Store(&tail) + if tailPtr := s.tailHeader.Load(); tailPtr == nil { + tail := headers[0] + s.tailHeader.CompareAndSwap(tailPtr, &tail) + log.Debugw("initialized tail", "height", tail.Height()) + } } +// deinit deinitializes the store. 
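// Illustrative sketch, not part of this diff: the initialize-once pattern
// that ensureInit above now relies on for contiguousHead and tailHeader.
// Only the first writer observing a nil pointer seeds the value, so
// concurrent Appends cannot overwrite an already-initialized head or tail.
package example

import "sync/atomic"

func initOnce[T any](slot *atomic.Pointer[T], candidate T) bool {
	if slot.Load() != nil {
		return false // already initialized by someone else
	}
	// the CAS may still lose to a concurrent initializer, which is fine:
	// exactly one candidate wins and the rest are dropped
	return slot.CompareAndSwap(nil, &candidate)
}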
func (s *Store[H]) deinit() { s.cache.Purge() s.heightIndex.cache.Purge() @@ -828,7 +739,7 @@ func (s *Store[H]) deinit() { func writeHeaderHashTo[H header.Header[H]]( ctx context.Context, - batch datastore.Batch, + write datastore.Write, h H, key datastore.Key, ) error { @@ -837,7 +748,7 @@ func writeHeaderHashTo[H header.Header[H]]( return err } - if err := batch.Put(ctx, key, hashBytes); err != nil { + if err := write.Put(ctx, key, hashBytes); err != nil { return err } diff --git a/store/store_delete.go b/store/store_delete.go new file mode 100644 index 00000000..3d40f466 --- /dev/null +++ b/store/store_delete.go @@ -0,0 +1,297 @@ +package store + +import ( + "context" + "errors" + "fmt" + "runtime" + "slices" + "sync" + "sync/atomic" + "time" + + "github.com/ipfs/go-datastore" + + "github.com/celestiaorg/go-header" +) + +// OnDelete implements [header.Store] interface. +func (s *Store[H]) OnDelete(fn func(context.Context, uint64) error) { + s.onDeleteMu.Lock() + defer s.onDeleteMu.Unlock() + + s.onDelete = append(s.onDelete, func(ctx context.Context, height uint64) (rerr error) { + defer func() { + err := recover() + if err != nil { + rerr = fmt.Errorf( + "header/store: user provided onDelete panicked on %d with: %s", + height, + err, + ) + } + }() + return fn(ctx, height) + }) +} + +// DeleteTo implements [header.Store] interface. +func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error { + // ensure all the pending headers are synchronized + err := s.Sync(ctx) + if err != nil { + return err + } + + head, err := s.Head(ctx) + if err != nil { + return fmt.Errorf("header/store: reading head: %w", err) + } + if head.Height()+1 < to { + _, err := s.getByHeight(ctx, to) + if errors.Is(err, header.ErrNotFound) { + return fmt.Errorf( + "header/store: delete to %d beyond current head(%d)", + to, + head.Height(), + ) + } + if err != nil { + return fmt.Errorf("delete to potential new head: %w", err) + } + + // if `to` is bigger than the current head and is stored - allow delete, making `to` a new head + } + + tail, err := s.Tail(ctx) + if err != nil { + return fmt.Errorf("header/store: reading tail: %w", err) + } + if tail.Height() >= to { + return fmt.Errorf("header/store: delete to %d below current tail(%d)", to, tail.Height()) + } + + if err := s.deleteRange(ctx, tail.Height(), to); err != nil { + return fmt.Errorf("header/store: delete to height %d: %w", to, err) + } + + if head.Height()+1 == to { + // this is the case where we have deleted all the headers + // wipe the store + if err := s.wipe(ctx); err != nil { + return fmt.Errorf("header/store: wipe: %w", err) + } + } + + return nil +} + +// deleteRangeParallelThreshold defines the threshold for parallel deletion. +// If range is smaller than this threshold, deletion will be performed sequentially. +var deleteRangeParallelThreshold uint64 = 10000 + +// deleteRange deletes [from:to) header range from the store. +func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) error { + if to-from < deleteRangeParallelThreshold { + return s.deleteSequential(ctx, from, to) + } + + return s.deleteParallel(ctx, from, to) +} + +// deleteSequential deletes [from:to) header range from the store sequentially. 
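// Illustrative sketch, not part of this diff: DeleteTo's half-open semantics
// from a caller's point of view. Everything in [Tail(), keepFrom) is removed
// and keepFrom becomes the new tail. The pruneBelow helper and its name are
// hypothetical.
package example

import (
	"context"

	"github.com/celestiaorg/go-header"
	"github.com/celestiaorg/go-header/store"
)

func pruneBelow[H header.Header[H]](ctx context.Context, s *store.Store[H], keepFrom uint64) error {
	tail, err := s.Tail(ctx)
	if err != nil {
		return err
	}
	if tail.Height() >= keepFrom {
		return nil // nothing below keepFrom to prune
	}
	return s.DeleteTo(ctx, keepFrom)
}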
+func (s *Store[H]) deleteSequential(ctx context.Context, from, to uint64) (err error) { + log.Debugw("starting delete range sequential", "from_height", from, "to_height", to) + + batch, err := s.ds.Batch(ctx) + if err != nil { + return fmt.Errorf("new batch: %w", err) + } + // ctx = badger4.WithBatch(ctx, batch) + + height := from + defer func() { + if derr := s.setTail(ctx, s.ds, height); derr != nil { + err = errors.Join(err, fmt.Errorf("setting tail to %d: %w", height, derr)) + } + + if derr := batch.Commit(ctx); derr != nil { + err = errors.Join(err, fmt.Errorf("committing batch: %w", derr)) + } + }() + + s.onDeleteMu.Lock() + onDelete := slices.Clone(s.onDelete) + s.onDeleteMu.Unlock() + + for ; height < to; height++ { + if err := s.delete(ctx, height, batch, onDelete); err != nil { + return err + } + } + + return nil +} + +// delete deletes a single header from the store, its caches and indexies, notifying any registered onDelete handlers. +func (s *Store[H]) delete( + ctx context.Context, + height uint64, + batch datastore.Batch, + onDelete []func(ctx context.Context, height uint64) error, +) error { + // some of the methods may not handle context cancellation properly + if ctx.Err() != nil { + return context.Cause(ctx) + } + + hash, err := s.heightIndex.HashByHeight(ctx, height, false) + if errors.Is(err, datastore.ErrNotFound) { + log.Warnw("attempt to delete header that's not found", "height", height) + return nil + } + if err != nil { + return fmt.Errorf("hash by height %d: %w", height, err) + } + + for _, deleteFn := range onDelete { + if err := deleteFn(ctx, height); err != nil { + return fmt.Errorf("on delete handler for %d: %w", height, err) + } + } + + if err := batch.Delete(ctx, hashKey(hash)); err != nil { + return fmt.Errorf("delete hash key (%X): %w", hash, err) + } + if err := batch.Delete(ctx, heightKey(height)); err != nil { + return fmt.Errorf("delete height key (%d): %w", height, err) + } + + s.cache.Remove(hash.String()) + s.heightIndex.cache.Remove(height) + s.pending.DeleteRange(height, height+1) + return nil +} + +// workerNum defines how many parallel delete workers to run +// Scales of number of CPUs configred for the process. +var workerNum = runtime.GOMAXPROCS(-1) * 3 + +// deleteParallel deletes [from:to) header range from the store in parallel. +// It gracefully handles context and errors attempting to save interrupted progress. 
+func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (err error) { + log.Debugw("starting delete range parallel", "from_height", from, "to_height", to) + + deleteCtx, cancel := context.WithCancel(ctx) + defer cancel() + + startTime := time.Now() + if deadline, ok := ctx.Deadline(); ok { + // allocate 95% of caller's set deadline for deletion + // and give leftover to save progress + // this prevents store's state corruption from partial deletion + sub := deadline.Sub(startTime) / 100 * 95 + var cancel context.CancelFunc + deleteCtx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), errDeleteTimeout) + defer cancel() + } + + var highestDeleted atomic.Uint64 + defer func() { + newTailHeight := highestDeleted.Load() + 1 + if err != nil { + if errors.Is(err, errDeleteTimeout) { + log.Warnw("partial delete", + "from_height", from, + "expected_to_height", to, + "actual_to_height", newTailHeight, + "took(s)", time.Since(startTime), + ) + } else { + log.Errorw("partial delete with error", + "from_height", from, + "expected_to_height", to, + "actual_to_height", newTailHeight, + "took(s)", time.Since(startTime), + "err", err, + ) + } + } else if to-from > 1 { + log.Infow("deleted headers", "from_height", from, "to_height", to, "took", time.Since(startTime)) + } + + if derr := s.setTail(ctx, s.ds, newTailHeight); derr != nil { + err = errors.Join(err, fmt.Errorf("setting tail to %d: %w", newTailHeight, derr)) + } + }() + + s.onDeleteMu.Lock() + onDelete := slices.Clone(s.onDelete) + s.onDeleteMu.Unlock() + + jobCh := make(chan uint64, workerNum) + errCh := make(chan error, 1) + + worker := func() { + batch, err := s.ds.Batch(ctx) + if err != nil { + errCh <- fmt.Errorf("new batch: %w", err) + return + } + // deleteCtx := badger4.WithBatch(deleteCtx, batch) + + defer func() { + if err := batch.Commit(ctx); err != nil { + errCh <- fmt.Errorf("committing delete batch: %w", err) + } + }() + + var lastHeight uint64 + defer func() { + highest := highestDeleted.Load() + for lastHeight > highest && !highestDeleted.CompareAndSwap(highest, lastHeight) { + highest = highestDeleted.Load() + } + }() + + for height := range jobCh { + if err := s.delete(deleteCtx, height, batch, onDelete); err != nil { + select { + case errCh <- fmt.Errorf("delete header %d: %w", height, err): + default: + } + return + } + + lastHeight = height + } + } + + var wg sync.WaitGroup + wg.Add(workerNum) + for range workerNum { + go func() { + defer wg.Done() + worker() + }() + } + defer wg.Wait() + + for i, height := 0, from; height < to; height++ { + select { + case jobCh <- height: + i++ + if uint64(1)%deleteRangeParallelThreshold == 0 { + log.Debugf("deleting %dth header height %d", deleteRangeParallelThreshold, height) + } + case err = <-errCh: + close(jobCh) + return err + } + } + + close(jobCh) + return err +} + +var errDeleteTimeout = errors.New("delete timeout") diff --git a/store/store_recover.go b/store/store_recover.go new file mode 100644 index 00000000..c59dcd13 --- /dev/null +++ b/store/store_recover.go @@ -0,0 +1,49 @@ +package store + +import ( + "context" + + "github.com/celestiaorg/go-header" +) + +// ResetTail resets the tail of the store to be at the given height. +// The new tail must be present in the store. +// WARNING: Only use this function if you know what you are doing. 
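// Illustrative sketch, not part of this diff: the deadline split performed by
// deleteParallel above. Roughly 95% of the caller's remaining time goes to
// deletion and the rest is reserved to persist progress (the new tail), so a
// timeout cannot leave the store in a corrupted state. Only the 95% split and
// the cause error come from the diff; the withWorkBudget helper is an
// assumption.
package example

import (
	"context"
	"time"
)

func withWorkBudget(parent context.Context, cause error) (context.Context, context.CancelFunc) {
	deadline, ok := parent.Deadline()
	if !ok {
		// no deadline set by the caller: nothing to subdivide
		return context.WithCancel(parent)
	}
	budget := time.Until(deadline) / 100 * 95
	return context.WithDeadlineCause(parent, time.Now().Add(budget), cause)
}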
+func ResetTail[H header.Header[H]](ctx context.Context, store *Store[H], height uint64) error { + if err := store.setTail(ctx, store.ds, height); err != nil { + return err + } + + return nil +} + +// ResetHead resets the head of the store to be at the given height. +// The new head must be present in the store. +// WARNING: Only use this function if you know what you are doing. +func ResetHead[H header.Header[H]](ctx context.Context, store *Store[H], height uint64) error { + newHead, err := store.getByHeight(ctx, height) + if err != nil { + return err + } + + if err := writeHeaderHashTo(ctx, store.ds, newHead, headKey); err != nil { + return err + } + + store.contiguousHead.Store(&newHead) + return nil +} + +// FindHeader forward iterates over the store starting from the given height until it finds any stored header +// or the context is cancelled. +func FindHeader[H header.Header[H]](ctx context.Context, store *Store[H], startFrom uint64) (H, error) { + for height := startFrom; ctx.Err() == nil; height++ { + header, err := store.getByHeight(ctx, height) + if err == nil { + return header, nil + } + } + + var zero H + return zero, ctx.Err() +} diff --git a/store/store_test.go b/store/store_test.go index f74a5dbc..f9ffca0c 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -526,8 +526,6 @@ func TestStore_GetRange(t *testing.T) { } func TestStore_DeleteTo(t *testing.T) { - maxHeadersLoadedPerDelete = 10 - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) @@ -657,8 +655,6 @@ func TestStore_DeleteTo_EmptyStore(t *testing.T) { } func TestStore_DeleteTo_MoveHeadAndTail(t *testing.T) { - maxHeadersLoadedPerDelete = 1 - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) @@ -732,6 +728,45 @@ func TestStore_DeleteTo_Synchronized(t *testing.T) { require.EqualValues(t, 100, tail.Height()) } +func TestStore_OnDelete(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store, err := NewStore[*headertest.DummyHeader](ds) + require.NoError(t, err) + + err = store.Start(ctx) + require.NoError(t, err) + + err = store.Append(ctx, suite.GenDummyHeaders(50)...) + require.NoError(t, err) + // artificial gap + _ = suite.GenDummyHeaders(50) + + err = store.Append(ctx, suite.GenDummyHeaders(50)...) + require.NoError(t, err) + + deleted := 0 + store.OnDelete(func(ctx context.Context, height uint64) error { + hdr, err := store.GetByHeight(ctx, height) + assert.NoError(t, err, "must be accessible") + require.NotNil(t, hdr) + deleted++ + return nil + }) + + err = store.DeleteTo(ctx, 101) + require.NoError(t, err) + assert.Equal(t, 50, deleted) + + hdr, err := store.GetByHeight(ctx, 50) + assert.Error(t, err) + assert.Nil(t, hdr) +} + func TestStorePendingCacheMiss(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) diff --git a/sync/syncer_head.go b/sync/syncer_head.go index b38e72f7..aa0ba242 100644 --- a/sync/syncer_head.go +++ b/sync/syncer_head.go @@ -227,7 +227,11 @@ func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, head H) error { // verify verifies given network head candidate. func (s *Syncer[H]) verify(ctx context.Context, newHead H) error { - sbjHead, _, err := s.subjectiveHead(ctx) + // TODO(@Wondertan): This has to be subjective head. 
+ // But due to an edge case during subjective init, this might be an expired tail + // triggering subjective reinit death loop. + // This can and will be fixed with bsync, + sbjHead, err := s.localHead(ctx) if err != nil { log.Errorw("getting subjective head during new network head verification", "err", err) return err diff --git a/sync/syncer_tail.go b/sync/syncer_tail.go index 2f50e6cb..96b6ba2d 100644 --- a/sync/syncer_tail.go +++ b/sync/syncer_tail.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/celestiaorg/go-header" ) @@ -60,6 +61,13 @@ func (s *Syncer[H]) renewTail(ctx context.Context, oldTail, head H) (newTail H, newTail, err = s.store.Get(ctx, tailHash) if err == nil { + if oldTail.IsZero() { + // old tail is zero while the requested tail was found locally? + // something may go wrong with store, try to recover it with an append + if err := s.store.Append(ctx, newTail); err != nil { + return newTail, fmt.Errorf("appending tail header %d: %w", newTail.Height(), err) + } + } return newTail, nil } if !errors.Is(err, header.ErrNotFound) { @@ -85,6 +93,13 @@ func (s *Syncer[H]) renewTail(ctx context.Context, oldTail, head H) (newTail H, // check if the new tail is below the current head to avoid heightSub blocking newTail, err = s.store.GetByHeight(ctx, tailHeight) if err == nil { + if oldTail.IsZero() { + // old tail is zero while the requested tail was found locally? + // something may go wrong with store, try to recover it with an append + if err := s.store.Append(ctx, newTail); err != nil { + return newTail, fmt.Errorf("appending tail header %d: %w", newTail.Height(), err) + } + } return newTail, nil } if !errors.Is(err, header.ErrNotFound) { @@ -213,6 +228,7 @@ func (s *Syncer[H]) findTailHeight(ctx context.Context, oldTail, head H) (uint64 switch { case tailTimeDiff <= 0: // current tail is relevant as is + log.Debugw("relevant old tail", "expected_tail_time", expectedTailTime.Format(time.DateTime), "current_tail_time", currentTailTime.Format(time.DateTime)) return oldTail.Height(), nil case tailTimeDiff >= window: // current and expected tails are far from each other
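// Illustrative sketch, not part of this diff: an operator-side repair built
// on the new store_recover.go helpers. If the persisted tail key points at a
// header that no longer exists, scan forward from a known lower bound to the
// first header that is actually stored and re-point the tail at it. The
// recoverTail helper and its name are hypothetical.
package example

import (
	"context"
	"fmt"

	"github.com/celestiaorg/go-header"
	"github.com/celestiaorg/go-header/store"
)

func recoverTail[H header.Header[H]](ctx context.Context, s *store.Store[H], lowerBound uint64) error {
	hdr, err := store.FindHeader(ctx, s, lowerBound)
	if err != nil {
		return fmt.Errorf("scanning for a stored header from %d: %w", lowerBound, err)
	}
	return store.ResetTail(ctx, s, hdr.Height())
}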