Skip to content

Commit 9cdfc7b

Browse files
committed
chanutils+log: add concurrent queue impl
1 parent 66599be commit 9cdfc7b

File tree

3 files changed

+171
-0
lines changed

3 files changed

+171
-0
lines changed

chanutils/log.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package chanutils
2+
3+
import "github.com/btcsuite/btclog"
4+
5+
// log is a logger that is initialized with no output filters. This
6+
// means the package will not perform any logging by default until the caller
7+
// requests it.
8+
var log btclog.Logger
9+
10+
// The default amount of logging is none.
11+
func init() {
12+
DisableLog()
13+
}
14+
15+
// DisableLog disables all library log output. Logging output is disabled
16+
// by default until either UseLogger or SetLogWriter are called.
17+
func DisableLog() {
18+
UseLogger(btclog.Disabled)
19+
}
20+
21+
// UseLogger uses a specified Logger to output package logging info.
22+
// This should be used in preference to SetLogWriter if the caller is also
23+
// using btclog.
24+
func UseLogger(logger btclog.Logger) {
25+
log = logger
26+
}

chanutils/queue.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package chanutils
2+
3+
import (
4+
"sync"
5+
6+
"github.com/lightninglabs/neutrino/cache/lru"
7+
)
8+
9+
const (
10+
// DefaultQueueSize is the default size to use for concurrent queues.
11+
DefaultQueueSize = 10
12+
)
13+
14+
// ConcurrentQueue is a typed concurrent-safe FIFO queue with unbounded
15+
// capacity. Clients interact with the queue by pushing items into the in
16+
// channel and popping items from the out channel. There is a goroutine that
17+
// manages moving items from the in channel to the out channel in the correct
18+
// order that must be started by calling Start().
19+
type ConcurrentQueue[T any] struct {
20+
started sync.Once
21+
stopped sync.Once
22+
23+
chanIn chan T
24+
chanOut chan T
25+
overflow *lru.List[T]
26+
27+
wg sync.WaitGroup
28+
quit chan struct{}
29+
}
30+
31+
// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is
32+
// the capacity of the output channel. When the size of the queue is below this
33+
// threshold, pushes do not incur the overhead of the less efficient overflow
34+
// structure.
35+
func NewConcurrentQueue[T any](bufferSize int) *ConcurrentQueue[T] {
36+
return &ConcurrentQueue[T]{
37+
chanIn: make(chan T),
38+
chanOut: make(chan T, bufferSize),
39+
overflow: lru.NewList[T](),
40+
quit: make(chan struct{}),
41+
}
42+
}
43+
44+
// ChanIn returns a channel that can be used to push new items into the queue.
45+
func (cq *ConcurrentQueue[T]) ChanIn() chan<- T {
46+
return cq.chanIn
47+
}
48+
49+
// ChanOut returns a channel that can be used to pop items from the queue.
50+
func (cq *ConcurrentQueue[T]) ChanOut() <-chan T {
51+
return cq.chanOut
52+
}
53+
54+
// Start begins a goroutine that manages moving items from the in channel to the
55+
// out channel. The queue tries to move items directly to the out channel
56+
// minimize overhead, but if the out channel is full it pushes items to an
57+
// overflow queue. This must be called before using the queue.
58+
func (cq *ConcurrentQueue[T]) Start() {
59+
cq.started.Do(cq.start)
60+
}
61+
62+
func (cq *ConcurrentQueue[T]) start() {
63+
cq.wg.Add(1)
64+
go func() {
65+
defer cq.wg.Done()
66+
67+
readLoop:
68+
for {
69+
nextElement := cq.overflow.Front()
70+
if nextElement == nil {
71+
// Overflow queue is empty so incoming items can
72+
// be pushed directly to the output channel. If
73+
// output channel is full though, push to
74+
// overflow.
75+
select {
76+
case item, ok := <-cq.chanIn:
77+
if !ok {
78+
log.Warnf("ConcurrentQueue " +
79+
"has exited due to " +
80+
"the input channel " +
81+
"being closed")
82+
83+
break readLoop
84+
}
85+
select {
86+
case cq.chanOut <- item:
87+
// Optimistically push directly
88+
// to chanOut.
89+
default:
90+
cq.overflow.PushBack(item)
91+
}
92+
case <-cq.quit:
93+
return
94+
}
95+
} else {
96+
// Overflow queue is not empty, so any new items
97+
// get pushed to the back to preserve order.
98+
select {
99+
case item, ok := <-cq.chanIn:
100+
if !ok {
101+
log.Warnf("ConcurrentQueue " +
102+
"has exited due to " +
103+
"the input channel " +
104+
"being closed")
105+
106+
break readLoop
107+
}
108+
cq.overflow.PushBack(item)
109+
case cq.chanOut <- nextElement.Value:
110+
cq.overflow.Remove(nextElement)
111+
case <-cq.quit:
112+
return
113+
}
114+
}
115+
}
116+
117+
// Incoming channel has been closed. Empty overflow queue into
118+
// the outgoing channel.
119+
nextElement := cq.overflow.Front()
120+
for nextElement != nil {
121+
select {
122+
case cq.chanOut <- nextElement.Value:
123+
cq.overflow.Remove(nextElement)
124+
case <-cq.quit:
125+
return
126+
}
127+
nextElement = cq.overflow.Front()
128+
}
129+
130+
// Close outgoing channel.
131+
close(cq.chanOut)
132+
}()
133+
}
134+
135+
// Stop ends the goroutine that moves items from the in channel to the out
136+
// channel. This does not clear the queue state, so the queue can be restarted
137+
// without dropping items.
138+
func (cq *ConcurrentQueue[T]) Stop() {
139+
cq.stopped.Do(func() {
140+
close(cq.quit)
141+
cq.wg.Wait()
142+
})
143+
}

log.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/btcsuite/btcd/txscript"
99
"github.com/btcsuite/btclog"
1010
"github.com/lightninglabs/neutrino/blockntfns"
11+
"github.com/lightninglabs/neutrino/chanutils"
1112
"github.com/lightninglabs/neutrino/filterdb"
1213
"github.com/lightninglabs/neutrino/pushtx"
1314
"github.com/lightninglabs/neutrino/query"
@@ -43,4 +44,5 @@ func UseLogger(logger btclog.Logger) {
4344
connmgr.UseLogger(logger)
4445
query.UseLogger(logger)
4546
filterdb.UseLogger(logger)
47+
chanutils.UseLogger(logger)
4648
}

0 commit comments

Comments
 (0)