
Commit b8b73dd

unify more and fix error case bug
1 parent 6585d33 commit b8b73dd

File tree

1 file changed: +126 -110 lines changed


store/store_delete.go

Lines changed: 126 additions & 110 deletions
```diff
@@ -7,7 +7,6 @@ import (
     "runtime"
     "slices"
     "sync"
-    "sync/atomic"
     "time"
 
     "github.com/ipfs/go-datastore"
@@ -88,53 +87,66 @@ func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
 
 // deleteRangeParallelThreshold defines the threshold for parallel deletion.
 // If range is smaller than this threshold, deletion will be performed sequentially.
-var deleteRangeParallelThreshold uint64 = 10000
+var (
+    deleteRangeParallelThreshold uint64 = 10000
+    errDeleteTimeout = errors.New("delete timeout")
+)
 
 // deleteRange deletes [from:to) header range from the store.
-func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) error {
-    if to-from < deleteRangeParallelThreshold {
-        return s.deleteSequential(ctx, from, to)
-    }
-
-    return s.deleteParallel(ctx, from, to)
-}
-
-// deleteSequential deletes [from:to) header range from the store sequentially.
-func (s *Store[H]) deleteSequential(ctx context.Context, from, to uint64) (err error) {
-    log.Debugw("starting delete range sequential", "from_height", from, "to_height", to)
-
-    batch, err := s.ds.Batch(ctx)
-    if err != nil {
-        return fmt.Errorf("new batch: %w", err)
-    }
-    // ctx = badger4.WithBatch(ctx, batch)
+// It gracefully handles context and errors attempting to save interrupted progress.
+func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error) {
+    startTime := time.Now()
 
-    height := from
+    var height uint64
     defer func() {
-        if derr := s.setTail(ctx, s.ds, height); derr != nil {
-            err = errors.Join(err, fmt.Errorf("setting tail to %d: %w", height, derr))
+        if err != nil {
+            if errors.Is(err, errDeleteTimeout) {
+                log.Warnw("partial delete",
+                    "from_height", from,
+                    "expected_to_height", to,
+                    "actual_to_height", height,
+                    "took(s)", time.Since(startTime),
+                )
+            } else {
+                log.Errorw("partial delete with error",
+                    "from_height", from,
+                    "expected_to_height", to,
+                    "actual_to_height", height,
+                    "took(s)", time.Since(startTime),
+                    "err", err,
+                )
+            }
+        } else if to-from > 1 {
+            log.Debugw("deleted headers", "from_height", from, "to_height", to, "took", time.Since(startTime))
         }
 
-        if derr := batch.Commit(ctx); derr != nil {
-            err = errors.Join(err, fmt.Errorf("committing batch: %w", derr))
+        if derr := s.setTail(ctx, s.ds, height); derr != nil {
+            err = errors.Join(err, fmt.Errorf("setting tail to %d: %w", height, derr))
         }
     }()
 
-    s.onDeleteMu.Lock()
-    onDelete := slices.Clone(s.onDelete)
-    s.onDeleteMu.Unlock()
+    if deadline, ok := ctx.Deadline(); ok {
+        // allocate 95% of caller's set deadline for deletion
+        // and give leftover to save progress
+        // this prevents store's state corruption from partial deletion
+        sub := deadline.Sub(startTime) / 100 * 95
+        var cancel context.CancelFunc
+        ctx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), errDeleteTimeout)
+        defer cancel()
+    }
 
-    for ; height < to; height++ {
-        if err := s.delete(ctx, height, batch, onDelete); err != nil {
-            return err
-        }
+    if to-from < deleteRangeParallelThreshold {
+        height, err = s.deleteSequential(ctx, from, to)
+    } else {
+        height, err = s.deleteParallel(ctx, from, to)
     }
 
     return nil
 }
 
-// delete deletes a single header from the store, its caches and indexies, notifying any registered onDelete handlers.
-func (s *Store[H]) delete(
+// deleteSingle deletes a single header from the store,
+// its caches and indexies, notifying any registered onDelete handlers.
+func (s *Store[H]) deleteSingle(
     ctx context.Context,
     height uint64,
     batch datastore.Batch,
```
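The core of the unification above is that deleteRange now owns the deadline split that previously lived only in deleteParallel: 95% of the caller's remaining budget goes to deletion and the leftover is reserved so setTail can persist whatever progress was made. Below is a minimal, self-contained sketch of that pattern, assuming an illustrative errTimeout sentinel and a one-second caller budget rather than the store's real values:

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// errTimeout stands in for the errDeleteTimeout sentinel added in this commit.
var errTimeout = errors.New("delete timeout")

func main() {
    // The caller gives the whole operation a 1-second budget (illustrative).
    parent, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    start := time.Now()
    workCtx := parent
    if deadline, ok := parent.Deadline(); ok {
        // Reserve 95% of the remaining budget for the bulk work and keep
        // the leftover 5% for persisting progress (setTail in the store).
        budget := deadline.Sub(start) / 100 * 95
        var cancelWork context.CancelFunc
        workCtx, cancelWork = context.WithDeadlineCause(parent, start.Add(budget), errTimeout)
        defer cancelWork()
    }

    <-workCtx.Done()
    // context.Cause distinguishes the internal cutoff from a caller-side
    // cancellation: it reports errTimeout, while Err() is DeadlineExceeded.
    fmt.Println(workCtx.Err(), errors.Is(context.Cause(workCtx), errTimeout))
}
```

The new defer in deleteRange relies on exactly this distinction: a partial delete that hit the internal cutoff is logged with Warnw, anything else with Errorw.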
```diff
@@ -173,53 +185,41 @@ func (s *Store[H]) delete(
     return nil
 }
 
-// deleteParallel deletes [from:to) header range from the store in parallel.
-// It gracefully handles context and errors attempting to save interrupted progress.
-func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (err error) {
-    log.Debugw("starting delete range parallel", "from_height", from, "to_height", to)
-
-    deleteCtx, cancel := context.WithCancel(ctx)
-    defer cancel()
+// deleteSequential deletes [from:to) header range from the store sequentially
+// and returns the highest unprocessed height: 'to' in success case or the failed height in error case.
+func (s *Store[H]) deleteSequential(
+    ctx context.Context,
+    from, to uint64,
+) (highest uint64, err error) {
+    log.Debugw("starting delete range sequential", "from_height", from, "to_height", to)
 
-    startTime := time.Now()
-    if deadline, ok := ctx.Deadline(); ok {
-        // allocate 95% of caller's set deadline for deletion
-        // and give leftover to save progress
-        // this prevents store's state corruption from partial deletion
-        sub := deadline.Sub(startTime) / 100 * 95
-        var cancel context.CancelFunc
-        deleteCtx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), errDeleteTimeout)
-        defer cancel()
+    batch, err := s.ds.Batch(ctx)
+    if err != nil {
+        return 0, fmt.Errorf("new batch: %w", err)
     }
-
-    var highestDeleted atomic.Uint64
     defer func() {
-        newTailHeight := highestDeleted.Load() + 1
-        if err != nil {
-            if errors.Is(err, errDeleteTimeout) {
-                log.Warnw("partial delete",
-                    "from_height", from,
-                    "expected_to_height", to,
-                    "actual_to_height", newTailHeight,
-                    "took(s)", time.Since(startTime),
-                )
-            } else {
-                log.Errorw("partial delete with error",
-                    "from_height", from,
-                    "expected_to_height", to,
-                    "actual_to_height", newTailHeight,
-                    "took(s)", time.Since(startTime),
-                    "err", err,
-                )
-            }
-        } else if to-from > 1 {
-            log.Infow("deleted headers", "from_height", from, "to_height", to, "took", time.Since(startTime))
+        if derr := batch.Commit(ctx); derr != nil {
+            err = errors.Join(err, fmt.Errorf("committing batch: %w", derr))
         }
+    }()
+
+    s.onDeleteMu.Lock()
+    onDelete := slices.Clone(s.onDelete)
+    s.onDeleteMu.Unlock()
 
-        if derr := s.setTail(ctx, s.ds, newTailHeight); derr != nil {
-            err = errors.Join(err, fmt.Errorf("setting tail to %d: %w", newTailHeight, derr))
+    for height := from; height < to; height++ {
+        if err := s.deleteSingle(ctx, height, batch, onDelete); err != nil {
+            return height, err
         }
-    }()
+    }
+
+    return to, nil
+}
+
+// deleteParallel deletes [from:to) header range from the store in parallel
+// and returns the highest unprocessed height: 'to' in success case or the failed height in error case.
+func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64, error) {
+    log.Debugw("starting delete range parallel", "from_height", from, "to_height", to)
 
     s.onDeleteMu.Lock()
     onDelete := slices.Clone(s.onDelete)
@@ -232,53 +232,50 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (err err
     // require too much RAM, yet shows good performance.
     workerNum := runtime.GOMAXPROCS(-1) * 3
 
+    type result struct {
+        height uint64
+        err    error
+    }
+    results := make([]result, workerNum)
     jobCh := make(chan uint64, workerNum)
-    errCh := make(chan error, 1)
-
-    worker := func() {
-        batch, err := s.ds.Batch(ctx)
-        if err != nil {
-            errCh <- fmt.Errorf("new batch: %w", err)
-            return
-        }
-        // deleteCtx := badger4.WithBatch(deleteCtx, batch)
+    errCh := make(chan error)
 
+    worker := func(worker int) {
+        var last result
         defer func() {
-            if err := batch.Commit(ctx); err != nil {
-                errCh <- fmt.Errorf("committing delete batch: %w", err)
+            results[worker] = last
+            if last.err != nil {
+                close(errCh)
             }
         }()
 
-        var lastHeight uint64
-        defer func() {
-            highest := highestDeleted.Load()
-            for lastHeight > highest && !highestDeleted.CompareAndSwap(highest, lastHeight) {
-                highest = highestDeleted.Load()
-            }
-        }()
+        batch, err := s.ds.Batch(ctx)
+        if err != nil {
+            last.err = fmt.Errorf("new batch: %w", err)
+            return
+        }
 
         for height := range jobCh {
-            if err := s.delete(deleteCtx, height, batch, onDelete); err != nil {
-                select {
-                case errCh <- fmt.Errorf("delete header %d: %w", height, err):
-                default:
-                }
-                return
+            last.height = height
+            last.err = s.deleteSingle(ctx, height, batch, onDelete)
+            if last.err != nil {
+                break
             }
+        }
 
-            lastHeight = height
+        if err := batch.Commit(ctx); err != nil {
+            last.err = errors.Join(last.err, fmt.Errorf("committing delete batch: %w", err))
         }
     }
 
     var wg sync.WaitGroup
     wg.Add(workerNum)
-    for range workerNum {
-        go func() {
+    for i := range workerNum {
+        go func(i int) {
             defer wg.Done()
-            worker()
-        }()
+            worker(i)
+        }(i)
     }
-    defer wg.Wait()
 
     for i, height := 0, from; height < to; height++ {
         select {
```
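The reworked worker pool writes each worker's outcome into its own slot of results (last processed height plus any error) and broadcasts failure by closing errCh, which the dispatch loop watches while feeding heights (continued in the next hunk). A rough, self-contained sketch of that shape with made-up heights and a hypothetical failing job; the sync.Once guarding the close is this sketch's own addition for the case where more than one worker fails:

```go
package main

import (
    "fmt"
    "sync"
)

func main() {
    const workerNum = 3
    type result struct {
        height uint64
        err    error
    }
    results := make([]result, workerNum) // one slot per worker, so no locking is needed
    jobCh := make(chan uint64, workerNum)
    errCh := make(chan struct{}) // closed, never sent on: a broadcast "stop feeding"
    var closeOnce sync.Once      // sketch-only guard so a second failing worker cannot re-close

    var wg sync.WaitGroup
    wg.Add(workerNum)
    for i := range workerNum {
        go func(i int) {
            defer wg.Done()
            var last result
            defer func() {
                results[i] = last
                if last.err != nil {
                    closeOnce.Do(func() { close(errCh) })
                }
            }()
            for height := range jobCh {
                last.height = height
                if height == 1042 { // pretend this height fails to delete
                    last.err = fmt.Errorf("delete header %d: boom", height)
                    break
                }
            }
        }(i)
    }

    // Dispatcher: enqueue heights until done or until any worker signals failure.
dispatch:
    for height := uint64(1000); height < 2000; height++ {
        select {
        case jobCh <- height:
        case <-errCh:
            break dispatch
        }
    }
    close(jobCh) // tell workers to stop
    wg.Wait()    // await all workers
    fmt.Println(results)
}
```

Closing the error channel rather than sending on it means every select on it unblocks at once, so the dispatcher stops enqueueing without needing to know which worker failed.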
```diff
@@ -287,14 +284,33 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (err err
             if uint64(i)%deleteRangeParallelThreshold == 0 {
                 log.Debugf("deleting %dth header height %d", deleteRangeParallelThreshold, height)
             }
-        case err = <-errCh:
-            close(jobCh)
-            return err
+        case <-errCh:
         }
     }
 
+    // tell workers to stop
     close(jobCh)
-    return err
-}
+    // await all workers to finish
+    wg.Wait()
+    // ensure results are sorted in ascending order of heights
+    slices.SortFunc(results, func(a, b result) int {
+        return int(a.height - b.height) //nolint:gosec
+    })
+    // find the highest deleted height
+    var highest uint64
+    for _, result := range results {
+        if result.err != nil {
+            // return the error immediately even if some higher headers may have been deleted
+            // this ensures we set tail to the lowest errored height, s.t. retries do not shadowly miss any headers
+            return result.height, result.err
+        }
 
-var errDeleteTimeout = errors.New("delete timeout")
+        if result.height > highest {
+            highest = result.height
+        }
+    }
+
+    // ensures the height after the highest deleted becomes the new tail
+    highest++
+    return highest, nil
+}
```
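Once the workers have drained, the new tail is derived purely from the collected results: sort by height, return the lowest failed height so a retry resumes there, otherwise advance one past the highest deleted header. A small sketch of that aggregation, using a hypothetical newTail helper and made-up heights:

```go
package main

import (
    "errors"
    "fmt"
    "slices"
)

// result mirrors the per-worker record added in this commit.
type result struct {
    height uint64
    err    error
}

// newTail isolates the aggregation at the end of deleteParallel: on success
// the new tail is one past the highest deleted height; on failure it is the
// lowest failed height, so a retry does not silently skip headers.
func newTail(results []result) (uint64, error) {
    // sort ascending so the first error found is at the lowest failed height
    slices.SortFunc(results, func(a, b result) int {
        return int(a.height - b.height) //nolint:gosec
    })
    var highest uint64
    for _, r := range results {
        if r.err != nil {
            return r.height, r.err
        }
        if r.height > highest {
            highest = r.height
        }
    }
    return highest + 1, nil
}

func main() {
    tail, err := newTail([]result{{height: 120}, {height: 118}, {height: 119}})
    fmt.Println(tail, err) // 121 <nil>: heights 118-120 deleted, 121 becomes the tail

    tail, err = newTail([]result{{height: 120}, {height: 119, err: errors.New("disk full")}})
    fmt.Println(tail, err) // 119 disk full: retry resumes from the lowest failed height
}
```

Returning the lowest failed height is deliberate: setting the tail any higher would let a retry silently skip the headers that errored.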
