
Commit 03ded0d

misc(store): various fixes and chores (#330)
* Fixes leftover bug from #326
* refactors store wiping case to avoid silent failure for setTail
* fixes two bugs in deleteRange
* context shadowing bug causing context to cancel prematurely
* close of a closed channel panic when multiple workers error
* error wrappings
* more logging improvements
1 parent 5776877 commit 03ded0d

3 files changed: +41, -20 lines changed

store/store.go

Lines changed: 1 addition & 3 deletions
@@ -149,6 +149,7 @@ func (s *Store[H]) Stop(ctx context.Context) error {
     // signal to prevent further writes to Store
     select {
     case s.writes <- nil:
+        s.cancel()
     case <-ctx.Done():
         return ctx.Err()
     }
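The change above cancels the store's internal context right after the stop sentinel is accepted by the write loop, instead of leaving background routines hanging on it. A minimal sketch of that shutdown ordering, with illustrative names (miniStore, writeLoop) rather than the store's actual internals:

package main

import (
	"context"
	"fmt"
)

// miniStore stands in for Store: a nil batch on writes acts as a stop
// sentinel, and cancel tears down the store-scoped context.
type miniStore struct {
	writes chan []string
	cancel context.CancelFunc
	done   chan struct{}
}

// writeLoop consumes batches until it sees the nil sentinel.
func (s *miniStore) writeLoop() {
	defer close(s.done)
	for batch := range s.writes {
		if batch == nil { // stop sentinel sent by Stop
			return
		}
		fmt.Println("flushed", len(batch), "headers")
	}
}

// Stop mirrors the ordering above: deliver the sentinel first, then cancel
// the internal context so anything still tied to it unwinds promptly.
func (s *miniStore) Stop(ctx context.Context) error {
	select {
	case s.writes <- nil:
		s.cancel()
	case <-ctx.Done():
		return ctx.Err()
	}
	<-s.done
	return nil
}

func main() {
	_, cancel := context.WithCancel(context.Background())
	s := &miniStore{writes: make(chan []string), cancel: cancel, done: make(chan struct{})}
	go s.writeLoop()
	s.writes <- []string{"h1", "h2"}
	fmt.Println("stop:", s.Stop(context.Background()))
}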
@@ -362,9 +363,6 @@ func (s *Store[H]) HasAt(ctx context.Context, height uint64) bool {
 
 func (s *Store[H]) setTail(ctx context.Context, write datastore.Write, to uint64) error {
     newTail, err := s.getByHeight(ctx, to)
-    if errors.Is(err, header.ErrNotFound) {
-        return nil
-    }
     if err != nil {
         return fmt.Errorf("getting tail: %w", err)
     }
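Dropping the early return on header.ErrNotFound is what removes the silent failure mentioned in the commit message: previously setTail reported success even though the tail was never rewritten. A small self-contained sketch of the difference, using stand-in names (getByHeight here always misses):

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found")

// getByHeight stands in for the store lookup; in this toy it always misses.
func getByHeight(height uint64) (string, error) {
	return "", fmt.Errorf("height %d: %w", height, errNotFound)
}

// setTailSwallow is the old shape: a miss is treated as success.
func setTailSwallow(to uint64) error {
	_, err := getByHeight(to)
	if errors.Is(err, errNotFound) {
		return nil // tail silently NOT updated
	}
	return err
}

// setTailPropagate is the new shape: the caller sees the miss and can react.
func setTailPropagate(to uint64) error {
	_, err := getByHeight(to)
	if err != nil {
		return fmt.Errorf("getting tail: %w", err)
	}
	return nil
}

func main() {
	fmt.Println("swallow:", setTailSwallow(42))     // <nil>, looks fine but did nothing
	fmt.Println("propagate:", setTailPropagate(42)) // surfaces the miss
}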

store/store_delete.go

Lines changed: 34 additions & 16 deletions
@@ -70,16 +70,18 @@ func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
         return fmt.Errorf("header/store: delete to %d below current tail(%d)", to, tail.Height())
     }
 
-    if err := s.deleteRange(ctx, tail.Height(), to); err != nil {
-        return fmt.Errorf("header/store: delete to height %d: %w", to, err)
-    }
-
-    if head.Height()+1 == to {
+    err = s.deleteRange(ctx, tail.Height(), to)
+    if errors.Is(err, header.ErrNotFound) && head.Height()+1 == to {
         // this is the case where we have deleted all the headers
         // wipe the store
         if err := s.wipe(ctx); err != nil {
             return fmt.Errorf("header/store: wipe: %w", err)
         }
+
+        return nil
+    }
+    if err != nil {
+        return fmt.Errorf("header/store: delete to height %d: %w", to, err)
     }
 
     return nil
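With this reordering, deleteRange's header.ErrNotFound is only treated as "everything below head was deleted" when the requested bound is exactly head+1; in that case the store is wiped and the call succeeds, while any other error is wrapped and returned. A stand-alone sketch of that error routing, using stand-in pieces (errNotFound, a wipe callback) rather than the store's real API:

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("header: not found")

// deleteTo mirrors the control flow above: ErrNotFound is only a success
// signal when the whole range up to head+1 was requested (then wipe runs);
// everything else is a real failure.
func deleteTo(deleteRangeErr error, headHeight, to uint64, wipe func() error) error {
	err := deleteRangeErr
	if errors.Is(err, errNotFound) && headHeight+1 == to {
		if werr := wipe(); werr != nil {
			return fmt.Errorf("wipe: %w", werr)
		}
		return nil
	}
	if err != nil {
		return fmt.Errorf("delete to height %d: %w", to, err)
	}
	return nil
}

func main() {
	wipe := func() error { return nil }
	// full wipe: ErrNotFound at head+1 counts as success
	fmt.Println(deleteTo(errNotFound, 100, 101, wipe)) // <nil>
	// ErrNotFound anywhere else is surfaced to the caller
	fmt.Println(deleteTo(errNotFound, 100, 50, wipe)) // delete to height 50: ...
}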
@@ -117,28 +119,29 @@ func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error)
                 )
             }
         } else if to-from > 1 {
-            log.Debugw("deleted headers", "from_height", from, "to_height", to, "took", time.Since(startTime))
+            log.Debugw("deleted headers", "from_height", from, "to_height", to, "took(s)", time.Since(startTime).Seconds())
         }
 
         if derr := s.setTail(ctx, s.ds, height); derr != nil {
             err = errors.Join(err, fmt.Errorf("setting tail to %d: %w", height, derr))
         }
     }()
 
+    deleteCtx := ctx
     if deadline, ok := ctx.Deadline(); ok {
         // allocate 95% of caller's set deadline for deletion
         // and give leftover to save progress
         // this prevents store's state corruption from partial deletion
         sub := deadline.Sub(startTime) / 100 * 95
         var cancel context.CancelFunc
-        ctx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), errDeleteTimeout)
+        deleteCtx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), errDeleteTimeout)
         defer cancel()
     }
 
     if to-from < deleteRangeParallelThreshold {
-        height, err = s.deleteSequential(ctx, from, to)
+        height, err = s.deleteSequential(deleteCtx, from, to)
     } else {
-        height, err = s.deleteParallel(ctx, from, to)
+        height, err = s.deleteParallel(deleteCtx, from, to)
     }
 
     return err
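The deleteCtx rename fixes the context-shadowing bug from the commit message: the old code reassigned ctx to the 95%-deadline context, so the deferred setTail that is supposed to save progress in the leftover time inherited the already-expiring deadline and could be cancelled prematurely. A runnable toy reproduction of the difference (the timeouts and names are illustrative, not the store's values):

package main

import (
	"context"
	"fmt"
	"time"
)

// work imitates deleteRange: a shorter, derived deadline is used for the
// deletion itself, while a later "save progress" step should keep the
// caller's full deadline. With shadow=true the derived context replaces it,
// reproducing the old bug.
func work(ctx context.Context, shadow bool) error {
	saveCtx := ctx // what the deferred save-progress step should use

	deleteCtx := ctx
	if _, ok := ctx.Deadline(); ok {
		var cancel context.CancelFunc
		deleteCtx, cancel = context.WithTimeout(ctx, 10*time.Millisecond)
		defer cancel()
		if shadow {
			saveCtx = deleteCtx // the old bug: reassigning ctx itself
		}
	}

	<-deleteCtx.Done() // "deletion" consumes its share of the deadline

	// deferred-style cleanup: with shadowing, this context is already expired
	return saveCtx.Err()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println("shadowed:", work(ctx, true))  // context deadline exceeded
	fmt.Println("separate:", work(ctx, false)) // <nil>
}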
@@ -159,7 +162,7 @@ func (s *Store[H]) deleteSingle(
 
     hash, err := s.heightIndex.HashByHeight(ctx, height, false)
     if errors.Is(err, datastore.ErrNotFound) {
-        log.Warnw("attempt to delete header that's not found", "height", height)
+        log.Debugw("attempt to delete header that's not found", "height", height)
         return nil
     }
     if err != nil {
@@ -219,19 +222,21 @@ func (s *Store[H]) deleteSequential(
 // deleteParallel deletes [from:to) header range from the store in parallel
 // and returns the highest unprocessed height: 'to' in success case or the failed height in error case.
 func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64, error) {
-    log.Debugw("starting delete range parallel", "from_height", from, "to_height", to)
-
+    now := time.Now()
     s.onDeleteMu.Lock()
     onDelete := slices.Clone(s.onDelete)
     s.onDeleteMu.Unlock()
-
     // workerNum defines how many parallel delete workers to run
     // Scales of number of CPUs configured for the process.
     // Usually, it's recommended to have 2-4 multiplier for the number of CPUs for
     // IO operations. Three was picked empirically to be a sweet spot that doesn't
     // require too much RAM, yet shows good performance.
     workerNum := runtime.GOMAXPROCS(-1) * 3
 
+    log.Infow(
+        "deleting range parallel", "from_height", from, "to_height", to, "worker_num", workerNum,
+    )
+
     type result struct {
         height uint64
         err    error
@@ -245,7 +250,11 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64,
         defer func() {
             results[worker] = last
             if last.err != nil {
-                close(errCh)
+                select {
+                case <-errCh:
+                default:
+                    close(errCh)
+                }
             }
         }()
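Guarding close(errCh) addresses the "close of a closed channel" panic from the commit message: when several workers fail, each used to close the shared error channel. The hunk uses a select with a default case; a standalone way to see the underlying "close exactly once" requirement is sync.Once, shown below purely as an illustration (this is not the store's code):

package main

import (
	"fmt"
	"sync"
)

// Many workers may report a failure, but the shared signal channel must be
// closed exactly once; a second close panics. sync.Once makes that guarantee
// explicit.
func main() {
	errCh := make(chan struct{})
	var closeOnce sync.Once
	signalErr := func() { closeOnce.Do(func() { close(errCh) }) }

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// every worker "fails" and signals it; only the first close runs
			signalErr()
		}()
	}

	wg.Wait()
	<-errCh // already closed: receive returns immediately
	fmt.Println("errCh closed exactly once, no panic")
}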

@@ -277,11 +286,11 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64,
         }(i)
     }
 
-    for i, height := 0, from; height < to; height++ {
+    for i, height := uint64(0), from; height < to; height++ {
         select {
         case jobCh <- height:
             i++
-            if uint64(1)%deleteRangeParallelThreshold == 0 {
+            if i%deleteRangeParallelThreshold == 0 {
                 log.Debugf("deleting %dth header height %d", deleteRangeParallelThreshold, height)
             }
         case <-errCh:
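This is the second deleteRange-path bug: the old condition computed uint64(1)%deleteRangeParallelThreshold, which is non-zero for any threshold above one, so the progress message never fired; keying the modulo on the running counter i restores periodic logging. A toy version of the fixed pattern (the threshold and range values here are made up):

package main

import "fmt"

func main() {
	const threshold uint64 = 3
	from, to := uint64(10), uint64(20)
	for i, height := uint64(0), from; height < to; height++ {
		i++ // counts enqueued heights
		if i%threshold == 0 {
			fmt.Printf("enqueued %dth height: %d\n", i, height)
		}
	}
}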
@@ -312,5 +321,14 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64,
 
     // ensures the height after the highest deleted becomes the new tail
     highest++
+    log.Infow(
+        "deleted range parallel",
+        "from_height",
+        from,
+        "to_height",
+        to,
+        "took(s)",
+        time.Since(now).Seconds(),
+    )
     return highest, nil
 }

sync/syncer_head.go

Lines changed: 6 additions & 1 deletion
@@ -178,7 +178,12 @@ func (s *Syncer[H]) localHead(ctx context.Context) (H, error) {
         return pendHead, nil
     }
     // if pending is empty - get the latest stored/synced head
-    return s.store.Head(ctx)
+    head, err := s.store.Head(ctx)
+    if err != nil {
+        return head, fmt.Errorf("local store head: %w", err)
+    }
+
+    return head, nil
 }
 
 // setLocalHead takes the already validated head and sets it as the new sync target.
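Wrapping the Head error instead of returning it bare adds call-site context without breaking callers that match on sentinel errors, since %w keeps the error chain intact. A quick, self-contained illustration with a hypothetical sentinel (errNoHead is not an error from this repo):

package main

import (
	"errors"
	"fmt"
)

var errNoHead = errors.New("no head")

// localHead adds "local store head:" context while preserving the original
// error for errors.Is checks further up the stack.
func localHead() error {
	return fmt.Errorf("local store head: %w", errNoHead)
}

func main() {
	err := localHead()
	fmt.Println(err)                       // local store head: no head
	fmt.Println(errors.Is(err, errNoHead)) // true
}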
