Skip to content

Commit 7da48a5

Browse files
committed
store: respect context
1 parent ad137b1 commit 7da48a5

File tree

1 file changed

+7
-4
lines changed

1 file changed

+7
-4
lines changed

store/store.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type Store[H header.Header[H]] struct {
5959
pending *batch[H]
6060
// syncCh is a channel used to synchronize writes
6161
syncCh chan chan struct{}
62+
cancel context.CancelFunc
6263

6364
onDeleteMu sync.Mutex
6465
onDelete []func(context.Context, uint64) error
@@ -133,7 +134,10 @@ func (s *Store[H]) Start(ctx context.Context) error {
133134
return fmt.Errorf("header/store: initializing: %w", err)
134135
}
135136

136-
go s.flushLoop()
137+
ctx, cancel := context.WithCancel(ctx)
138+
s.cancel = cancel
139+
140+
go s.flushLoop(ctx)
137141
return nil
138142
}
139143

@@ -146,6 +150,7 @@ func (s *Store[H]) Stop(ctx context.Context) error {
146150
// signal to prevent further writes to Store
147151
select {
148152
case s.writes <- nil:
153+
s.cancel()
149154
case <-ctx.Done():
150155
return ctx.Err()
151156
}
@@ -431,12 +436,10 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
431436
// This way writes are controlled and manageable from one place allowing
432437
// (1) Appends not to be blocked on long disk IO writes and underlying DB compactions
433438
// (2) Batching header writes
434-
func (s *Store[H]) flushLoop() {
439+
func (s *Store[H]) flushLoop(ctx context.Context) {
435440
defer close(s.writesDn)
436-
ctx := context.Background()
437441

438442
flush := func(headers []H) {
439-
log.Debug("flush request", len(headers))
440443
s.ensureInit(headers)
441444
// add headers to the pending and ensure they are accessible
442445
s.pending.Append(headers...)

0 commit comments

Comments
 (0)