Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,4 @@ require (
lukechampine.com/blake3 v1.4.1 // indirect
)

replace github.com/ipfs/go-datastore => github.com/celestiaorg/go-datastore v0.0.0-20250725152401-1597fcd4fa90
replace github.com/ipfs/go-datastore => github.com/celestiaorg/go-datastore v0.0.0-20250801131506-48a63ae531e4
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/celestiaorg/go-datastore v0.0.0-20250725152401-1597fcd4fa90 h1:zZORGwTRC4EyUdb1UQXcdzeCHAOdn0jUtY5KaeCCNf8=
github.com/celestiaorg/go-datastore v0.0.0-20250725152401-1597fcd4fa90/go.mod h1:W+pI1NsUsz3tcsAACMtfC+IZdnQTnC/7VfPoJBQuts0=
github.com/celestiaorg/go-datastore v0.0.0-20250801131506-48a63ae531e4 h1:udw77BU45zmvTV7798FhR1wHFmsFpu4GnA5mubtMcR0=
github.com/celestiaorg/go-datastore v0.0.0-20250801131506-48a63ae531e4/go.mod h1:W+pI1NsUsz3tcsAACMtfC+IZdnQTnC/7VfPoJBQuts0=
github.com/celestiaorg/go-libp2p-messenger v0.2.2 h1:osoUfqjss7vWTIZrrDSy953RjQz+ps/vBFE7bychLEc=
github.com/celestiaorg/go-libp2p-messenger v0.2.2/go.mod h1:oTCRV5TfdO7V/k6nkx7QjQzGrWuJbupv+0o1cgnY2i4=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down
65 changes: 64 additions & 1 deletion store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

lru "github.com/hashicorp/golang-lru/v2"
"github.com/ipfs/go-datastore"
contextds "github.com/ipfs/go-datastore/context"
"github.com/ipfs/go-datastore/keytransform"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/zap/zapcore"
Expand All @@ -31,7 +33,7 @@ type Store[H header.Header[H]] struct {
// header storing
//
// underlying KV store
ds datastore.Batching
ds *keytransform.Datastore
// adaptive replacement cache of headers
cache *lru.TwoQueueCache[string, H]
// metrics collection instance
Expand Down Expand Up @@ -275,6 +277,9 @@ func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) {
return h, nil
}

ctx, done := s.withReadTransaction(ctx)
defer done()

hash, err := s.heightIndex.HashByHeight(ctx, height, true)
if err != nil {
var zero H
Expand Down Expand Up @@ -308,6 +313,10 @@ func (s *Store[H]) getRangeByHeight(ctx context.Context, from, to uint64) ([]H,
if from >= to {
return nil, fmt.Errorf("header/store: invalid range(%d,%d)", from, to)
}

ctx, done := s.withReadTransaction(ctx)
defer done()

h, err := s.GetByHeight(ctx, to-1)
if err != nil {
return nil, err
Expand Down Expand Up @@ -627,6 +636,9 @@ func (s *Store[H]) nextHead(ctx context.Context) (head H, changed bool) {
return head, false
}

ctx, done := s.withReadTransaction(ctx)
defer done()

for ctx.Err() == nil {
h, err := s.getByHeight(ctx, head.Height()+1)
if err != nil {
Expand Down Expand Up @@ -664,6 +676,9 @@ func (s *Store[H]) nextTail(ctx context.Context) (tail H, changed bool) {
return tail, false
}

ctx, done := s.withReadTransaction(ctx)
defer done()

for ctx.Err() == nil {
h, err := s.getByHeight(ctx, tail.Height()-1)
if err != nil {
Expand Down Expand Up @@ -746,6 +761,54 @@ func (s *Store[H]) deinit() {
s.heightSub.SetHeight(0)
}

// withWriteBatch attaches a new batch to the given context and returns cleanup func.
func (s *Store[H]) withWriteBatch(ctx context.Context) (context.Context, func() error) {
bds, ok := s.ds.Children()[0].(datastore.Batching)
if !ok {
return ctx, func() error { return nil }
}
Comment on lines +767 to +770
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A note for an advanced reader: we need to unwrap the underlying datastore from the keytransformer one, otherwise key transformation is going to happen twice in the datastore itself and the tx wrapper.

Basically, a combination of keytransformer and contextds may lead to unexpected and silent double transformation, which is annoying to debug. There might be a general way to untangle these on the library side, but this is the simplest solution on our side I could think of.


if _, ok = contextds.GetWrite(ctx); ok {
// there is a batch already
// avoid returning so its not discarded
return ctx, func() error { return nil }
}

batch, err := bds.Batch(ctx)
if err != nil {
log.Errorw("new batch", "err", err)
return ctx, func() error { return nil }
}

return contextds.WithWrite(ctx, batch), func() error {
return batch.Commit(ctx)
}
}

// withReadTransaction attaches a new transaction to the given context and returns cleanup func.
func (s *Store[H]) withReadTransaction(ctx context.Context) (context.Context, func()) {
tds, ok := s.ds.Children()[0].(datastore.TxnFeature)
if !ok {
return ctx, func() {}
}

if _, ok = contextds.GetRead(ctx); ok {
// there is a transaction already
// avoid returning so its not discarded
return ctx, func() {}
}

txn, err := tds.NewTransaction(ctx, true)
if err != nil {
log.Errorw("new transaction", "err", err)
return ctx, func() {}
}

return contextds.WithRead(ctx, txn), func() {
txn.Discard(ctx)
}
}

func writeHeaderHashTo[H header.Header[H]](
ctx context.Context,
write datastore.Write,
Expand Down
111 changes: 61 additions & 50 deletions store/store_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/ipfs/go-datastore"
contextds "github.com/ipfs/go-datastore/context"

"github.com/celestiaorg/go-header"
)
Expand Down Expand Up @@ -100,27 +99,37 @@ var (
func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error) {
startTime := time.Now()

var height uint64
var (
height uint64
missing int
)
defer func() {
if err != nil {
if errors.Is(err, errDeleteTimeout) {
log.Warnw("partial delete",
"from_height", from,
"expected_to_height", to,
"actual_to_height", height,
"hdrs_not_found", missing,
"took(s)", time.Since(startTime),
)
} else {
log.Errorw("partial delete with error",
"from_height", from,
"expected_to_height", to,
"actual_to_height", height,
"hdrs_not_found", missing,
"took(s)", time.Since(startTime),
"err", err,
)
}
} else if to-from > 1 {
log.Debugw("deleted headers", "from_height", from, "to_height", to, "took(s)", time.Since(startTime).Seconds())
log.Debugw("deleted headers",
"from_height", from,
"to_height", to,
"hdrs_not_found", missing,
"took(s)", time.Since(startTime).Seconds(),
)
}

if derr := s.setTail(ctx, s.ds, height); derr != nil {
Expand All @@ -140,9 +149,9 @@ func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error)
}

if to-from < deleteRangeParallelThreshold {
height, err = s.deleteSequential(deleteCtx, from, to)
height, missing, err = s.deleteSequential(deleteCtx, from, to)
} else {
height, err = s.deleteParallel(deleteCtx, from, to)
height, missing, err = s.deleteParallel(deleteCtx, from, to)
}

return err
Expand All @@ -153,7 +162,6 @@ func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error)
func (s *Store[H]) deleteSingle(
ctx context.Context,
height uint64,
batch datastore.Batch,
onDelete []func(ctx context.Context, height uint64) error,
) error {
// some of the methods may not handle context cancellation properly
Expand All @@ -162,10 +170,6 @@ func (s *Store[H]) deleteSingle(
}

hash, err := s.heightIndex.HashByHeight(ctx, height, false)
if errors.Is(err, datastore.ErrNotFound) {
log.Debugw("attempt to delete header that's not found", "height", height)
return nil
}
if err != nil {
return fmt.Errorf("hash by height %d: %w", height, err)
}
Expand All @@ -176,10 +180,10 @@ func (s *Store[H]) deleteSingle(
}
}

if err := batch.Delete(ctx, hashKey(hash)); err != nil {
if err := s.ds.Delete(ctx, hashKey(hash)); err != nil {
return fmt.Errorf("delete hash key (%X): %w", hash, err)
}
if err := batch.Delete(ctx, heightKey(height)); err != nil {
if err := s.ds.Delete(ctx, heightKey(height)); err != nil {
return fmt.Errorf("delete height key (%d): %w", height, err)
}

Expand All @@ -194,36 +198,38 @@ func (s *Store[H]) deleteSingle(
func (s *Store[H]) deleteSequential(
ctx context.Context,
from, to uint64,
) (highest uint64, err error) {
) (highest uint64, missing int, err error) {
log.Debugw("starting delete range sequential", "from_height", from, "to_height", to)

batch, err := s.ds.Batch(ctx)
if err != nil {
return 0, fmt.Errorf("new batch: %w", err)
}
ctx = contextds.WithWrite(ctx, batch)
ctx, done := s.withWriteBatch(ctx)
defer func() {
if derr := batch.Commit(ctx); derr != nil {
if derr := done(); derr != nil {
err = errors.Join(err, fmt.Errorf("committing batch: %w", derr))
}
}()
ctx, doneTx := s.withReadTransaction(ctx)
defer doneTx()

s.onDeleteMu.Lock()
onDelete := slices.Clone(s.onDelete)
s.onDeleteMu.Unlock()

for height := from; height < to; height++ {
if err := s.deleteSingle(ctx, height, batch, onDelete); err != nil {
return height, err
err := s.deleteSingle(ctx, height, onDelete)
if errors.Is(err, datastore.ErrNotFound) {
missing++
log.Debugw("attempt to delete header that's not found", "height", height)
} else if err != nil {
return height, missing, err
}
}

return to, nil
return to, missing, nil
}

// deleteParallel deletes [from:to) header range from the store in parallel
// and returns the highest unprocessed height: 'to' in success case or the failed height in error case.
func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64, error) {
func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64, int, error) {
now := time.Now()
s.onDeleteMu.Lock()
onDelete := slices.Clone(s.onDelete)
Expand All @@ -235,13 +241,16 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64,
// require too much RAM, yet shows good performance.
workerNum := runtime.GOMAXPROCS(-1) * 3

log.Infow(
"deleting range parallel", "from_height", from, "to_height", to, "worker_num", workerNum,
log.Infow("deleting range parallel",
"from_height", from,
"to_height", to,
"worker_num", workerNum,
)

type result struct {
height uint64
err error
missing int
height uint64
err error
}
results := make([]result, workerNum)
jobCh := make(chan uint64, workerNum)
Expand All @@ -260,24 +269,25 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64,
}
}()

batch, err := s.ds.Batch(ctx)
if err != nil {
last.err = fmt.Errorf("new batch: %w", err)
return
}
ctx = contextds.WithWrite(ctx, batch)
workerCtx, done := s.withWriteBatch(ctx)
defer func() {
if err := done(); err != nil {
last.err = errors.Join(last.err, fmt.Errorf("committing delete batch: %w", err))
}
}()
workerCtx, doneTx := s.withReadTransaction(workerCtx)
defer doneTx()

for height := range jobCh {
last.height = height
last.err = s.deleteSingle(ctx, height, batch, onDelete)
if last.err != nil {
last.err = s.deleteSingle(workerCtx, height, onDelete)
if errors.Is(last.err, datastore.ErrNotFound) {
last.missing++
log.Debugw("attempt to delete header that's not found", "height", height)
} else if last.err != nil {
break
}
}

if err := batch.Commit(ctx); err != nil {
last.err = errors.Join(last.err, fmt.Errorf("committing delete batch: %w", err))
}
}

var wg sync.WaitGroup
Expand Down Expand Up @@ -309,29 +319,30 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64,
return int(a.height - b.height) //nolint:gosec
})
// find the highest deleted height
var highest uint64
var (
highest uint64
missing int
)
for _, result := range results {
if result.err != nil {
// return the error immediately even if some higher headers may have been deleted
// this ensures we set tail to the lowest errored height, s.t. retries do not shadowly miss any headers
return result.height, result.err
return result.height, missing, result.err
}

if result.height > highest {
highest = result.height
}
missing += result.missing
}

// ensures the height after the highest deleted becomes the new tail
highest++
log.Infow(
"deleted range parallel",
"from_height",
from,
"to_height",
to,
"took(s)",
time.Since(now).Seconds(),
log.Infow("deleted range parallel",
"from_height", from,
"to_height", to,
"hdrs_not_found", missing,
"took(s)", time.Since(now).Seconds(),
)
return highest, nil
return highest, missing, nil
}