Skip to content

Commit b018a27

Browse files
committed
parallelize
1 parent 77e9571 commit b018a27

File tree

2 files changed

+277
-189
lines changed

2 files changed

+277
-189
lines changed

store/store.go

Lines changed: 2 additions & 189 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"slices"
87
"sync"
98
"sync/atomic"
109
"time"
@@ -15,7 +14,6 @@ import (
1514
logging "github.com/ipfs/go-log/v2"
1615

1716
"github.com/celestiaorg/go-header"
18-
badger4 "github.com/ipfs/go-ds-badger4"
1917
)
2018

2119
var log = logging.Logger("header/store")
@@ -358,193 +356,8 @@ func (s *Store[H]) HasAt(ctx context.Context, height uint64) bool {
358356
return head.Height() >= height && height >= tail.Height()
359357
}
360358

361-
func (s *Store[H]) OnDelete(fn func(context.Context, uint64) error) {
362-
s.onDeleteMu.Lock()
363-
defer s.onDeleteMu.Unlock()
364-
365-
s.onDelete = append(s.onDelete, func(ctx context.Context, height uint64) (rerr error) {
366-
defer func() {
367-
err := recover()
368-
if err != nil {
369-
rerr = fmt.Errorf("header/store: user provided onDelete panicked on %d with: %s", height, err)
370-
}
371-
}()
372-
return fn(ctx, height)
373-
})
374-
}
375-
376-
// DeleteTo implements [header.Store] interface.
377-
func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
378-
// ensure all the pending headers are synchronized
379-
err := s.Sync(ctx)
380-
if err != nil {
381-
return err
382-
}
383-
384-
head, err := s.Head(ctx)
385-
if err != nil {
386-
return fmt.Errorf("header/store: reading head: %w", err)
387-
}
388-
if head.Height()+1 < to {
389-
_, err := s.getByHeight(ctx, to)
390-
if errors.Is(err, header.ErrNotFound) {
391-
return fmt.Errorf(
392-
"header/store: delete to %d beyond current head(%d)",
393-
to,
394-
head.Height(),
395-
)
396-
}
397-
if err != nil {
398-
return fmt.Errorf("delete to potential new head: %w", err)
399-
}
400-
401-
// if `to` is bigger than the current head and is stored - allow delete, making `to` a new head
402-
}
403-
404-
tail, err := s.Tail(ctx)
405-
if err != nil {
406-
return fmt.Errorf("header/store: reading tail: %w", err)
407-
}
408-
if tail.Height() >= to {
409-
return fmt.Errorf("header/store: delete to %d below current tail(%d)", to, tail.Height())
410-
}
411-
412-
if err := s.deleteRange(ctx, tail.Height(), to); err != nil {
413-
return fmt.Errorf("header/store: delete to height %d: %w", to, err)
414-
}
415-
416-
if head.Height()+1 == to {
417-
// this is the case where we have deleted all the headers
418-
// wipe the store
419-
if err := s.wipe(ctx); err != nil {
420-
return fmt.Errorf("header/store: wipe: %w", err)
421-
}
422-
}
423-
424-
return nil
425-
}
426-
427-
var deleteTimeoutError = errors.New("delete timeout")
428-
429-
// deleteRange deletes range of headers defined by from and to
430-
// it gracefully handles context and errors
431-
// attempting to save interrupted progress.
432-
func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error) {
433-
s.onDeleteMu.Lock()
434-
onDelete := slices.Clone(s.onDelete)
435-
s.onDeleteMu.Unlock()
436-
437-
batch, err := s.ds.Batch(ctx)
438-
if err != nil {
439-
return fmt.Errorf("new batch: %w", err)
440-
}
441-
ctx = badger4.WithBatch(ctx, batch)
442-
443-
startTime := time.Now()
444-
deleteCtx := ctx
445-
if deadline, ok := ctx.Deadline(); ok {
446-
// allocate 95% of caller's set deadline for deletion
447-
// and give leftover to save progress
448-
// this prevents store's state corruption from partial deletion
449-
sub := deadline.Sub(startTime) / 100 * 95
450-
var cancel context.CancelFunc
451-
deleteCtx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), deleteTimeoutError)
452-
defer cancel()
453-
}
454-
455-
log.Debugw("starting delete range", "from_height", from, "to_height", to)
456-
457-
height := from
458-
defer func() {
459-
newTailHeight := to
460-
if err != nil {
461-
if errors.Is(err, deleteTimeoutError) {
462-
log.Warnw("partial delete",
463-
"from_height", from,
464-
"expected_to_height", newTailHeight,
465-
"actual_to_height", height,
466-
"took", time.Since(startTime),
467-
)
468-
} else {
469-
log.Errorw("partial delete with error",
470-
"from_height", from,
471-
"expected_to_height", newTailHeight,
472-
"actual_to_height", height,
473-
"took", time.Since(startTime),
474-
"err", err,
475-
)
476-
}
477-
478-
newTailHeight = height
479-
} else if to-from > 1 {
480-
log.Infow("deleted headers", "from_height", from, "to_height", to, "took", time.Since(startTime))
481-
}
482-
483-
derr := s.setTail(ctx, batch, newTailHeight)
484-
if derr != nil {
485-
err = errors.Join(err, fmt.Errorf("setting tail to %d: %w", newTailHeight, derr))
486-
}
487-
488-
if derr := batch.Commit(ctx); derr != nil {
489-
err = errors.Join(err, fmt.Errorf("committing delete batch [%d:%d): %w", from, newTailHeight, derr))
490-
}
491-
}()
492-
493-
for i := 0; height < to; height++ {
494-
if err := s.delete(deleteCtx, height, batch, onDelete); err != nil {
495-
return fmt.Errorf("delete header %d: %w", height, err)
496-
}
497-
498-
if i != 0 && i%100000 == 0 {
499-
log.Debugf("deleted %d headers", i)
500-
}
501-
502-
i++
503-
}
504-
505-
return nil
506-
}
507-
508-
// delete deletes a single header from the store, its caches and indexies, notifying any registered onDelete handlers.
509-
func (s *Store[H]) delete(ctx context.Context, height uint64, batch datastore.Batch, onDelete []func(ctx context.Context, height uint64) error) error {
510-
// some of the methods may not handle context cancellation properly
511-
if ctx.Err() != nil {
512-
return context.Cause(ctx)
513-
}
514-
515-
hash, err := s.heightIndex.HashByHeight(ctx, height, false)
516-
if errors.Is(err, datastore.ErrNotFound) {
517-
log.Warnf("attempt to delete header that's not found", "height", height)
518-
return nil
519-
}
520-
if err != nil {
521-
return fmt.Errorf("hash by height %d: %w", height, err)
522-
}
523-
524-
for _, deleteFn := range onDelete {
525-
if err := deleteFn(ctx, height); err != nil {
526-
return fmt.Errorf("on delete handler for %d: %w", height, err)
527-
}
528-
}
529-
530-
if err := batch.Delete(ctx, hashKey(hash)); err != nil {
531-
return fmt.Errorf("delete hash key (%X): %w", hash, err)
532-
}
533-
if err := batch.Delete(ctx, heightKey(height)); err != nil {
534-
return fmt.Errorf("delete height key (%d): %w", height, err)
535-
}
536-
537-
s.cache.Remove(hash.String())
538-
s.heightIndex.cache.Remove(height)
539-
s.pending.DeleteRange(height, height+1)
540-
return nil
541-
}
542-
543-
func (s *Store[H]) setTail(ctx context.Context, batch datastore.Batch, to uint64) error {
359+
func (s *Store[H]) setTail(ctx context.Context, batch datastore.Write, to uint64) error {
544360
newTail, err := s.getByHeight(ctx, to)
545-
if errors.Is(err, header.ErrNotFound) {
546-
return nil
547-
}
548361
if err != nil {
549362
return fmt.Errorf("getting tail: %w", err)
550363
}
@@ -872,7 +685,7 @@ func (s *Store[H]) deinit() {
872685

873686
func writeHeaderHashTo[H header.Header[H]](
874687
ctx context.Context,
875-
batch datastore.Batch,
688+
batch datastore.Write,
876689
h H,
877690
key datastore.Key,
878691
) error {

0 commit comments

Comments
 (0)