2 changes: 1 addition & 1 deletion go.mod
@@ -115,4 +115,4 @@ require (
lukechampine.com/blake3 v1.4.1 // indirect
)

replace github.com/ipfs/go-datastore => github.com/celestiaorg/go-datastore v0.0.0-20250725152401-1597fcd4fa90
replace github.com/ipfs/go-datastore => github.com/celestiaorg/go-datastore v0.0.0-20250801131506-48a63ae531e4
4 changes: 2 additions & 2 deletions go.sum
@@ -16,8 +16,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/celestiaorg/go-datastore v0.0.0-20250725152401-1597fcd4fa90 h1:zZORGwTRC4EyUdb1UQXcdzeCHAOdn0jUtY5KaeCCNf8=
github.com/celestiaorg/go-datastore v0.0.0-20250725152401-1597fcd4fa90/go.mod h1:W+pI1NsUsz3tcsAACMtfC+IZdnQTnC/7VfPoJBQuts0=
github.com/celestiaorg/go-datastore v0.0.0-20250801131506-48a63ae531e4 h1:udw77BU45zmvTV7798FhR1wHFmsFpu4GnA5mubtMcR0=
github.com/celestiaorg/go-datastore v0.0.0-20250801131506-48a63ae531e4/go.mod h1:W+pI1NsUsz3tcsAACMtfC+IZdnQTnC/7VfPoJBQuts0=
github.com/celestiaorg/go-libp2p-messenger v0.2.2 h1:osoUfqjss7vWTIZrrDSy953RjQz+ps/vBFE7bychLEc=
github.com/celestiaorg/go-libp2p-messenger v0.2.2/go.mod h1:oTCRV5TfdO7V/k6nkx7QjQzGrWuJbupv+0o1cgnY2i4=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
65 changes: 64 additions & 1 deletion store/store.go
@@ -10,6 +10,8 @@ import (

lru "github.com/hashicorp/golang-lru/v2"
"github.com/ipfs/go-datastore"
contextds "github.com/ipfs/go-datastore/context"
"github.com/ipfs/go-datastore/keytransform"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/zap/zapcore"
@@ -31,7 +33,7 @@ type Store[H header.Header[H]] struct {
// header storing
//
// underlying KV store
ds datastore.Batching
ds *keytransform.Datastore
// adaptive replacement cache of headers
cache *lru.TwoQueueCache[string, H]
// metrics collection instance
@@ -275,6 +277,9 @@ func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) {
return h, nil
}

ctx, done := s.withReadTransaction(ctx)
defer done()

hash, err := s.heightIndex.HashByHeight(ctx, height, true)
if err != nil {
var zero H
@@ -308,6 +313,10 @@ func (s *Store[H]) getRangeByHeight(ctx context.Context, from, to uint64) ([]H,
if from >= to {
return nil, fmt.Errorf("header/store: invalid range(%d,%d)", from, to)
}

ctx, done := s.withReadTransaction(ctx)
defer done()

h, err := s.GetByHeight(ctx, to-1)
if err != nil {
return nil, err
@@ -627,6 +636,9 @@ func (s *Store[H]) nextHead(ctx context.Context) (head H, changed bool) {
return head, false
}

ctx, done := s.withReadTransaction(ctx)
defer done()

for ctx.Err() == nil {
h, err := s.getByHeight(ctx, head.Height()+1)
if err != nil {
@@ -664,6 +676,9 @@ func (s *Store[H]) nextTail(ctx context.Context) (tail H, changed bool) {
return tail, false
}

ctx, done := s.withReadTransaction(ctx)
defer done()

for ctx.Err() == nil {
h, err := s.getByHeight(ctx, tail.Height()-1)
if err != nil {
@@ -746,6 +761,54 @@ func (s *Store[H]) deinit() {
s.heightSub.SetHeight(0)
}

// withWriteBatch attaches a new batch to the given context and returns cleanup func.
// batch is used to couple multiple related writes to substantially optimize performance.
func (s *Store[H]) withWriteBatch(ctx context.Context) (context.Context, func() error) {
bds, ok := s.ds.Children()[0].(datastore.Batching)
if !ok {
return ctx, func() error { return nil }
}
Comment on lines +767 to +770 (Member Author):

A note for an advanced reader: we need to unwrap the underlying datastore from the keytransform one, otherwise key transformation is going to happen twice: in the datastore itself and in the tx wrapper.

Basically, a combination of keytransform and contextds may lead to unexpected and silent double transformation, which is annoying to debug. There might be a general way to untangle these on the library side, but this is the simplest solution on our side I could think of.
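
To make the pitfall concrete, here is a minimal sketch. It is illustrative only: `base` stands for any batching-capable child store, and the context-routing behavior of the forked datastore is assumed from how this PR uses it.

```go
import (
	"context"

	"github.com/ipfs/go-datastore"
	contextds "github.com/ipfs/go-datastore/context"
	"github.com/ipfs/go-datastore/namespace"
)

// attachBatch is hypothetical; `base` is any batching-capable child store.
func attachBatch(ctx context.Context, base datastore.Batching) (context.Context, error) {
	prefixed := namespace.Wrap(base, datastore.NewKey("/headers")) // keytransform wrapper

	// WRONG: a batch created from the wrapper transforms keys itself. A Put on
	// `prefixed` prefixes the key once, the child routes the op into the batch
	// found in the context, and the batch prefixes it again:
	//
	//	batch, _ := prefixed.Batch(ctx)
	//	ctx = contextds.WithWrite(ctx, batch) // keys become "/headers/headers/<key>"

	// RIGHT (what withWriteBatch does): unwrap to the child first, so the
	// keytransform layer remains the only place keys get prefixed.
	inner, ok := prefixed.Children()[0].(datastore.Batching)
	if !ok {
		return ctx, datastore.ErrBatchUnsupported
	}
	batch, err := inner.Batch(ctx)
	if err != nil {
		return ctx, err
	}
	return contextds.WithWrite(ctx, batch), nil
}
```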


if _, ok = contextds.GetWrite(ctx); ok {
// there is a batch already provided by parent
return ctx, func() error { return nil }
}

batch, err := bds.Batch(ctx)
if err != nil {
log.Errorw("new batch", "err", err)
return ctx, func() error { return nil }
}

return contextds.WithWrite(ctx, batch), func() error {
return batch.Commit(ctx)
}
}
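
For orientation, a minimal usage sketch of the pattern (illustrative only — `exampleWrite` is not part of this PR; deleteSequential in store_delete.go shows the real call site):

```go
func (s *Store[H]) exampleWrite(ctx context.Context) (err error) {
	// attach one shared batch for the whole operation
	ctx, done := s.withWriteBatch(ctx)
	defer func() {
		// done() is the single commit point for everything written below
		if derr := done(); derr != nil {
			err = errors.Join(err, fmt.Errorf("committing batch: %w", derr))
		}
	}()

	// writes through s.ds in this scope are routed into the shared batch
	return s.ds.Delete(ctx, heightKey(42))
}
```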

// withReadTransaction attaches a new transaction to the given context and returns cleanup func.
// transaction is used to couple multiple related reads to substantially optimize performance.
func (s *Store[H]) withReadTransaction(ctx context.Context) (context.Context, func()) {
tds, ok := s.ds.Children()[0].(datastore.TxnFeature)
if !ok {
return ctx, func() {}
}

if _, ok = contextds.GetRead(ctx); ok {
// there is a transaction already
return ctx, func() {}
}

txn, err := tds.NewTransaction(ctx, true)
if err != nil {
log.Errorw("new transaction", "err", err)
return ctx, func() {}
}

return contextds.WithRead(ctx, txn), func() {
txn.Discard(ctx)
}
}
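
Both helpers rely on the forked go-datastore consulting the context on every operation. A rough sketch of that presumed routing follows; `ctxAwareDS` is a hypothetical stand-in, not the fork's actual code:

```go
// ctxAwareDS imitates a context-aware datastore: reads prefer the transaction
// attached to the context by withReadTransaction, falling back to the child.
type ctxAwareDS struct {
	child datastore.Datastore
}

func (d *ctxAwareDS) Get(ctx context.Context, key datastore.Key) ([]byte, error) {
	if txn, ok := contextds.GetRead(ctx); ok {
		return txn.Get(ctx, key) // reuse the shared read transaction
	}
	return d.child.Get(ctx, key)
}
```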

func writeHeaderHashTo[H header.Header[H]](
ctx context.Context,
write datastore.Write,
111 changes: 61 additions & 50 deletions store/store_delete.go
@@ -10,7 +10,6 @@ import (
"time"

"github.com/ipfs/go-datastore"
contextds "github.com/ipfs/go-datastore/context"

"github.com/celestiaorg/go-header"
)
@@ -100,27 +99,37 @@ var (
func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error) {
startTime := time.Now()

var height uint64
var (
height uint64
missing int
)
defer func() {
if err != nil {
if errors.Is(err, errDeleteTimeout) {
log.Warnw("partial delete",
"from_height", from,
"expected_to_height", to,
"actual_to_height", height,
"hdrs_not_found", missing,
"took(s)", time.Since(startTime),
)
} else {
log.Errorw("partial delete with error",
"from_height", from,
"expected_to_height", to,
"actual_to_height", height,
"hdrs_not_found", missing,
"took(s)", time.Since(startTime),
"err", err,
)
}
} else if to-from > 1 {
log.Debugw("deleted headers", "from_height", from, "to_height", to, "took(s)", time.Since(startTime).Seconds())
log.Debugw("deleted headers",
"from_height", from,
"to_height", to,
"hdrs_not_found", missing,
"took(s)", time.Since(startTime).Seconds(),
)
}

if derr := s.setTail(ctx, s.ds, height); derr != nil {
@@ -140,9 +149,9 @@ func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error)
}

if to-from < deleteRangeParallelThreshold {
height, err = s.deleteSequential(deleteCtx, from, to)
height, missing, err = s.deleteSequential(deleteCtx, from, to)
} else {
height, err = s.deleteParallel(deleteCtx, from, to)
height, missing, err = s.deleteParallel(deleteCtx, from, to)
}

return err
@@ -153,7 +162,6 @@ func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error)
func (s *Store[H]) deleteSingle(
ctx context.Context,
height uint64,
batch datastore.Batch,
onDelete []func(ctx context.Context, height uint64) error,
) error {
// some of the methods may not handle context cancellation properly
Expand All @@ -162,10 +170,6 @@ func (s *Store[H]) deleteSingle(
}

hash, err := s.heightIndex.HashByHeight(ctx, height, false)
if errors.Is(err, datastore.ErrNotFound) {
log.Debugw("attempt to delete header that's not found", "height", height)
return nil
}
if err != nil {
return fmt.Errorf("hash by height %d: %w", height, err)
}
Expand All @@ -176,10 +180,10 @@ func (s *Store[H]) deleteSingle(
}
}

if err := batch.Delete(ctx, hashKey(hash)); err != nil {
if err := s.ds.Delete(ctx, hashKey(hash)); err != nil {
return fmt.Errorf("delete hash key (%X): %w", hash, err)
}
if err := batch.Delete(ctx, heightKey(height)); err != nil {
if err := s.ds.Delete(ctx, heightKey(height)); err != nil {
return fmt.Errorf("delete height key (%d): %w", height, err)
}

Expand All @@ -194,36 +198,38 @@ func (s *Store[H]) deleteSingle(
func (s *Store[H]) deleteSequential(
ctx context.Context,
from, to uint64,
) (highest uint64, err error) {
) (highest uint64, missing int, err error) {
log.Debugw("starting delete range sequential", "from_height", from, "to_height", to)

batch, err := s.ds.Batch(ctx)
if err != nil {
return 0, fmt.Errorf("new batch: %w", err)
}
ctx = contextds.WithWrite(ctx, batch)
ctx, done := s.withWriteBatch(ctx)
defer func() {
if derr := batch.Commit(ctx); derr != nil {
if derr := done(); derr != nil {
err = errors.Join(err, fmt.Errorf("committing batch: %w", derr))
}
}()
ctx, doneTx := s.withReadTransaction(ctx)
defer doneTx()

s.onDeleteMu.Lock()
onDelete := slices.Clone(s.onDelete)
s.onDeleteMu.Unlock()

for height := from; height < to; height++ {
if err := s.deleteSingle(ctx, height, batch, onDelete); err != nil {
return height, err
err := s.deleteSingle(ctx, height, onDelete)
if errors.Is(err, datastore.ErrNotFound) {
missing++
log.Debugw("attempt to delete header that's not found", "height", height)
} else if err != nil {
return height, missing, err
}
}

return to, nil
return to, missing, nil
}

// deleteParallel deletes [from:to) header range from the store in parallel
// and returns the highest unprocessed height: 'to' in success case or the failed height in error case.
func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64, error) {
func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64, int, error) {
now := time.Now()
s.onDeleteMu.Lock()
onDelete := slices.Clone(s.onDelete)
@@ -235,13 +241,16 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64,
// require too much RAM, yet shows good performance.
workerNum := runtime.GOMAXPROCS(-1) * 3

log.Infow(
"deleting range parallel", "from_height", from, "to_height", to, "worker_num", workerNum,
log.Infow("deleting range parallel",
"from_height", from,
"to_height", to,
"worker_num", workerNum,
)

type result struct {
height uint64
err error
missing int
height uint64
err error
}
results := make([]result, workerNum)
jobCh := make(chan uint64, workerNum)
Expand All @@ -260,24 +269,25 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64,
}
}()

batch, err := s.ds.Batch(ctx)
if err != nil {
last.err = fmt.Errorf("new batch: %w", err)
return
}
ctx = contextds.WithWrite(ctx, batch)
workerCtx, done := s.withWriteBatch(ctx)
defer func() {
if err := done(); err != nil {
last.err = errors.Join(last.err, fmt.Errorf("committing delete batch: %w", err))
}
}()
workerCtx, doneTx := s.withReadTransaction(workerCtx)
defer doneTx()

for height := range jobCh {
last.height = height
last.err = s.deleteSingle(ctx, height, batch, onDelete)
if last.err != nil {
last.err = s.deleteSingle(workerCtx, height, onDelete)
if errors.Is(last.err, datastore.ErrNotFound) {
last.missing++
log.Debugw("attempt to delete header that's not found", "height", height)
} else if last.err != nil {
break
}
}

if err := batch.Commit(ctx); err != nil {
last.err = errors.Join(last.err, fmt.Errorf("committing delete batch: %w", err))
}
}

var wg sync.WaitGroup
@@ -309,29 +319,30 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64,
return int(a.height - b.height) //nolint:gosec
})
// find the highest deleted height
var highest uint64
var (
highest uint64
missing int
)
for _, result := range results {
if result.err != nil {
// return the error immediately even if some higher headers may have been deleted
// this ensures we set tail to the lowest errored height, so that retries do not silently miss any headers
// (e.g. if one worker fails at height 105 while another already deleted 110, tail becomes 105 and a retry re-covers the gap)
return result.height, result.err
return result.height, missing, result.err
}

if result.height > highest {
highest = result.height
}
missing += result.missing
}

// ensures the height after the highest deleted becomes the new tail
highest++
log.Infow(
"deleted range parallel",
"from_height",
from,
"to_height",
to,
"took(s)",
time.Since(now).Seconds(),
log.Infow("deleted range parallel",
"from_height", from,
"to_height", to,
"hdrs_not_found", missing,
"took(s)", time.Since(now).Seconds(),
)
return highest, nil
return highest, missing, nil
}