Commit 38be935

perf(store): batch reads (#331)
This PR shaves another 30s off pruning the whole chain. It achieves this by applying the same technique as in #328, but for reads. Previously, the underlying datastore would create a read-only transaction for every single read. As pyroscope profiles show, this puts pressure on badger: creating new transactions isn't cheap and involves complex synchronization to ensure atomicity between parallel transactions. With this insight in mind, I also integrated read batching in other places where it should be beneficial: advancing, receding, range, and single header reads (a sketch of the pattern follows the change list below).

Other notable changes:

* Fixes a silent bug that wouldn't delete samples on the Pruner side
* Fixes a bug in the upstream contextds change
* Adds tracking for missing headers that were not found during deletion
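
The gist of read batching, sketched below against the go-datastore transaction API this PR relies on (`datastore.TxnFeature`, `NewTransaction`): instead of letting every `Get` open its own read-only transaction, the caller opens one transaction up front and runs the whole batch of reads through it. This is an illustrative sketch, not code from the diff; `readMany` is a hypothetical helper.

```go
package main

import (
	"context"

	"github.com/ipfs/go-datastore"
)

// readMany is a hypothetical helper illustrating read batching:
// one read-only transaction serves all the Gets, instead of badger
// creating (and synchronizing) a fresh transaction per read.
func readMany(ctx context.Context, ds datastore.TxnFeature, keys []datastore.Key) ([][]byte, error) {
	txn, err := ds.NewTransaction(ctx, true) // true = read-only
	if err != nil {
		return nil, err
	}
	defer txn.Discard(ctx) // read-only, so nothing to commit

	values := make([][]byte, 0, len(keys))
	for _, key := range keys {
		value, err := txn.Get(ctx, key)
		if err != nil {
			return nil, err
		}
		values = append(values, value)
	}
	return values, nil
}
```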
1 parent bc5064b commit 38be935

File tree

4 files changed: +128 -54 lines changed

go.mod

Lines changed: 1 addition & 1 deletion

@@ -115,4 +115,4 @@ require (
 	lukechampine.com/blake3 v1.4.1 // indirect
 )
 
-replace github.com/ipfs/go-datastore => github.com/celestiaorg/go-datastore v0.0.0-20250725152401-1597fcd4fa90
+replace github.com/ipfs/go-datastore => github.com/celestiaorg/go-datastore v0.0.0-20250801131506-48a63ae531e4

go.sum

Lines changed: 2 additions & 2 deletions

@@ -16,8 +16,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
-github.com/celestiaorg/go-datastore v0.0.0-20250725152401-1597fcd4fa90 h1:zZORGwTRC4EyUdb1UQXcdzeCHAOdn0jUtY5KaeCCNf8=
-github.com/celestiaorg/go-datastore v0.0.0-20250725152401-1597fcd4fa90/go.mod h1:W+pI1NsUsz3tcsAACMtfC+IZdnQTnC/7VfPoJBQuts0=
+github.com/celestiaorg/go-datastore v0.0.0-20250801131506-48a63ae531e4 h1:udw77BU45zmvTV7798FhR1wHFmsFpu4GnA5mubtMcR0=
+github.com/celestiaorg/go-datastore v0.0.0-20250801131506-48a63ae531e4/go.mod h1:W+pI1NsUsz3tcsAACMtfC+IZdnQTnC/7VfPoJBQuts0=
 github.com/celestiaorg/go-libp2p-messenger v0.2.2 h1:osoUfqjss7vWTIZrrDSy953RjQz+ps/vBFE7bychLEc=
 github.com/celestiaorg/go-libp2p-messenger v0.2.2/go.mod h1:oTCRV5TfdO7V/k6nkx7QjQzGrWuJbupv+0o1cgnY2i4=
 github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=

store/store.go

Lines changed: 64 additions & 1 deletion

@@ -10,6 +10,8 @@ import (
 
 	lru "github.com/hashicorp/golang-lru/v2"
 	"github.com/ipfs/go-datastore"
+	contextds "github.com/ipfs/go-datastore/context"
+	"github.com/ipfs/go-datastore/keytransform"
 	"github.com/ipfs/go-datastore/namespace"
 	logging "github.com/ipfs/go-log/v2"
 	"go.uber.org/zap/zapcore"
@@ -31,7 +33,7 @@ type Store[H header.Header[H]] struct {
 	// header storing
 	//
 	// underlying KV store
-	ds datastore.Batching
+	ds *keytransform.Datastore
 	// adaptive replacement cache of headers
 	cache *lru.TwoQueueCache[string, H]
 	// metrics collection instance
@@ -275,6 +277,9 @@ func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) {
 		return h, nil
 	}
 
+	ctx, done := s.withReadTransaction(ctx)
+	defer done()
+
 	hash, err := s.heightIndex.HashByHeight(ctx, height, true)
 	if err != nil {
 		var zero H
@@ -308,6 +313,10 @@ func (s *Store[H]) getRangeByHeight(ctx context.Context, from, to uint64) ([]H,
 	if from >= to {
 		return nil, fmt.Errorf("header/store: invalid range(%d,%d)", from, to)
 	}
+
+	ctx, done := s.withReadTransaction(ctx)
+	defer done()
+
 	h, err := s.GetByHeight(ctx, to-1)
 	if err != nil {
 		return nil, err
@@ -627,6 +636,9 @@ func (s *Store[H]) nextHead(ctx context.Context) (head H, changed bool) {
 		return head, false
 	}
 
+	ctx, done := s.withReadTransaction(ctx)
+	defer done()
+
 	for ctx.Err() == nil {
 		h, err := s.getByHeight(ctx, head.Height()+1)
 		if err != nil {
@@ -664,6 +676,9 @@ func (s *Store[H]) nextTail(ctx context.Context) (tail H, changed bool) {
 		return tail, false
 	}
 
+	ctx, done := s.withReadTransaction(ctx)
+	defer done()
+
 	for ctx.Err() == nil {
 		h, err := s.getByHeight(ctx, tail.Height()-1)
 		if err != nil {
@@ -746,6 +761,54 @@ func (s *Store[H]) deinit() {
 	s.heightSub.SetHeight(0)
 }
 
+// withWriteBatch attaches a new batch to the given context and returns cleanup func.
+// batch is used to couple multiple related writes to substantially optimize performance.
+func (s *Store[H]) withWriteBatch(ctx context.Context) (context.Context, func() error) {
+	bds, ok := s.ds.Children()[0].(datastore.Batching)
+	if !ok {
+		return ctx, func() error { return nil }
+	}
+
+	if _, ok = contextds.GetWrite(ctx); ok {
+		// there is a batch already provided by parent
+		return ctx, func() error { return nil }
+	}
+
+	batch, err := bds.Batch(ctx)
+	if err != nil {
+		log.Errorw("new batch", "err", err)
+		return ctx, func() error { return nil }
+	}
+
+	return contextds.WithWrite(ctx, batch), func() error {
+		return batch.Commit(ctx)
+	}
+}
+
+// withReadTransaction attaches a new transaction to the given context and returns cleanup func.
+// transaction is used to couple multiple related reads to substantially optimize performance.
+func (s *Store[H]) withReadTransaction(ctx context.Context) (context.Context, func()) {
+	tds, ok := s.ds.Children()[0].(datastore.TxnFeature)
+	if !ok {
+		return ctx, func() {}
+	}
+
+	if _, ok = contextds.GetRead(ctx); ok {
+		// there is a transaction already
+		return ctx, func() {}
+	}
+
+	txn, err := tds.NewTransaction(ctx, true)
+	if err != nil {
+		log.Errorw("new transaction", "err", err)
+		return ctx, func() {}
+	}
+
+	return contextds.WithRead(ctx, txn), func() {
+		txn.Discard(ctx)
+	}
+}
+
 func writeHeaderHashTo[H header.Header[H]](
 	ctx context.Context,
 	write datastore.Write,
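
Taken together, the two helpers make read scopes composable: the outermost caller owns the real badger transaction, while nested calls that also invoke withReadTransaction find it on the context via contextds.GetRead and get a no-op cleanup back. A minimal sketch of that nesting, written as if it lived in the store package (exampleWalk is hypothetical; withReadTransaction and getByHeight are the methods patched above):

```go
// exampleWalk is a hypothetical method sketching how nested read
// scopes collapse into a single read-only transaction.
func (s *Store[H]) exampleWalk(ctx context.Context, from, to uint64) error {
	// Outermost scope: opens the one real transaction and parks it
	// on the context via contextds.WithRead.
	ctx, done := s.withReadTransaction(ctx)
	defer done() // discards the transaction exactly once

	for height := from; height < to; height++ {
		// getByHeight calls withReadTransaction too, but it finds the
		// parent transaction through contextds.GetRead and returns a
		// no-op cleanup, so every iteration reuses the same badger
		// transaction instead of opening a fresh one per read.
		if _, err := s.getByHeight(ctx, height); err != nil {
			return err
		}
	}
	return nil
}
```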

store/store_delete.go

Lines changed: 61 additions & 50 deletions

@@ -10,7 +10,6 @@ import (
 	"time"
 
 	"github.com/ipfs/go-datastore"
-	contextds "github.com/ipfs/go-datastore/context"
 
 	"github.com/celestiaorg/go-header"
 )
@@ -100,27 +99,37 @@
 func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error) {
 	startTime := time.Now()
 
-	var height uint64
+	var (
+		height  uint64
+		missing int
+	)
 	defer func() {
 		if err != nil {
 			if errors.Is(err, errDeleteTimeout) {
 				log.Warnw("partial delete",
 					"from_height", from,
 					"expected_to_height", to,
 					"actual_to_height", height,
+					"hdrs_not_found", missing,
 					"took(s)", time.Since(startTime),
 				)
 			} else {
 				log.Errorw("partial delete with error",
 					"from_height", from,
 					"expected_to_height", to,
 					"actual_to_height", height,
+					"hdrs_not_found", missing,
 					"took(s)", time.Since(startTime),
 					"err", err,
 				)
 			}
 		} else if to-from > 1 {
-			log.Debugw("deleted headers", "from_height", from, "to_height", to, "took(s)", time.Since(startTime).Seconds())
+			log.Debugw("deleted headers",
+				"from_height", from,
+				"to_height", to,
+				"hdrs_not_found", missing,
+				"took(s)", time.Since(startTime).Seconds(),
+			)
 		}
 
 		if derr := s.setTail(ctx, s.ds, height); derr != nil {
@@ -140,9 +149,9 @@ func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error)
 	}
 
 	if to-from < deleteRangeParallelThreshold {
-		height, err = s.deleteSequential(deleteCtx, from, to)
+		height, missing, err = s.deleteSequential(deleteCtx, from, to)
 	} else {
-		height, err = s.deleteParallel(deleteCtx, from, to)
+		height, missing, err = s.deleteParallel(deleteCtx, from, to)
 	}
 
 	return err
@@ -153,7 +162,6 @@ func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error)
 func (s *Store[H]) deleteSingle(
 	ctx context.Context,
 	height uint64,
-	batch datastore.Batch,
 	onDelete []func(ctx context.Context, height uint64) error,
 ) error {
 	// some of the methods may not handle context cancellation properly
@@ -162,10 +170,6 @@ func (s *Store[H]) deleteSingle(
 	}
 
 	hash, err := s.heightIndex.HashByHeight(ctx, height, false)
-	if errors.Is(err, datastore.ErrNotFound) {
-		log.Debugw("attempt to delete header that's not found", "height", height)
-		return nil
-	}
 	if err != nil {
 		return fmt.Errorf("hash by height %d: %w", height, err)
 	}
@@ -176,10 +180,10 @@ func (s *Store[H]) deleteSingle(
 		}
 	}
 
-	if err := batch.Delete(ctx, hashKey(hash)); err != nil {
+	if err := s.ds.Delete(ctx, hashKey(hash)); err != nil {
 		return fmt.Errorf("delete hash key (%X): %w", hash, err)
 	}
-	if err := batch.Delete(ctx, heightKey(height)); err != nil {
+	if err := s.ds.Delete(ctx, heightKey(height)); err != nil {
 		return fmt.Errorf("delete height key (%d): %w", height, err)
 	}
 
@@ -194,36 +198,38 @@ func (s *Store[H]) deleteSingle(
 func (s *Store[H]) deleteSequential(
 	ctx context.Context,
 	from, to uint64,
-) (highest uint64, err error) {
+) (highest uint64, missing int, err error) {
 	log.Debugw("starting delete range sequential", "from_height", from, "to_height", to)
 
-	batch, err := s.ds.Batch(ctx)
-	if err != nil {
-		return 0, fmt.Errorf("new batch: %w", err)
-	}
-	ctx = contextds.WithWrite(ctx, batch)
+	ctx, done := s.withWriteBatch(ctx)
 	defer func() {
-		if derr := batch.Commit(ctx); derr != nil {
+		if derr := done(); derr != nil {
 			err = errors.Join(err, fmt.Errorf("committing batch: %w", derr))
 		}
 	}()
+	ctx, doneTx := s.withReadTransaction(ctx)
+	defer doneTx()
 
 	s.onDeleteMu.Lock()
 	onDelete := slices.Clone(s.onDelete)
 	s.onDeleteMu.Unlock()
 
 	for height := from; height < to; height++ {
-		if err := s.deleteSingle(ctx, height, batch, onDelete); err != nil {
-			return height, err
+		err := s.deleteSingle(ctx, height, onDelete)
+		if errors.Is(err, datastore.ErrNotFound) {
+			missing++
+			log.Debugw("attempt to delete header that's not found", "height", height)
+		} else if err != nil {
+			return height, missing, err
 		}
 	}
 
-	return to, nil
+	return to, missing, nil
 }
 
 // deleteParallel deletes [from:to) header range from the store in parallel
 // and returns the highest unprocessed height: 'to' in success case or the failed height in error case.
-func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64, error) {
+func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64, int, error) {
 	now := time.Now()
 	s.onDeleteMu.Lock()
 	onDelete := slices.Clone(s.onDelete)
@@ -235,13 +241,16 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64,
 	// require too much RAM, yet shows good performance.
 	workerNum := runtime.GOMAXPROCS(-1) * 3
 
-	log.Infow(
-		"deleting range parallel", "from_height", from, "to_height", to, "worker_num", workerNum,
+	log.Infow("deleting range parallel",
+		"from_height", from,
+		"to_height", to,
+		"worker_num", workerNum,
 	)
 
 	type result struct {
-		height uint64
-		err    error
+		missing int
+		height  uint64
+		err     error
 	}
 	results := make([]result, workerNum)
 	jobCh := make(chan uint64, workerNum)
@@ -260,24 +269,25 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64,
 			}
 		}()
 
-		batch, err := s.ds.Batch(ctx)
-		if err != nil {
-			last.err = fmt.Errorf("new batch: %w", err)
-			return
-		}
-		ctx = contextds.WithWrite(ctx, batch)
+		workerCtx, done := s.withWriteBatch(ctx)
+		defer func() {
+			if err := done(); err != nil {
+				last.err = errors.Join(last.err, fmt.Errorf("committing delete batch: %w", err))
+			}
+		}()
+		workerCtx, doneTx := s.withReadTransaction(workerCtx)
+		defer doneTx()
 
 		for height := range jobCh {
			last.height = height
-			last.err = s.deleteSingle(ctx, height, batch, onDelete)
-			if last.err != nil {
+			last.err = s.deleteSingle(workerCtx, height, onDelete)
+			if errors.Is(last.err, datastore.ErrNotFound) {
+				last.missing++
+				log.Debugw("attempt to delete header that's not found", "height", height)
+			} else if last.err != nil {
 				break
 			}
 		}
-
-		if err := batch.Commit(ctx); err != nil {
-			last.err = errors.Join(last.err, fmt.Errorf("committing delete batch: %w", err))
-		}
 	}
 
 	var wg sync.WaitGroup
@@ -309,29 +319,30 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64,
 		return int(a.height - b.height) //nolint:gosec
 	})
 	// find the highest deleted height
-	var highest uint64
+	var (
+		highest uint64
+		missing int
+	)
 	for _, result := range results {
 		if result.err != nil {
 			// return the error immediately even if some higher headers may have been deleted
 			// this ensures we set tail to the lowest errored height, s.t. retries do not shadowly miss any headers
-			return result.height, result.err
+			return result.height, missing, result.err
 		}
 
 		if result.height > highest {
 			highest = result.height
 		}
+		missing += result.missing
 	}
 
 	// ensures the height after the highest deleted becomes the new tail
 	highest++
-	log.Infow(
-		"deleted range parallel",
-		"from_height",
-		from,
-		"to_height",
-		to,
-		"took(s)",
-		time.Since(now).Seconds(),
+	log.Infow("deleted range parallel",
+		"from_height", from,
+		"to_height", to,
+		"hdrs_not_found", missing,
+		"took(s)", time.Since(now).Seconds(),
 	)
-	return highest, nil
+	return highest, missing, nil
 }
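
The not-found handling moved out of deleteSingle and into its callers, so misses are counted instead of silently swallowed. A standalone sketch of that accounting pattern, extracted from the two call sites above (countingDelete and the del callback are hypothetical, not part of this diff; errors.Is works here because deleteSingle wraps datastore.ErrNotFound with %w):

```go
package store

import (
	"context"
	"errors"
	"fmt"

	"github.com/ipfs/go-datastore"
)

// countingDelete sketches the pattern used by deleteSequential and
// deleteParallel: datastore.ErrNotFound is tallied as a missing header
// and the range delete keeps going; any other error aborts the range.
func countingDelete(
	ctx context.Context,
	del func(ctx context.Context, height uint64) error,
	from, to uint64,
) (missing int, err error) {
	for height := from; height < to; height++ {
		err := del(ctx, height)
		switch {
		case errors.Is(err, datastore.ErrNotFound):
			missing++ // header already gone; count it, don't fail
		case err != nil:
			return missing, fmt.Errorf("delete height %d: %w", height, err)
		}
	}
	return missing, nil
}
```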
