Skip to content

Commit 9e04c5e

Browse files
ucwongfjl
andauthored
core/bloombits: use single channel for shutdown (#20878)
This replaces the two-stage shutdown scheme with the one we use almost everywhere else: a single quit channel signalling termination. Co-authored-by: Felix Lange <[email protected]>
1 parent abf2d7d commit 9e04c5e

File tree

1 file changed

+24
-35
lines changed

1 file changed

+24
-35
lines changed

core/bloombits/matcher.go

Lines changed: 24 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,6 @@ func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uin
155155
session := &MatcherSession{
156156
matcher: m,
157157
quit: make(chan struct{}),
158-
kill: make(chan struct{}),
159158
ctx: ctx,
160159
}
161160
for _, scheduler := range m.schedulers {
@@ -386,10 +385,8 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
386385
requests = make(map[uint][]uint64) // Per-bit list of section requests, ordered by section number
387386
unallocs = make(map[uint]struct{}) // Bits with pending requests but not allocated to any retriever
388387
retrievers chan chan uint // Waiting retrievers (toggled to nil if unallocs is empty)
389-
)
390-
var (
391-
allocs int // Number of active allocations to handle graceful shutdown requests
392-
shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests
388+
allocs int // Number of active allocations to handle graceful shutdown requests
389+
shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests
393390
)
394391

395392
// assign is a helper method fo try to assign a pending bit an actively
@@ -409,15 +406,12 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
409406
for {
410407
select {
411408
case <-shutdown:
412-
// Graceful shutdown requested, wait until all pending requests are honoured
409+
// Shutdown requested. No more retrievers can be allocated,
410+
// but we still need to wait until all pending requests have returned.
411+
shutdown = nil
413412
if allocs == 0 {
414413
return
415414
}
416-
shutdown = nil
417-
418-
case <-session.kill:
419-
// Pending requests not honoured in time, hard terminate
420-
return
421415

422416
case req := <-dist:
423417
// New retrieval request arrived to be distributed to some fetcher process
@@ -499,8 +493,9 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
499493
assign(result.Bit)
500494
}
501495
}
502-
// If we're in the process of shutting down, terminate
503-
if allocs == 0 && shutdown == nil {
496+
497+
// End the session when all pending deliveries have arrived.
498+
if shutdown == nil && allocs == 0 {
504499
return
505500
}
506501
}
@@ -514,7 +509,6 @@ type MatcherSession struct {
514509

515510
closer sync.Once // Sync object to ensure we only ever close once
516511
quit chan struct{} // Quit channel to request pipeline termination
517-
kill chan struct{} // Term channel to signal non-graceful forced shutdown
518512

519513
ctx context.Context // Context used by the light client to abort filtering
520514
err atomic.Value // Global error to track retrieval failures deep in the chain
@@ -529,7 +523,6 @@ func (s *MatcherSession) Close() {
529523
s.closer.Do(func() {
530524
// Signal termination and wait for all goroutines to tear down
531525
close(s.quit)
532-
time.AfterFunc(time.Second, func() { close(s.kill) })
533526
s.pend.Wait()
534527
})
535528
}
@@ -542,10 +535,10 @@ func (s *MatcherSession) Error() error {
542535
return nil
543536
}
544537

545-
// AllocateRetrieval assigns a bloom bit index to a client process that can either
538+
// allocateRetrieval assigns a bloom bit index to a client process that can either
546539
// immediately request and fetch the section contents assigned to this bit or wait
547540
// a little while for more sections to be requested.
548-
func (s *MatcherSession) AllocateRetrieval() (uint, bool) {
541+
func (s *MatcherSession) allocateRetrieval() (uint, bool) {
549542
fetcher := make(chan uint)
550543

551544
select {
@@ -557,9 +550,9 @@ func (s *MatcherSession) AllocateRetrieval() (uint, bool) {
557550
}
558551
}
559552

560-
// PendingSections returns the number of pending section retrievals belonging to
553+
// pendingSections returns the number of pending section retrievals belonging to
561554
// the given bloom bit index.
562-
func (s *MatcherSession) PendingSections(bit uint) int {
555+
func (s *MatcherSession) pendingSections(bit uint) int {
563556
fetcher := make(chan uint)
564557

565558
select {
@@ -571,9 +564,9 @@ func (s *MatcherSession) PendingSections(bit uint) int {
571564
}
572565
}
573566

574-
// AllocateSections assigns all or part of an already allocated bit-task queue
567+
// allocateSections assigns all or part of an already allocated bit-task queue
575568
// to the requesting process.
576-
func (s *MatcherSession) AllocateSections(bit uint, count int) []uint64 {
569+
func (s *MatcherSession) allocateSections(bit uint, count int) []uint64 {
577570
fetcher := make(chan *Retrieval)
578571

579572
select {
@@ -589,14 +582,10 @@ func (s *MatcherSession) AllocateSections(bit uint, count int) []uint64 {
589582
}
590583
}
591584

592-
// DeliverSections delivers a batch of section bit-vectors for a specific bloom
585+
// deliverSections delivers a batch of section bit-vectors for a specific bloom
593586
// bit index to be injected into the processing pipeline.
594-
func (s *MatcherSession) DeliverSections(bit uint, sections []uint64, bitsets [][]byte) {
595-
select {
596-
case <-s.kill:
597-
return
598-
case s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}:
599-
}
587+
func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets [][]byte) {
588+
s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}
600589
}
601590

602591
// Multiplex polls the matcher session for retrieval tasks and multiplexes it into
@@ -608,31 +597,31 @@ func (s *MatcherSession) DeliverSections(bit uint, sections []uint64, bitsets []
608597
func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
609598
for {
610599
// Allocate a new bloom bit index to retrieve data for, stopping when done
611-
bit, ok := s.AllocateRetrieval()
600+
bit, ok := s.allocateRetrieval()
612601
if !ok {
613602
return
614603
}
615604
// Bit allocated, throttle a bit if we're below our batch limit
616-
if s.PendingSections(bit) < batch {
605+
if s.pendingSections(bit) < batch {
617606
select {
618607
case <-s.quit:
619608
// Session terminating, we can't meaningfully service, abort
620-
s.AllocateSections(bit, 0)
621-
s.DeliverSections(bit, []uint64{}, [][]byte{})
609+
s.allocateSections(bit, 0)
610+
s.deliverSections(bit, []uint64{}, [][]byte{})
622611
return
623612

624613
case <-time.After(wait):
625614
// Throttling up, fetch whatever's available
626615
}
627616
}
628617
// Allocate as much as we can handle and request servicing
629-
sections := s.AllocateSections(bit, batch)
618+
sections := s.allocateSections(bit, batch)
630619
request := make(chan *Retrieval)
631620

632621
select {
633622
case <-s.quit:
634623
// Session terminating, we can't meaningfully service, abort
635-
s.DeliverSections(bit, sections, make([][]byte, len(sections)))
624+
s.deliverSections(bit, sections, make([][]byte, len(sections)))
636625
return
637626

638627
case mux <- request:
@@ -644,7 +633,7 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
644633
s.err.Store(result.Error)
645634
s.Close()
646635
}
647-
s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
636+
s.deliverSections(result.Bit, result.Sections, result.Bitsets)
648637
}
649638
}
650639
}

0 commit comments

Comments
 (0)