Skip to content

Commit b82fc6d

Browse files
committed
store: respect context
1 parent 9394773 commit b82fc6d

File tree

1 file changed

+7
-4
lines changed

1 file changed

+7
-4
lines changed

store/store.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type Store[H header.Header[H]] struct {
5959
pending *batch[H]
6060
// syncCh is a channel used to synchronize writes
6161
syncCh chan chan struct{}
62+
cancel context.CancelFunc
6263

6364
onDeleteMu sync.Mutex
6465
onDelete []func(context.Context, uint64) error
@@ -133,7 +134,10 @@ func (s *Store[H]) Start(ctx context.Context) error {
133134
return fmt.Errorf("header/store: initializing: %w", err)
134135
}
135136

136-
go s.flushLoop()
137+
ctx, cancel := context.WithCancel(ctx)
138+
s.cancel = cancel
139+
140+
go s.flushLoop(ctx)
137141
return nil
138142
}
139143

@@ -146,6 +150,7 @@ func (s *Store[H]) Stop(ctx context.Context) error {
146150
// signal to prevent further writes to Store
147151
select {
148152
case s.writes <- nil:
153+
s.cancel()
149154
case <-ctx.Done():
150155
return ctx.Err()
151156
}
@@ -428,12 +433,10 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
428433
// This way writes are controlled and manageable from one place allowing
429434
// (1) Appends not to be blocked on long disk IO writes and underlying DB compactions
430435
// (2) Batching header writes
431-
func (s *Store[H]) flushLoop() {
436+
func (s *Store[H]) flushLoop(ctx context.Context) {
432437
defer close(s.writesDn)
433-
ctx := context.Background()
434438

435439
flush := func(headers []H) {
436-
log.Debug("flush request", len(headers))
437440
s.ensureInit(headers)
438441
// add headers to the pending and ensure they are accessible
439442
s.pending.Append(headers...)

0 commit comments

Comments
 (0)