@@ -59,6 +59,7 @@ type Store[H header.Header[H]] struct {
59
59
pending * batch [H ]
60
60
// syncCh is a channel used to synchronize writes
61
61
syncCh chan chan struct {}
62
+ cancel context.CancelFunc
62
63
63
64
onDeleteMu sync.Mutex
64
65
onDelete []func (context.Context , uint64 ) error
@@ -133,7 +134,10 @@ func (s *Store[H]) Start(ctx context.Context) error {
133
134
return fmt .Errorf ("header/store: initializing: %w" , err )
134
135
}
135
136
136
- go s .flushLoop ()
137
+ ctx , cancel := context .WithCancel (ctx )
138
+ s .cancel = cancel
139
+
140
+ go s .flushLoop (ctx )
137
141
return nil
138
142
}
139
143
@@ -146,6 +150,7 @@ func (s *Store[H]) Stop(ctx context.Context) error {
146
150
// signal to prevent further writes to Store
147
151
select {
148
152
case s .writes <- nil :
153
+ s .cancel ()
149
154
case <- ctx .Done ():
150
155
return ctx .Err ()
151
156
}
@@ -431,12 +436,10 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
431
436
// This way writes are controlled and manageable from one place allowing
432
437
// (1) Appends not to be blocked on long disk IO writes and underlying DB compactions
433
438
// (2) Batching header writes
434
- func (s * Store [H ]) flushLoop () {
439
+ func (s * Store [H ]) flushLoop (ctx context. Context ) {
435
440
defer close (s .writesDn )
436
- ctx := context .Background ()
437
441
438
442
flush := func (headers []H ) {
439
- log .Debug ("flush request" , len (headers ))
440
443
s .ensureInit (headers )
441
444
// add headers to the pending and ensure they are accessible
442
445
s .pending .Append (headers ... )
0 commit comments