Skip to content

Commit 956b25b

Browse files
committed
store handle delete interruptions
1 parent 0f6549c commit 956b25b

File tree

2 files changed

+80
-36
lines changed

2 files changed

+80
-36
lines changed

store/store.go

Lines changed: 76 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,12 @@ func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
423423
return nil
424424
}
425425

426-
func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (rerr error) {
426+
var deleteTimeoutError = errors.New("delete timeout")
427+
428+
// deleteRange deletes range of headers defined by from and to
429+
// it gracefully handles context and errors
430+
// attempting to save interrupted progress.
431+
func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error) {
427432
s.onDeleteMu.Lock()
428433
onDelete := slices.Clone(s.onDelete)
429434
s.onDeleteMu.Unlock()
@@ -433,62 +438,98 @@ func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (rerr error
433438
return fmt.Errorf("new batch: %w", err)
434439
}
435440

441+
startTime := time.Now()
442+
deleteCtx := ctx
443+
if deadline, ok := ctx.Deadline(); ok {
444+
// allocate 95% of caller's set deadline for deletion
445+
// and give leftover to save progress
446+
// this prevents store's state corruption from partial deletion
447+
sub := deadline.Sub(startTime) / 100 * 95
448+
var cancel context.CancelFunc
449+
deleteCtx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), deleteTimeoutError)
450+
defer cancel()
451+
}
452+
453+
log.Debugw("starting delete range", "from_height", from, "to_height", to)
454+
436455
height := from
437456
defer func() {
438-
// make new context to always save progress
439-
ctx := context.Background()
440-
441-
log.Infow("deleted headers", "from_height", from, "to_height", to)
442457
newTailHeight := to
443-
if rerr != nil {
444-
log.Warnw("partial delete with error", "expected_to_height", newTailHeight, "actual_to_height", height, "err", err)
458+
if err != nil {
459+
if errors.Is(err, deleteTimeoutError) {
460+
log.Warnw("partial delete",
461+
"from_height", from,
462+
"expected_to_height", newTailHeight,
463+
"actual_to_height", height,
464+
"took", time.Since(startTime),
465+
)
466+
} else {
467+
log.Errorw("partial delete with error",
468+
"from_height", from,
469+
"expected_to_height", newTailHeight,
470+
"actual_to_height", height,
471+
"took", time.Since(startTime),
472+
"err", err,
473+
)
474+
}
475+
445476
newTailHeight = height
477+
} else if to-from > 1 {
478+
log.Infow("deleted headers", "from_height", from, "to_height", to, "took", time.Since(startTime))
446479
}
447480

448-
err = s.setTail(ctx, batch, newTailHeight)
449-
if err != nil {
450-
rerr = errors.Join(rerr, fmt.Errorf("setting tail to %d: %w", newTailHeight, err))
481+
derr := s.setTail(ctx, batch, newTailHeight)
482+
if derr != nil {
483+
err = errors.Join(err, fmt.Errorf("setting tail to %d: %w", newTailHeight, derr))
451484
}
452485

453-
if err := batch.Commit(ctx); err != nil {
454-
rerr = errors.Join(rerr, fmt.Errorf("committing delete batch [%d:%d): %w", from, newTailHeight, err))
486+
if derr := batch.Commit(ctx); derr != nil {
487+
err = errors.Join(err, fmt.Errorf("committing delete batch [%d:%d): %w", from, newTailHeight, derr))
455488
}
456489
}()
457490

458491
for i := 0; height < to; height++ {
459-
hash, err := s.heightIndex.HashByHeight(ctx, height, false)
460-
if errors.Is(err, datastore.ErrNotFound) {
461-
log.Warnf("attempt to delete header that's not found", "height", height)
462-
continue
463-
}
464-
if err != nil {
465-
return fmt.Errorf("hash by height %d: %w", height, err)
492+
if err := s.delete(deleteCtx, height, batch, onDelete); err != nil {
493+
return fmt.Errorf("delete header %d: %w", height, err)
466494
}
467495

468-
for _, deleteFn := range onDelete {
469-
if err := deleteFn(ctx, height); err != nil {
470-
return fmt.Errorf("on delete handler for %d: %w", height, err)
471-
}
496+
if i != 0 && i%100000 == 0 {
497+
log.Debugf("deleted %d headers", i)
472498
}
473499

474-
if err := batch.Delete(ctx, hashKey(hash)); err != nil {
475-
return fmt.Errorf("delete hash key (%X): %w", hash, err)
476-
}
477-
if err := batch.Delete(ctx, heightKey(height)); err != nil {
478-
return fmt.Errorf("delete height key (%d): %w", height, err)
479-
}
500+
i++
501+
}
480502

481-
s.cache.Remove(hash.String())
482-
s.heightIndex.cache.Remove(height)
483-
s.pending.DeleteRange(height, height+1)
503+
return nil
504+
}
505+
506+
// delete deletes a single header from the store, its caches and indexies, notifying any registered onDelete handlers.
507+
func (s *Store[H]) delete(ctx context.Context, height uint64, batch datastore.Batch, onDelete []func(ctx context.Context, height uint64) error) error {
508+
hash, err := s.heightIndex.HashByHeight(ctx, height, false)
509+
if errors.Is(err, datastore.ErrNotFound) {
510+
log.Warnf("attempt to delete header that's not found", "height", height)
511+
return nil
512+
}
513+
if err != nil {
514+
return fmt.Errorf("hash by height %d: %w", height, err)
515+
}
484516

485-
if i%100000 == 0 {
486-
log.Debug("deleted %d headers", i)
517+
for _, deleteFn := range onDelete {
518+
if err := deleteFn(ctx, height); err != nil {
519+
return fmt.Errorf("on delete handler for %d: %w", height, err)
487520
}
521+
}
488522

489-
i++
523+
if err := batch.Delete(ctx, hashKey(hash)); err != nil {
524+
return fmt.Errorf("delete hash key (%X): %w", hash, err)
525+
}
526+
if err := batch.Delete(ctx, heightKey(height)); err != nil {
527+
return fmt.Errorf("delete height key (%d): %w", height, err)
490528
}
491529

530+
s.cache.Remove(hash.String())
531+
s.heightIndex.cache.Remove(height)
532+
s.pending.DeleteRange(height, height+1)
492533
return nil
493534
}
494535

sync/syncer_tail.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ func (s *Syncer[H]) renewTail(ctx context.Context, oldTail, head H) (newTail H,
9595
err,
9696
)
9797
}
98+
log.Debugw("new tail not found in store", "height", tailHeight)
9899
}
99100

100101
newTail, err = s.getter.GetByHeight(ctx, tailHeight)
@@ -199,7 +200,9 @@ func (s *Syncer[H]) estimateTailHeight(head H) uint64 {
199200
return 1
200201
}
201202

202-
return head.Height() - headersToRetain
203+
estimatedHeight := head.Height() - headersToRetain
204+
log.Debugw("estimated tail height", "height", estimatedHeight)
205+
return estimatedHeight
203206
}
204207

205208
// findTailHeight find the tail height based on the current head and tail.

0 commit comments

Comments
 (0)