diff --git a/go.mod b/go.mod
index 84dbb44e..28411a52 100644
--- a/go.mod
+++ b/go.mod
@@ -115,4 +115,4 @@ require (
 	lukechampine.com/blake3 v1.4.1 // indirect
 )
 
-replace github.com/ipfs/go-datastore => github.com/celestiaorg/go-datastore v0.0.0-20250725152401-1597fcd4fa90
+replace github.com/ipfs/go-datastore => github.com/celestiaorg/go-datastore v0.0.0-20250801131506-48a63ae531e4
diff --git a/go.sum b/go.sum
index 535d240a..d8eed713 100644
--- a/go.sum
+++ b/go.sum
@@ -16,8 +16,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
-github.com/celestiaorg/go-datastore v0.0.0-20250725152401-1597fcd4fa90 h1:zZORGwTRC4EyUdb1UQXcdzeCHAOdn0jUtY5KaeCCNf8=
-github.com/celestiaorg/go-datastore v0.0.0-20250725152401-1597fcd4fa90/go.mod h1:W+pI1NsUsz3tcsAACMtfC+IZdnQTnC/7VfPoJBQuts0=
+github.com/celestiaorg/go-datastore v0.0.0-20250801131506-48a63ae531e4 h1:udw77BU45zmvTV7798FhR1wHFmsFpu4GnA5mubtMcR0=
+github.com/celestiaorg/go-datastore v0.0.0-20250801131506-48a63ae531e4/go.mod h1:W+pI1NsUsz3tcsAACMtfC+IZdnQTnC/7VfPoJBQuts0=
 github.com/celestiaorg/go-libp2p-messenger v0.2.2 h1:osoUfqjss7vWTIZrrDSy953RjQz+ps/vBFE7bychLEc=
 github.com/celestiaorg/go-libp2p-messenger v0.2.2/go.mod h1:oTCRV5TfdO7V/k6nkx7QjQzGrWuJbupv+0o1cgnY2i4=
 github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
diff --git a/store/store.go b/store/store.go
index ce8e0d88..92a6e820 100644
--- a/store/store.go
+++ b/store/store.go
@@ -10,6 +10,8 @@ import (
 
 	lru "github.com/hashicorp/golang-lru/v2"
 	"github.com/ipfs/go-datastore"
+	contextds "github.com/ipfs/go-datastore/context"
+	"github.com/ipfs/go-datastore/keytransform"
 	"github.com/ipfs/go-datastore/namespace"
 	logging "github.com/ipfs/go-log/v2"
 	"go.uber.org/zap/zapcore"
@@ -31,7 +33,7 @@ type Store[H header.Header[H]] struct {
 	// header storing
 	//
 	// underlying KV store
-	ds datastore.Batching
+	ds *keytransform.Datastore
 	// adaptive replacement cache of headers
 	cache *lru.TwoQueueCache[string, H]
 	// metrics collection instance
@@ -275,6 +277,9 @@ func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) {
 		return h, nil
 	}
 
+	ctx, done := s.withReadTransaction(ctx)
+	defer done()
+
 	hash, err := s.heightIndex.HashByHeight(ctx, height, true)
 	if err != nil {
 		var zero H
@@ -308,6 +313,10 @@ func (s *Store[H]) getRangeByHeight(ctx context.Context, from, to uint64) ([]H,
 	if from >= to {
 		return nil, fmt.Errorf("header/store: invalid range(%d,%d)", from, to)
 	}
+
+	ctx, done := s.withReadTransaction(ctx)
+	defer done()
+
 	h, err := s.GetByHeight(ctx, to-1)
 	if err != nil {
 		return nil, err
@@ -627,6 +636,9 @@ func (s *Store[H]) nextHead(ctx context.Context) (head H, changed bool) {
 		return head, false
 	}
 
+	ctx, done := s.withReadTransaction(ctx)
+	defer done()
+
 	for ctx.Err() == nil {
 		h, err := s.getByHeight(ctx, head.Height()+1)
 		if err != nil {
@@ -664,6 +676,9 @@ func (s *Store[H]) nextTail(ctx context.Context) (tail H, changed bool) {
 		return tail, false
 	}
 
+	ctx, done := s.withReadTransaction(ctx)
+	defer done()
+
 	for ctx.Err() == nil {
 		h, err := s.getByHeight(ctx, tail.Height()-1)
 		if err != nil {
@@ -746,6 +761,54 @@ func (s *Store[H]) deinit() {
 	s.heightSub.SetHeight(0)
 }
 
+// withWriteBatch attaches a new batch to the given context and returns a cleanup func.
+// The batch couples multiple related writes, substantially improving performance.
+func (s *Store[H]) withWriteBatch(ctx context.Context) (context.Context, func() error) {
+	bds, ok := s.ds.Children()[0].(datastore.Batching)
+	if !ok {
+		return ctx, func() error { return nil }
+	}
+
+	if _, ok = contextds.GetWrite(ctx); ok {
+		// there is a batch already provided by a parent call
+		return ctx, func() error { return nil }
+	}
+
+	batch, err := bds.Batch(ctx)
+	if err != nil {
+		log.Errorw("new batch", "err", err)
+		return ctx, func() error { return nil }
+	}
+
+	return contextds.WithWrite(ctx, batch), func() error {
+		return batch.Commit(ctx)
+	}
+}
+
+// withReadTransaction attaches a new transaction to the given context and returns a cleanup func.
+// The transaction couples multiple related reads, substantially improving performance.
+func (s *Store[H]) withReadTransaction(ctx context.Context) (context.Context, func()) {
+	tds, ok := s.ds.Children()[0].(datastore.TxnFeature)
+	if !ok {
+		return ctx, func() {}
+	}
+
+	if _, ok = contextds.GetRead(ctx); ok {
+		// there is a transaction already attached
+		return ctx, func() {}
+	}
+
+	txn, err := tds.NewTransaction(ctx, true)
+	if err != nil {
+		log.Errorw("new transaction", "err", err)
+		return ctx, func() {}
+	}
+
+	return contextds.WithRead(ctx, txn), func() {
+		txn.Discard(ctx)
+	}
+}
+
 func writeHeaderHashTo[H header.Header[H]](
 	ctx context.Context,
 	write datastore.Write,
diff --git a/store/store_delete.go b/store/store_delete.go
index 0932966e..4bea211e 100644
--- a/store/store_delete.go
+++ b/store/store_delete.go
@@ -10,7 +10,6 @@ import (
 	"time"
 
 	"github.com/ipfs/go-datastore"
-	contextds "github.com/ipfs/go-datastore/context"
 
 	"github.com/celestiaorg/go-header"
 )
@@ -100,7 +99,10 @@ var (
 func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error) {
 	startTime := time.Now()
 
-	var height uint64
+	var (
+		height  uint64
+		missing int
+	)
 	defer func() {
 		if err != nil {
 			if errors.Is(err, errDeleteTimeout) {
@@ -108,6 +110,7 @@ func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error)
 				"from_height", from,
 				"expected_to_height", to,
 				"actual_to_height", height,
+				"hdrs_not_found", missing,
 				"took(s)", time.Since(startTime),
 			)
 		} else {
@@ -115,12 +118,18 @@ func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error)
 				"from_height", from,
 				"expected_to_height", to,
 				"actual_to_height", height,
+				"hdrs_not_found", missing,
 				"took(s)", time.Since(startTime),
 				"err", err,
 			)
 		}
 	} else if to-from > 1 {
-		log.Debugw("deleted headers", "from_height", from, "to_height", to, "took(s)", time.Since(startTime).Seconds())
+		log.Debugw("deleted headers",
+			"from_height", from,
+			"to_height", to,
+			"hdrs_not_found", missing,
+			"took(s)", time.Since(startTime).Seconds(),
+		)
 	}
 
 	if derr := s.setTail(ctx, s.ds, height); derr != nil {
@@ -140,9 +149,9 @@ func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error) {
 	}
 
 	if to-from < deleteRangeParallelThreshold {
-		height, err = s.deleteSequential(deleteCtx, from, to)
+		height, missing, err = s.deleteSequential(deleteCtx, from, to)
 	} else {
-		height, err = s.deleteParallel(deleteCtx, from, to)
+		height, missing, err = s.deleteParallel(deleteCtx, from, to)
 	}
 
 	return err
@@ -153,19 +162,14 @@ func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error)
 func (s *Store[H]) deleteSingle(
 	ctx context.Context,
 	height uint64,
-	batch datastore.Batch,
 	onDelete []func(ctx context.Context, height uint64) error,
 ) error {
 	// some of the methods may not handle context cancellation properly
 	if ctx.Err() != nil {
 		return ctx.Err()
 	}
 
 	hash, err := s.heightIndex.HashByHeight(ctx, height, false)
-	if errors.Is(err, datastore.ErrNotFound) {
-		log.Debugw("attempt to delete header that's not found", "height", height)
-		return nil
-	}
 	if err != nil {
 		return fmt.Errorf("hash by height %d: %w", height, err)
 	}
@@ -176,10 +180,10 @@ func (s *Store[H]) deleteSingle(
 		}
 	}
 
-	if err := batch.Delete(ctx, hashKey(hash)); err != nil {
+	if err := s.ds.Delete(ctx, hashKey(hash)); err != nil {
 		return fmt.Errorf("delete hash key (%X): %w", hash, err)
 	}
-	if err := batch.Delete(ctx, heightKey(height)); err != nil {
+	if err := s.ds.Delete(ctx, heightKey(height)); err != nil {
 		return fmt.Errorf("delete height key (%d): %w", height, err)
 	}
 
@@ -194,36 +198,38 @@ func (s *Store[H]) deleteSequential(
 	ctx context.Context,
 	from, to uint64,
-) (highest uint64, err error) {
+) (highest uint64, missing int, err error) {
 	log.Debugw("starting delete range sequential", "from_height", from, "to_height", to)
 
-	batch, err := s.ds.Batch(ctx)
-	if err != nil {
-		return 0, fmt.Errorf("new batch: %w", err)
-	}
-	ctx = contextds.WithWrite(ctx, batch)
+	ctx, done := s.withWriteBatch(ctx)
 	defer func() {
-		if derr := batch.Commit(ctx); derr != nil {
+		if derr := done(); derr != nil {
 			err = errors.Join(err, fmt.Errorf("committing batch: %w", derr))
 		}
 	}()
+	ctx, doneTx := s.withReadTransaction(ctx)
+	defer doneTx()
 
 	s.onDeleteMu.Lock()
 	onDelete := slices.Clone(s.onDelete)
 	s.onDeleteMu.Unlock()
 
 	for height := from; height < to; height++ {
-		if err := s.deleteSingle(ctx, height, batch, onDelete); err != nil {
-			return height, err
+		err := s.deleteSingle(ctx, height, onDelete)
+		if errors.Is(err, datastore.ErrNotFound) {
+			missing++
+			log.Debugw("attempt to delete header that's not found", "height", height)
+		} else if err != nil {
+			return height, missing, err
 		}
 	}
 
-	return to, nil
+	return to, missing, nil
 }
 
 // deleteParallel deletes [from:to) header range from the store in parallel
 // and returns the highest unprocessed height: 'to' in success case or the failed height in error case.
-func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64, error) {
+func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64, int, error) {
 	now := time.Now()
 	s.onDeleteMu.Lock()
 	onDelete := slices.Clone(s.onDelete)
 	s.onDeleteMu.Unlock()
@@ -235,13 +241,16 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64,
 	// require too much RAM, yet shows good performance.
 	workerNum := runtime.GOMAXPROCS(-1) * 3
 
-	log.Infow(
-		"deleting range parallel", "from_height", from, "to_height", to, "worker_num", workerNum,
+	log.Infow("deleting range parallel",
+		"from_height", from,
+		"to_height", to,
+		"worker_num", workerNum,
 	)
 
 	type result struct {
-		height uint64
-		err    error
+		missing int
+		height  uint64
+		err     error
 	}
 	results := make([]result, workerNum)
 	jobCh := make(chan uint64, workerNum)
@@ -260,24 +269,25 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64,
 			}
 		}()
 
-		batch, err := s.ds.Batch(ctx)
-		if err != nil {
-			last.err = fmt.Errorf("new batch: %w", err)
-			return
-		}
-		ctx = contextds.WithWrite(ctx, batch)
+		workerCtx, done := s.withWriteBatch(ctx)
+		defer func() {
+			if err := done(); err != nil {
+				last.err = errors.Join(last.err, fmt.Errorf("committing delete batch: %w", err))
+			}
+		}()
+		workerCtx, doneTx := s.withReadTransaction(workerCtx)
+		defer doneTx()
 
 		for height := range jobCh {
 			last.height = height
-			last.err = s.deleteSingle(ctx, height, batch, onDelete)
-			if last.err != nil {
+			last.err = s.deleteSingle(workerCtx, height, onDelete)
+			if errors.Is(last.err, datastore.ErrNotFound) {
+				last.missing++
+				log.Debugw("attempt to delete header that's not found", "height", height)
+			} else if last.err != nil {
 				break
 			}
 		}
-
-		if err := batch.Commit(ctx); err != nil {
-			last.err = errors.Join(last.err, fmt.Errorf("committing delete batch: %w", err))
-		}
 	}
 
 	var wg sync.WaitGroup
@@ -309,29 +319,30 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64,
 		return int(a.height - b.height) //nolint:gosec
 	})
 
 	// find the highest deleted height
-	var highest uint64
+	var (
+		highest uint64
+		missing int
+	)
 	for _, result := range results {
 		if result.err != nil {
 			// return the error immediately even if some higher headers may have been deleted
 			// this ensures we set tail to the lowest errored height, s.t. retries do not shadowly miss any headers
-			return result.height, result.err
+			return result.height, missing, result.err
 		}
 		if result.height > highest {
 			highest = result.height
 		}
+		missing += result.missing
 	}
 	// ensures the height after the highest deleted becomes the new tail
 	highest++
 
-	log.Infow(
-		"deleted range parallel",
-		"from_height",
-		from,
-		"to_height",
-		to,
-		"took(s)",
-		time.Since(now).Seconds(),
+	log.Infow("deleted range parallel",
+		"from_height", from,
+		"to_height", to,
+		"hdrs_not_found", missing,
+		"took(s)", time.Since(now).Seconds(),
 	)
-	return highest, nil
+	return highest, missing, nil
 }
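
For review context, a minimal sketch of how the new context-scoped helpers are meant to compose across nested `Store` calls. `pairByHeight` is a hypothetical in-package helper invented for illustration; `withReadTransaction`, `getByHeight`, and the `contextds` accessors are the ones introduced by this diff:

```go
// Hypothetical example inside package store (not part of this diff):
// two related reads served by a single read transaction.
func (s *Store[H]) pairByHeight(ctx context.Context, height uint64) (lo, hi H, err error) {
	// Open one transaction for the whole lookup and attach it to ctx via
	// contextds.WithRead. If the caller already attached one, this is a
	// no-op and done() does nothing, so transactions never nest.
	ctx, done := s.withReadTransaction(ctx)
	defer done() // read-only, so the cleanup discards rather than commits

	// Each getByHeight call below re-enters withReadTransaction, finds the
	// existing transaction via contextds.GetRead, and reuses it instead of
	// opening its own.
	if lo, err = s.getByHeight(ctx, height); err != nil {
		return lo, hi, err
	}
	hi, err = s.getByHeight(ctx, height+1)
	return lo, hi, err
}
```

`withWriteBatch` composes the same way on the write path: the outermost caller creates the batch and commits it in its cleanup func, while nested writers detect it through `contextds.GetWrite` and append to it instead of creating their own.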