-
Notifications
You must be signed in to change notification settings - Fork 26
feat: Add DeleteRange
method to Store
#347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2c37b35
4c3fc7c
41f7446
cb11b09
a4b1924
642fed7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -79,8 +79,9 @@ func (m *Store[H]) GetByHeight(_ context.Context, height uint64) (H, error) { | |
return zero, header.ErrNotFound | ||
} | ||
|
||
func (m *Store[H]) DeleteTo(ctx context.Context, to uint64) error { | ||
for h := m.TailHeight; h < to; h++ { | ||
func (m *Store[H]) DeleteRange(ctx context.Context, from, to uint64) error { | ||
// Delete headers in the range [from:to) | ||
for h := from; h < to; h++ { | ||
_, ok := m.Headers[h] | ||
if !ok { | ||
continue | ||
|
@@ -95,7 +96,16 @@ func (m *Store[H]) DeleteTo(ctx context.Context, to uint64) error { | |
delete(m.Headers, h) // must be after deleteFn | ||
} | ||
|
||
m.TailHeight = to | ||
// Update TailHeight if we deleted from the beginning | ||
if from <= m.TailHeight { | ||
m.TailHeight = to | ||
} | ||
julienrbrt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Update HeadHeight if we deleted from the end | ||
if to >= m.HeadHeight { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fail out here if to > m.HeadHeight (we should catch this err case in tests) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or actually do we have to allow this condition as we allow deletion --> beyond store head (to sync head) ? |
||
m.HeadHeight = from - 1 | ||
} | ||
|
||
return nil | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -85,8 +85,8 @@ type Store[H Header[H]] interface { | |||||
// GetRange returns the range [from:to). | ||||||
GetRange(context.Context, uint64, uint64) ([]H, error) | ||||||
|
||||||
// DeleteTo deletes the range [Tail():to). | ||||||
DeleteTo(ctx context.Context, to uint64) error | ||||||
// DeleteRange deletes the range [from:to). | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
DeleteRange(ctx context.Context, from, to uint64) error | ||||||
|
||||||
// OnDelete registers given handler to be called whenever a header with the height is being removed. | ||||||
// OnDelete guarantees that the header is accessible for the handler with GetByHeight and is removed | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -11,8 +11,6 @@ import ( | |||||
"time" | ||||||
|
||||||
"github.com/ipfs/go-datastore" | ||||||
|
||||||
"github.com/celestiaorg/go-header" | ||||||
) | ||||||
|
||||||
// OnDelete implements [header.Store] interface. | ||||||
|
@@ -36,129 +34,13 @@ func (s *Store[H]) OnDelete(fn func(context.Context, uint64) error) { | |||||
}) | ||||||
} | ||||||
|
||||||
// DeleteTo implements [header.Store] interface. | ||||||
func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error { | ||||||
// ensure all the pending headers are synchronized | ||||||
err := s.Sync(ctx) | ||||||
if err != nil { | ||||||
return err | ||||||
} | ||||||
|
||||||
head, err := s.Head(ctx) | ||||||
if err != nil { | ||||||
return fmt.Errorf("header/store: reading head: %w", err) | ||||||
} | ||||||
if head.Height()+1 < to { | ||||||
_, err := s.getByHeight(ctx, to) | ||||||
if errors.Is(err, header.ErrNotFound) { | ||||||
return fmt.Errorf( | ||||||
"header/store: delete to %d beyond current head(%d)", | ||||||
to, | ||||||
head.Height(), | ||||||
) | ||||||
} | ||||||
if err != nil { | ||||||
return fmt.Errorf("delete to potential new head: %w", err) | ||||||
} | ||||||
|
||||||
// if `to` is bigger than the current head and is stored - allow delete, making `to` a new head | ||||||
} | ||||||
|
||||||
tail, err := s.Tail(ctx) | ||||||
if err != nil { | ||||||
return fmt.Errorf("header/store: reading tail: %w", err) | ||||||
} | ||||||
if tail.Height() >= to { | ||||||
return fmt.Errorf("header/store: delete to %d below current tail(%d)", to, tail.Height()) | ||||||
} | ||||||
|
||||||
err = s.deleteRange(ctx, tail.Height(), to) | ||||||
if errors.Is(err, header.ErrNotFound) && head.Height()+1 == to { | ||||||
// this is the case where we have deleted all the headers | ||||||
// wipe the store | ||||||
if err := s.wipe(ctx); err != nil { | ||||||
return fmt.Errorf("header/store: wipe: %w", err) | ||||||
} | ||||||
|
||||||
return nil | ||||||
} | ||||||
if err != nil { | ||||||
return fmt.Errorf("header/store: delete to height %d: %w", to, err) | ||||||
} | ||||||
|
||||||
return nil | ||||||
} | ||||||
|
||||||
// deleteRangeParallelThreshold defines the threshold for parallel deletion. | ||||||
// If range is smaller than this threshold, deletion will be performed sequentially. | ||||||
var ( | ||||||
deleteRangeParallelThreshold uint64 = 10000 | ||||||
errDeleteTimeout = errors.New("delete timeout") | ||||||
) | ||||||
|
||||||
// deleteRange deletes [from:to) header range from the store. | ||||||
// It gracefully handles context and errors attempting to save interrupted progress. | ||||||
func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error) { | ||||||
startTime := time.Now() | ||||||
|
||||||
var ( | ||||||
height uint64 | ||||||
missing int | ||||||
) | ||||||
defer func() { | ||||||
if err != nil { | ||||||
if errors.Is(err, errDeleteTimeout) { | ||||||
log.Warnw("partial delete", | ||||||
"from_height", from, | ||||||
"expected_to_height", to, | ||||||
"actual_to_height", height, | ||||||
"hdrs_not_found", missing, | ||||||
"took(s)", time.Since(startTime), | ||||||
) | ||||||
} else { | ||||||
log.Errorw("partial delete with error", | ||||||
"from_height", from, | ||||||
"expected_to_height", to, | ||||||
"actual_to_height", height, | ||||||
"hdrs_not_found", missing, | ||||||
"took(s)", time.Since(startTime), | ||||||
"err", err, | ||||||
) | ||||||
} | ||||||
} else if to-from > 1 { | ||||||
log.Debugw("deleted headers", | ||||||
"from_height", from, | ||||||
"to_height", to, | ||||||
"hdrs_not_found", missing, | ||||||
"took(s)", time.Since(startTime).Seconds(), | ||||||
) | ||||||
} | ||||||
|
||||||
if derr := s.setTail(ctx, s.ds, height); derr != nil { | ||||||
err = errors.Join(err, fmt.Errorf("setting tail to %d: %w", height, derr)) | ||||||
} | ||||||
}() | ||||||
|
||||||
deleteCtx := ctx | ||||||
if deadline, ok := ctx.Deadline(); ok { | ||||||
// allocate 95% of caller's set deadline for deletion | ||||||
// and give leftover to save progress | ||||||
// this prevents store's state corruption from partial deletion | ||||||
sub := deadline.Sub(startTime) / 100 * 95 | ||||||
var cancel context.CancelFunc | ||||||
deleteCtx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), errDeleteTimeout) | ||||||
defer cancel() | ||||||
} | ||||||
|
||||||
if to-from < deleteRangeParallelThreshold { | ||||||
height, missing, err = s.deleteSequential(deleteCtx, from, to) | ||||||
} else { | ||||||
height, missing, err = s.deleteParallel(deleteCtx, from, to) | ||||||
} | ||||||
|
||||||
return err | ||||||
} | ||||||
|
||||||
// deleteSingle deletes a single header from the store, | ||||||
// its caches and indexies, notifying any registered onDelete handlers. | ||||||
func (s *Store[H]) deleteSingle( | ||||||
|
@@ -348,3 +230,186 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64, | |||||
) | ||||||
return highest, missing, nil | ||||||
} | ||||||
|
||||||
// DeleteRange deletes headers in the range [from:to) from the store. | ||||||
// It intelligently updates head and/or tail pointers based on what range is being deleted. | ||||||
func (s *Store[H]) DeleteRange(ctx context.Context, from, to uint64) error { | ||||||
// ensure all the pending headers are synchronized | ||||||
err := s.Sync(ctx) | ||||||
if err != nil { | ||||||
return err | ||||||
} | ||||||
|
||||||
head, err := s.Head(ctx) | ||||||
if err != nil { | ||||||
return fmt.Errorf("header/store: reading head: %w", err) | ||||||
} | ||||||
|
||||||
tail, err := s.Tail(ctx) | ||||||
if err != nil { | ||||||
return fmt.Errorf("header/store: reading tail: %w", err) | ||||||
} | ||||||
|
||||||
// validate range parameters | ||||||
if from >= to { | ||||||
return fmt.Errorf( | ||||||
"header/store: invalid range [%d:%d) - from must be less than to", | ||||||
from, | ||||||
to, | ||||||
) | ||||||
} | ||||||
|
||||||
if from < tail.Height() { | ||||||
return fmt.Errorf( | ||||||
"header/store: delete range from %d below current tail(%d)", | ||||||
from, | ||||||
tail.Height(), | ||||||
) | ||||||
} | ||||||
|
||||||
// Note: Allow deletion beyond head to match original DeleteTo behavior | ||||||
// Missing headers in the range will be handled gracefully by the deletion logic | ||||||
|
||||||
// if range is empty within the current store bounds, it's a no-op | ||||||
if from > head.Height() || to <= tail.Height() { | ||||||
return nil | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we warn log here as passing a no-op range feels like fishy behaviour from the caller. |
||||||
} | ||||||
|
||||||
// Validate that deletion won't create gaps in the store | ||||||
// Only allow deletions that: | ||||||
// 1. Start from tail (advancing tail forward) | ||||||
// 2. End at head+1 (moving head backward) | ||||||
// 3. Delete the entire store | ||||||
if from > tail.Height() && to <= head.Height() { | ||||||
return fmt.Errorf( | ||||||
"header/store: deletion range [%d:%d) would create gaps in the store. "+ | ||||||
"Only deletion from tail (%d) or to head+1 (%d) is supported", | ||||||
from, to, tail.Height(), head.Height()+1, | ||||||
) | ||||||
} | ||||||
|
||||||
// Check if we're deleting all existing headers (making store empty) | ||||||
// Only wipe if 'to' is exactly at head+1 (normal case) to avoid accidental wipes | ||||||
if from <= tail.Height() && to == head.Height()+1 { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we allow a case where caller specifies a |
||||||
// Check if any headers exist at or beyond 'to' | ||||||
hasHeadersAtOrBeyond := false | ||||||
for checkHeight := to; checkHeight <= to+10; checkHeight++ { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how did we choose 10 as the number to check for if headers exist beyond head.Height() ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This assumes the maximum gap that can exist between store head and syncer heads that it begins inserting is 10, correct? |
||||||
if _, err := s.getByHeight(ctx, checkHeight); err == nil { | ||||||
hasHeadersAtOrBeyond = true | ||||||
break | ||||||
} | ||||||
} | ||||||
|
||||||
if !hasHeadersAtOrBeyond { | ||||||
// wipe the entire store | ||||||
if err := s.wipe(ctx); err != nil { | ||||||
return fmt.Errorf("header/store: wipe: %w", err) | ||||||
} | ||||||
return nil | ||||||
} | ||||||
} | ||||||
|
||||||
// Determine which pointers need updating | ||||||
updateTail := from <= tail.Height() | ||||||
updateHead := to > head.Height() | ||||||
|
||||||
// Delete the headers without automatic tail updates | ||||||
err = s.deleteRangeRaw(ctx, from, to) | ||||||
if err != nil { | ||||||
return fmt.Errorf("header/store: delete range [%d:%d): %w", from, to, err) | ||||||
} | ||||||
|
||||||
// Update tail if we deleted from the beginning | ||||||
if updateTail { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what if we wiped store (where to == head.Height+1) ? do we still need to |
||||||
err = s.setTail(ctx, s.ds, to) | ||||||
if err != nil { | ||||||
return fmt.Errorf("header/store: setting tail to %d: %w", to, err) | ||||||
} | ||||||
} | ||||||
|
||||||
// Update head if we deleted from the end | ||||||
if updateHead && from > tail.Height() { | ||||||
newHeadHeight := from - 1 | ||||||
if newHeadHeight >= tail.Height() { | ||||||
err = s.setHead(ctx, s.ds, newHeadHeight) | ||||||
if err != nil { | ||||||
return fmt.Errorf("header/store: setting head to %d: %w", newHeadHeight, err) | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
return nil | ||||||
} | ||||||
|
||||||
// deleteRangeRaw deletes [from:to) header range without updating head or tail pointers. | ||||||
func (s *Store[H]) deleteRangeRaw(ctx context.Context, from, to uint64) (err error) { | ||||||
startTime := time.Now() | ||||||
|
||||||
var ( | ||||||
height uint64 | ||||||
missing int | ||||||
) | ||||||
defer func() { | ||||||
if err != nil { | ||||||
if errors.Is(err, errDeleteTimeout) { | ||||||
log.Warnw("partial delete range", | ||||||
"from_height", from, | ||||||
"expected_to_height", to, | ||||||
"actual_to_height", height, | ||||||
"hdrs_not_found", missing, | ||||||
"took(s)", time.Since(startTime), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
) | ||||||
} else { | ||||||
log.Errorw("partial delete range with error", | ||||||
"from_height", from, | ||||||
"expected_to_height", to, | ||||||
"actual_to_height", height, | ||||||
"hdrs_not_found", missing, | ||||||
"took(s)", time.Since(startTime), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
"err", err, | ||||||
) | ||||||
} | ||||||
} else if to-from > 1 { | ||||||
log.Debugw("deleted range", | ||||||
"from_height", from, | ||||||
"to_height", to, | ||||||
"hdrs_not_found", missing, | ||||||
"took(s)", time.Since(startTime).Seconds(), | ||||||
) | ||||||
} | ||||||
}() | ||||||
|
||||||
deleteCtx := ctx | ||||||
if deadline, ok := ctx.Deadline(); ok { | ||||||
// allocate 95% of caller's set deadline for deletion | ||||||
// and give leftover to save progress | ||||||
sub := deadline.Sub(startTime) / 100 * 95 | ||||||
var cancel context.CancelFunc | ||||||
deleteCtx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), errDeleteTimeout) | ||||||
defer cancel() | ||||||
} | ||||||
|
||||||
if to-from < deleteRangeParallelThreshold { | ||||||
height, missing, err = s.deleteSequential(deleteCtx, from, to) | ||||||
} else { | ||||||
height, missing, err = s.deleteParallel(deleteCtx, from, to) | ||||||
} | ||||||
|
||||||
return err | ||||||
Comment on lines
+382
to
+398
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previously, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we return from this method the actual values that were successfully deleted to? |
||||||
} | ||||||
|
||||||
// setHead sets the head of the store to the specified height. | ||||||
func (s *Store[H]) setHead(ctx context.Context, write datastore.Write, to uint64) error { | ||||||
newHead, err := s.getByHeight(ctx, to) | ||||||
if err != nil { | ||||||
return fmt.Errorf("getting head: %w", err) | ||||||
} | ||||||
|
||||||
// update the contiguous head | ||||||
s.contiguousHead.Store(&newHead) | ||||||
if err := writeHeaderHashTo(ctx, write, newHead, headKey); err != nil { | ||||||
return fmt.Errorf("writing headKey in batch: %w", err) | ||||||
} | ||||||
|
||||||
return nil | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's just do quick sanity even in testing pkg to make sure from < to