-
Notifications
You must be signed in to change notification settings - Fork 26
feat: Add DeleteRange
method to Store
#347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
2c37b35
4c3fc7c
41f7446
cb11b09
a4b1924
642fed7
34f6358
de67900
989e057
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,8 +10,6 @@ import ( | |
"time" | ||
|
||
"github.com/ipfs/go-datastore" | ||
|
||
"github.com/celestiaorg/go-header" | ||
) | ||
|
||
// OnDelete implements [header.Store] interface. | ||
|
@@ -35,56 +33,14 @@ func (s *Store[H]) OnDelete(fn func(context.Context, uint64) error) { | |
} | ||
|
||
// DeleteTo implements [header.Store] interface. | ||
// This is a convenience wrapper around DeleteRange that deletes from tail up to a height. | ||
func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error { | ||
julienrbrt marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
// ensure all the pending headers are synchronized | ||
err := s.Sync(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
head, err := s.Head(ctx) | ||
if err != nil { | ||
return fmt.Errorf("header/store: reading head: %w", err) | ||
} | ||
if head.Height()+1 < to { | ||
_, err := s.getByHeight(ctx, to) | ||
if errors.Is(err, header.ErrNotFound) { | ||
return fmt.Errorf( | ||
"header/store: delete to %d beyond current head(%d)", | ||
to, | ||
head.Height(), | ||
) | ||
} | ||
if err != nil { | ||
return fmt.Errorf("delete to potential new head: %w", err) | ||
} | ||
|
||
// if `to` is bigger than the current head and is stored - allow delete, making `to` a new head | ||
} | ||
|
||
tail, err := s.Tail(ctx) | ||
if err != nil { | ||
return fmt.Errorf("header/store: reading tail: %w", err) | ||
} | ||
if tail.Height() >= to { | ||
return fmt.Errorf("header/store: delete to %d below current tail(%d)", to, tail.Height()) | ||
} | ||
|
||
err = s.deleteRange(ctx, tail.Height(), to) | ||
if errors.Is(err, header.ErrNotFound) && head.Height()+1 == to { | ||
// this is the case where we have deleted all the headers | ||
// wipe the store | ||
if err := s.wipe(ctx); err != nil { | ||
return fmt.Errorf("header/store: wipe: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
if err != nil { | ||
return fmt.Errorf("header/store: delete to height %d: %w", to, err) | ||
} | ||
|
||
return nil | ||
return s.DeleteRange(ctx, tail.Height(), to) | ||
} | ||
|
||
// deleteRangeParallelThreshold defines the threshold for parallel deletion. | ||
|
@@ -94,69 +50,6 @@ var ( | |
errDeleteTimeout = errors.New("delete timeout") | ||
) | ||
|
||
// deleteRange deletes [from:to) header range from the store. | ||
// It gracefully handles context and errors attempting to save interrupted progress. | ||
func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error) { | ||
startTime := time.Now() | ||
|
||
var ( | ||
height uint64 | ||
missing int | ||
) | ||
defer func() { | ||
if err != nil { | ||
if errors.Is(err, errDeleteTimeout) { | ||
log.Warnw("partial delete", | ||
"from_height", from, | ||
"expected_to_height", to, | ||
"actual_to_height", height, | ||
"hdrs_not_found", missing, | ||
"took(s)", time.Since(startTime), | ||
) | ||
} else { | ||
log.Errorw("partial delete with error", | ||
"from_height", from, | ||
"expected_to_height", to, | ||
"actual_to_height", height, | ||
"hdrs_not_found", missing, | ||
"took(s)", time.Since(startTime), | ||
"err", err, | ||
) | ||
} | ||
} else if to-from > 1 { | ||
log.Debugw("deleted headers", | ||
"from_height", from, | ||
"to_height", to, | ||
"hdrs_not_found", missing, | ||
"took(s)", time.Since(startTime).Seconds(), | ||
) | ||
} | ||
|
||
if derr := s.setTail(ctx, s.ds, height); derr != nil { | ||
err = errors.Join(err, fmt.Errorf("setting tail to %d: %w", height, derr)) | ||
} | ||
}() | ||
|
||
deleteCtx := ctx | ||
if deadline, ok := ctx.Deadline(); ok { | ||
// allocate 95% of caller's set deadline for deletion | ||
// and give leftover to save progress | ||
// this prevents store's state corruption from partial deletion | ||
sub := deadline.Sub(startTime) / 100 * 95 | ||
var cancel context.CancelFunc | ||
deleteCtx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), errDeleteTimeout) | ||
defer cancel() | ||
} | ||
|
||
if to-from < deleteRangeParallelThreshold { | ||
height, missing, err = s.deleteSequential(deleteCtx, from, to) | ||
} else { | ||
height, missing, err = s.deleteParallel(deleteCtx, from, to) | ||
} | ||
|
||
return err | ||
} | ||
|
||
// deleteSingle deletes a single header from the store, | ||
// its caches and indexies, notifying any registered onDelete handlers. | ||
func (s *Store[H]) deleteSingle( | ||
|
@@ -346,3 +239,190 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64, | |
) | ||
return highest, missing, nil | ||
} | ||
|
||
// DeleteRange deletes headers in the range [from:to) from the store. | ||
// It intelligently updates head and/or tail pointers based on what range is being deleted. | ||
func (s *Store[H]) DeleteRange(ctx context.Context, from, to uint64) error { | ||
// ensure all the pending headers are synchronized | ||
err := s.Sync(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
head, err := s.Head(ctx) | ||
if err != nil { | ||
return fmt.Errorf("header/store: reading head: %w", err) | ||
} | ||
|
||
tail, err := s.Tail(ctx) | ||
if err != nil { | ||
return fmt.Errorf("header/store: reading tail: %w", err) | ||
} | ||
|
||
// validate range parameters | ||
if from >= to { | ||
return fmt.Errorf( | ||
"header/store: invalid range [%d:%d) - from must be less than to", | ||
from, | ||
to, | ||
) | ||
} | ||
|
||
if from < tail.Height() { | ||
return fmt.Errorf( | ||
"header/store: delete range from %d below current tail(%d)", | ||
from, | ||
tail.Height(), | ||
) | ||
} | ||
|
||
// Note: Allow deletion beyond head to match original DeleteTo behavior | ||
// Missing headers in the range will be handled gracefully by the deletion logic | ||
|
||
// if range is empty within the current store bounds, it's a no-op | ||
if from > head.Height() || to <= tail.Height() { | ||
return nil | ||
julienrbrt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// Validate that deletion won't create gaps in the store | ||
// Only allow deletions that: | ||
// 1. Start from tail (advancing tail forward) | ||
// 2. End at head+1 (moving head backward) | ||
// 3. Delete the entire store | ||
if from > tail.Height() && to <= head.Height() { | ||
return fmt.Errorf( | ||
"header/store: deletion range [%d:%d) would create gaps in the store. "+ | ||
"Only deletion from tail (%d) or to head+1 (%d) is supported", | ||
from, to, tail.Height(), head.Height()+1, | ||
) | ||
} | ||
|
||
// Check if we're deleting all existing headers (making store empty) | ||
// Only wipe if 'to' is exactly at head+1 (normal case) to avoid accidental wipes | ||
if from <= tail.Height() && to == head.Height()+1 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we allow a case where caller specifies a |
||
// Check if any headers exist at or beyond 'to' | ||
hasHeadersAtOrBeyond := false | ||
for checkHeight := to; checkHeight <= to+10; checkHeight++ { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how did we choose 10 as the number to check for if headers exist beyond head.Height() ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This assumes the maximum gap that can exist between store head and syncer heads that it begins inserting is 10, correct? |
||
if _, err := s.getByHeight(ctx, checkHeight); err == nil { | ||
hasHeadersAtOrBeyond = true | ||
break | ||
} | ||
} | ||
|
||
if !hasHeadersAtOrBeyond { | ||
// wipe the entire store | ||
if err := s.wipe(ctx); err != nil { | ||
return fmt.Errorf("header/store: wipe: %w", err) | ||
} | ||
return nil | ||
} | ||
} | ||
|
||
// Determine which pointers need updating | ||
updateTail := from <= tail.Height() | ||
updateHead := to > head.Height() | ||
|
||
// Delete the headers without automatic tail updates | ||
err = s.deleteRangeRaw(ctx, from, to) | ||
if err != nil { | ||
return fmt.Errorf("header/store: delete range [%d:%d): %w", from, to, err) | ||
} | ||
|
||
// Update tail if we deleted from the beginning | ||
if updateTail { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what if we wiped store (where to == head.Height+1) ? do we still need to |
||
_, err = s.getByHeight(ctx, to) | ||
if err != nil { | ||
return fmt.Errorf("header/store: new tail height %d not found: %w", to, err) | ||
} | ||
julienrbrt marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
err = s.setTail(ctx, s.ds, to) | ||
if err != nil { | ||
return fmt.Errorf("header/store: setting tail to %d: %w", to, err) | ||
} | ||
} | ||
|
||
// Update head if we deleted from the end | ||
if updateHead && from > tail.Height() { | ||
newHeadHeight := from - 1 | ||
if newHeadHeight >= tail.Height() { | ||
err = s.setHead(ctx, s.ds, newHeadHeight) | ||
if err != nil { | ||
return fmt.Errorf("header/store: setting head to %d: %w", newHeadHeight, err) | ||
} | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// deleteRangeRaw deletes [from:to) header range without updating head or tail pointers. | ||
func (s *Store[H]) deleteRangeRaw(ctx context.Context, from, to uint64) (err error) { | ||
startTime := time.Now() | ||
|
||
var ( | ||
height uint64 | ||
missing int | ||
) | ||
defer func() { | ||
if err != nil { | ||
if errors.Is(err, errDeleteTimeout) { | ||
log.Warnw("partial delete range", | ||
"from_height", from, | ||
"expected_to_height", to, | ||
"actual_to_height", height, | ||
"hdrs_not_found", missing, | ||
"took(s)", time.Since(startTime), | ||
julienrbrt marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
) | ||
} else { | ||
log.Errorw("partial delete range with error", | ||
"from_height", from, | ||
"expected_to_height", to, | ||
"actual_to_height", height, | ||
"hdrs_not_found", missing, | ||
"took(s)", time.Since(startTime), | ||
julienrbrt marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
"err", err, | ||
) | ||
} | ||
} else if to-from > 1 { | ||
log.Debugw("deleted range", | ||
"from_height", from, | ||
"to_height", to, | ||
"hdrs_not_found", missing, | ||
"took(s)", time.Since(startTime).Seconds(), | ||
) | ||
} | ||
}() | ||
|
||
deleteCtx := ctx | ||
if deadline, ok := ctx.Deadline(); ok { | ||
// allocate 95% of caller's set deadline for deletion | ||
// and give leftover to save progress | ||
sub := deadline.Sub(startTime) / 100 * 95 | ||
var cancel context.CancelFunc | ||
deleteCtx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), errDeleteTimeout) | ||
defer cancel() | ||
} | ||
|
||
if to-from < deleteRangeParallelThreshold { | ||
height, missing, err = s.deleteSequential(deleteCtx, from, to) | ||
} else { | ||
height, missing, err = s.deleteParallel(deleteCtx, from, to) | ||
} | ||
|
||
return err | ||
Comment on lines
+382
to
+398
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previously, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we return from this method the actual values that were successfully deleted to? |
||
} | ||
|
||
// setHead sets the head of the store to the specified height. | ||
func (s *Store[H]) setHead(ctx context.Context, write datastore.Write, to uint64) error { | ||
newHead, err := s.getByHeight(ctx, to) | ||
if err != nil { | ||
return fmt.Errorf("getting head: %w", err) | ||
} | ||
|
||
// update the contiguous head | ||
s.contiguousHead.Store(&newHead) | ||
if err := writeHeaderHashTo(ctx, write, newHead, headKey); err != nil { | ||
return fmt.Errorf("writing headKey in batch: %w", err) | ||
} | ||
|
||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's just do quick sanity even in testing pkg to make sure from < to