Skip to content
16 changes: 13 additions & 3 deletions headertest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ func (m *Store[H]) GetByHeight(_ context.Context, height uint64) (H, error) {
return zero, header.ErrNotFound
}

func (m *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
for h := m.TailHeight; h < to; h++ {
func (m *Store[H]) DeleteRange(ctx context.Context, from, to uint64) error {
// Delete headers in the range [from:to)
for h := from; h < to; h++ {
_, ok := m.Headers[h]
if !ok {
continue
Expand All @@ -95,7 +96,16 @@ func (m *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
delete(m.Headers, h) // must be after deleteFn
}

m.TailHeight = to
// Update TailHeight if we deleted from the beginning
if from <= m.TailHeight {
m.TailHeight = to
}

// Update HeadHeight if we deleted from the end
if to >= m.HeadHeight {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fail out here if to > m.HeadHeight (we should catch this err case in tests)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or actually do we have to allow this condition as we allow deletion --> beyond store head (to sync head) ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it is fine to be able to delete beyond. it allows you to put a big number if you do not know the head.

m.HeadHeight = from - 1
}

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ type Store[H Header[H]] interface {
// GetRange returns the range [from:to).
GetRange(context.Context, uint64, uint64) ([]H, error)

// DeleteTo deletes the range [Tail():to).
DeleteTo(ctx context.Context, to uint64) error
// DeleteRange deletes the range [from:to).
DeleteRange(ctx context.Context, from, to uint64) error

// OnDelete registers given handler to be called whenever a header with the height is being removed.
// OnDelete guarantees that the header is accessible for the handler with GetByHeight and is removed
Expand Down
2 changes: 1 addition & 1 deletion p2p/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (timeoutStore[H]) GetRange(ctx context.Context, _, _ uint64) ([]H, error) {
return nil, ctx.Err()
}

func (timeoutStore[H]) DeleteTo(ctx context.Context, _ uint64) error {
func (timeoutStore[H]) DeleteRange(ctx context.Context, _, _ uint64) error {
<-ctx.Done()
return ctx.Err()
}
Expand Down
301 changes: 183 additions & 118 deletions store/store_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"time"

"github.com/ipfs/go-datastore"

"github.com/celestiaorg/go-header"
)

// OnDelete implements [header.Store] interface.
Expand All @@ -36,129 +34,13 @@ func (s *Store[H]) OnDelete(fn func(context.Context, uint64) error) {
})
}

// DeleteTo implements [header.Store] interface.
func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
// ensure all the pending headers are synchronized
err := s.Sync(ctx)
if err != nil {
return err
}

head, err := s.Head(ctx)
if err != nil {
return fmt.Errorf("header/store: reading head: %w", err)
}
if head.Height()+1 < to {
_, err := s.getByHeight(ctx, to)
if errors.Is(err, header.ErrNotFound) {
return fmt.Errorf(
"header/store: delete to %d beyond current head(%d)",
to,
head.Height(),
)
}
if err != nil {
return fmt.Errorf("delete to potential new head: %w", err)
}

// if `to` is bigger than the current head and is stored - allow delete, making `to` a new head
}

tail, err := s.Tail(ctx)
if err != nil {
return fmt.Errorf("header/store: reading tail: %w", err)
}
if tail.Height() >= to {
return fmt.Errorf("header/store: delete to %d below current tail(%d)", to, tail.Height())
}

err = s.deleteRange(ctx, tail.Height(), to)
if errors.Is(err, header.ErrNotFound) && head.Height()+1 == to {
// this is the case where we have deleted all the headers
// wipe the store
if err := s.wipe(ctx); err != nil {
return fmt.Errorf("header/store: wipe: %w", err)
}

return nil
}
if err != nil {
return fmt.Errorf("header/store: delete to height %d: %w", to, err)
}

return nil
}

// deleteRangeParallelThreshold defines the threshold for parallel deletion.
// If range is smaller than this threshold, deletion will be performed sequentially.
var (
deleteRangeParallelThreshold uint64 = 10000
errDeleteTimeout = errors.New("delete timeout")
)

// deleteRange deletes [from:to) header range from the store.
// It gracefully handles context and errors attempting to save interrupted progress.
func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error) {
startTime := time.Now()

var (
height uint64
missing int
)
defer func() {
if err != nil {
if errors.Is(err, errDeleteTimeout) {
log.Warnw("partial delete",
"from_height", from,
"expected_to_height", to,
"actual_to_height", height,
"hdrs_not_found", missing,
"took(s)", time.Since(startTime),
)
} else {
log.Errorw("partial delete with error",
"from_height", from,
"expected_to_height", to,
"actual_to_height", height,
"hdrs_not_found", missing,
"took(s)", time.Since(startTime),
"err", err,
)
}
} else if to-from > 1 {
log.Debugw("deleted headers",
"from_height", from,
"to_height", to,
"hdrs_not_found", missing,
"took(s)", time.Since(startTime).Seconds(),
)
}

if derr := s.setTail(ctx, s.ds, height); derr != nil {
err = errors.Join(err, fmt.Errorf("setting tail to %d: %w", height, derr))
}
}()

deleteCtx := ctx
if deadline, ok := ctx.Deadline(); ok {
// allocate 95% of caller's set deadline for deletion
// and give leftover to save progress
// this prevents store's state corruption from partial deletion
sub := deadline.Sub(startTime) / 100 * 95
var cancel context.CancelFunc
deleteCtx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), errDeleteTimeout)
defer cancel()
}

if to-from < deleteRangeParallelThreshold {
height, missing, err = s.deleteSequential(deleteCtx, from, to)
} else {
height, missing, err = s.deleteParallel(deleteCtx, from, to)
}

return err
}

// deleteSingle deletes a single header from the store,
// its caches and indexies, notifying any registered onDelete handlers.
func (s *Store[H]) deleteSingle(
Expand Down Expand Up @@ -348,3 +230,186 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64,
)
return highest, missing, nil
}

// DeleteRange deletes headers in the range [from:to) from the store.
// It intelligently updates head and/or tail pointers based on what range is being deleted.
func (s *Store[H]) DeleteRange(ctx context.Context, from, to uint64) error {
// ensure all the pending headers are synchronized
err := s.Sync(ctx)
if err != nil {
return err
}

head, err := s.Head(ctx)
if err != nil {
return fmt.Errorf("header/store: reading head: %w", err)
}

tail, err := s.Tail(ctx)
if err != nil {
return fmt.Errorf("header/store: reading tail: %w", err)
}

// validate range parameters
if from >= to {
return fmt.Errorf(
"header/store: invalid range [%d:%d) - from must be less than to",
from,
to,
)
}

if from < tail.Height() {
return fmt.Errorf(
"header/store: delete range from %d below current tail(%d)",
from,
tail.Height(),
)
}

// Note: Allow deletion beyond head to match original DeleteTo behavior
// Missing headers in the range will be handled gracefully by the deletion logic

// if range is empty within the current store bounds, it's a no-op
if from > head.Height() || to <= tail.Height() {
return nil
}

// Validate that deletion won't create gaps in the store
// Only allow deletions that:
// 1. Start from tail (advancing tail forward)
// 2. End at head+1 (moving head backward)
// 3. Delete the entire store
if from > tail.Height() && to <= head.Height() {
return fmt.Errorf(
"header/store: deletion range [%d:%d) would create gaps in the store. "+
"Only deletion from tail (%d) or to head+1 (%d) is supported",
from, to, tail.Height(), head.Height()+1,
)
}

// Check if we're deleting all existing headers (making store empty)
// Only wipe if 'to' is exactly at head+1 (normal case) to avoid accidental wipes
if from <= tail.Height() && to == head.Height()+1 {
// Check if any headers exist at or beyond 'to'
hasHeadersAtOrBeyond := false
for checkHeight := to; checkHeight <= to+10; checkHeight++ {
if _, err := s.getByHeight(ctx, checkHeight); err == nil {
hasHeadersAtOrBeyond = true
break
}
}

if !hasHeadersAtOrBeyond {
// wipe the entire store
if err := s.wipe(ctx); err != nil {
return fmt.Errorf("header/store: wipe: %w", err)
}
return nil
}
}

// Determine which pointers need updating
updateTail := from <= tail.Height()
updateHead := to > head.Height()

// Delete the headers without automatic tail updates
err = s.deleteRangeRaw(ctx, from, to)
if err != nil {
return fmt.Errorf("header/store: delete range [%d:%d): %w", from, to, err)
}

// Update tail if we deleted from the beginning
if updateTail {
err = s.setTail(ctx, s.ds, to)
if err != nil {
return fmt.Errorf("header/store: setting tail to %d: %w", to, err)
}
}

// Update head if we deleted from the end
if updateHead && from > tail.Height() {
newHeadHeight := from - 1
if newHeadHeight >= tail.Height() {
err = s.setHead(ctx, s.ds, newHeadHeight)
if err != nil {
return fmt.Errorf("header/store: setting head to %d: %w", newHeadHeight, err)
}
}
}

return nil
}

// deleteRangeRaw deletes [from:to) header range without updating head or tail pointers.
func (s *Store[H]) deleteRangeRaw(ctx context.Context, from, to uint64) (err error) {
startTime := time.Now()

var (
height uint64
missing int
)
defer func() {
if err != nil {
if errors.Is(err, errDeleteTimeout) {
log.Warnw("partial delete range",
"from_height", from,
"expected_to_height", to,
"actual_to_height", height,
"hdrs_not_found", missing,
"took(s)", time.Since(startTime),
)
} else {
log.Errorw("partial delete range with error",
"from_height", from,
"expected_to_height", to,
"actual_to_height", height,
"hdrs_not_found", missing,
"took(s)", time.Since(startTime),
"err", err,
)
}
} else if to-from > 1 {
log.Debugw("deleted range",
"from_height", from,
"to_height", to,
"hdrs_not_found", missing,
"took(s)", time.Since(startTime).Seconds(),
)
}
}()

deleteCtx := ctx
if deadline, ok := ctx.Deadline(); ok {
// allocate 95% of caller's set deadline for deletion
// and give leftover to save progress
sub := deadline.Sub(startTime) / 100 * 95
var cancel context.CancelFunc
deleteCtx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), errDeleteTimeout)
defer cancel()
}

if to-from < deleteRangeParallelThreshold {
height, missing, err = s.deleteSequential(deleteCtx, from, to)
} else {
height, missing, err = s.deleteParallel(deleteCtx, from, to)
}

return err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, updateTail was in the defer of the same function, so even if an error occurred, the partial progress was saved. This is no longer true as the call deleteRangeRaw short-circuits on error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we return from this method the actual values that were successfully deleted to?

}

// setHead sets the head of the store to the specified height.
func (s *Store[H]) setHead(ctx context.Context, write datastore.Write, to uint64) error {
newHead, err := s.getByHeight(ctx, to)
if err != nil {
return fmt.Errorf("getting head: %w", err)
}

// update the contiguous head
s.contiguousHead.Store(&newHead)
if err := writeHeaderHashTo(ctx, write, newHead, headKey); err != nil {
return fmt.Errorf("writing headKey in batch: %w", err)
}

return nil
}
Loading