1717package enode
1818
1919import (
20+ "context"
2021 "sync"
2122 "time"
2223)
@@ -59,6 +60,11 @@ func (it sourceIter) NodeSource() string {
5960 return it .name
6061}
6162
63+ type iteratorItem struct {
64+ n * Node
65+ source string
66+ }
67+
6268// ReadNodes reads at most n nodes from the given iterator. The return value contains no
6369// duplicates and no nil values. To prevent looping indefinitely for small repeating node
6470// sequences, this function calls Next at most n times.
@@ -152,6 +158,149 @@ func (f *filterIter) Next() bool {
152158 return false
153159}
154160
161+ // asyncFilterIter wraps an iterator such that Next only returns nodes for which
162+ // the 'check' function returns a (possibly modified) node.
163+ type asyncFilterIter struct {
164+ it SourceIterator // the iterator to filter
165+ slots chan struct {} // the slots for parallel checking
166+ passed chan iteratorItem // channel to collect passed nodes
167+ cur iteratorItem // buffer to serve the Node call
168+ cancel context.CancelFunc
169+ closeOnce sync.Once
170+ }
171+
172+ type AsyncFilterFunc func (context.Context , * Node ) * Node
173+
174+ // AsyncFilter creates an iterator which checks nodes in parallel.
175+ // The 'check' function is called on multiple goroutines to filter each node
176+ // from the upstream iterator. When check returns nil, the node will be skipped.
177+ // It can also return a new node to be returned by the iterator instead of the .
178+ func AsyncFilter (it Iterator , check AsyncFilterFunc , workers int ) Iterator {
179+ f := & asyncFilterIter {
180+ it : ensureSourceIter (it ),
181+ slots : make (chan struct {}, workers + 1 ),
182+ passed : make (chan iteratorItem ),
183+ }
184+ for range cap (f .slots ) {
185+ f .slots <- struct {}{}
186+ }
187+ ctx , cancel := context .WithCancel (context .Background ())
188+ f .cancel = cancel
189+
190+ go func () {
191+ select {
192+ case <- ctx .Done ():
193+ return
194+ case <- f .slots :
195+ }
196+ // read from the iterator and start checking nodes in parallel
197+ // when a node is checked, it will be sent to the passed channel
198+ // and the slot will be released
199+ for f .it .Next () {
200+ node := f .it .Node ()
201+ nodeSource := f .it .NodeSource ()
202+
203+ // check the node async, in a separate goroutine
204+ <- f .slots
205+ go func () {
206+ if nn := check (ctx , node ); nn != nil {
207+ item := iteratorItem {nn , nodeSource }
208+ select {
209+ case f .passed <- item :
210+ case <- ctx .Done (): // bale out if downstream is already closed and not calling Next
211+ }
212+ }
213+ f .slots <- struct {}{}
214+ }()
215+ }
216+ // the iterator has ended
217+ f .slots <- struct {}{}
218+ }()
219+
220+ return f
221+ }
222+
223+ // Next blocks until a node is available or the iterator is closed.
224+ func (f * asyncFilterIter ) Next () bool {
225+ var ok bool
226+ f .cur , ok = <- f .passed
227+ return ok
228+ }
229+
230+ // Node returns the current node.
231+ func (f * asyncFilterIter ) Node () * Node {
232+ return f .cur .n
233+ }
234+
235+ // NodeSource implements IteratorSource.
236+ func (f * asyncFilterIter ) NodeSource () string {
237+ return f .cur .source
238+ }
239+
240+ // Close ends the iterator, also closing the wrapped iterator.
241+ func (f * asyncFilterIter ) Close () {
242+ f .closeOnce .Do (func () {
243+ f .it .Close ()
244+ f .cancel ()
245+ for range cap (f .slots ) {
246+ <- f .slots
247+ }
248+ close (f .slots )
249+ close (f .passed )
250+ })
251+ }
252+
253+ // bufferIter wraps an iterator and buffers the nodes it returns.
254+ // The buffer is pre-filled with the given size from the wrapped iterator.
255+ type bufferIter struct {
256+ it SourceIterator
257+ buffer chan iteratorItem
258+ head iteratorItem
259+ closeOnce sync.Once
260+ }
261+
262+ // NewBufferIter creates a new pre-fetch buffer of a given size.
263+ func NewBufferIter (it Iterator , size int ) Iterator {
264+ b := bufferIter {
265+ it : ensureSourceIter (it ),
266+ buffer : make (chan iteratorItem , size ),
267+ }
268+
269+ go func () {
270+ // if the wrapped iterator ends, the buffer content will still be served.
271+ defer close (b .buffer )
272+ // If instead the bufferIterator is closed, we bail out of the loop.
273+ for b .it .Next () {
274+ item := iteratorItem {b .it .Node (), b .it .NodeSource ()}
275+ b .buffer <- item
276+ }
277+ }()
278+ return & b
279+ }
280+
281+ func (b * bufferIter ) Next () bool {
282+ var ok bool
283+ b .head , ok = <- b .buffer
284+ return ok
285+ }
286+
287+ func (b * bufferIter ) Node () * Node {
288+ return b .head .n
289+ }
290+
291+ func (b * bufferIter ) NodeSource () string {
292+ return b .head .source
293+ }
294+
295+ func (b * bufferIter ) Close () {
296+ b .closeOnce .Do (func () {
297+ b .it .Close ()
298+ // Drain buffer and wait for the goroutine to end.
299+ for range b .buffer {
300+ }
301+ })
302+ }
303+
155304// FairMix aggregates multiple node iterators. The mixer itself is an iterator which ends
156305// only when Close is called. Source iterators added via AddSource are removed from the
157306// mix when they end.
@@ -164,9 +313,9 @@ func (f *filterIter) Next() bool {
164313// It's safe to call AddSource and Close concurrently with Next.
165314type FairMix struct {
166315 wg sync.WaitGroup
167- fromAny chan mixItem
316+ fromAny chan iteratorItem
168317 timeout time.Duration
169- cur mixItem
318+ cur iteratorItem
170319
171320 mu sync.Mutex
172321 closed chan struct {}
@@ -176,15 +325,10 @@ type FairMix struct {
176325
177326type mixSource struct {
178327 it SourceIterator
179- next chan mixItem
328+ next chan iteratorItem
180329 timeout time.Duration
181330}
182331
183- type mixItem struct {
184- n * Node
185- source string
186- }
187-
188332// NewFairMix creates a mixer.
189333//
190334// The timeout specifies how long the mixer will wait for the next fairly-chosen source
@@ -193,7 +337,7 @@ type mixItem struct {
193337// timeout makes the mixer completely fair.
194338func NewFairMix (timeout time.Duration ) * FairMix {
195339 m := & FairMix {
196- fromAny : make (chan mixItem ),
340+ fromAny : make (chan iteratorItem ),
197341 closed : make (chan struct {}),
198342 timeout : timeout ,
199343 }
@@ -211,7 +355,7 @@ func (m *FairMix) AddSource(it Iterator) {
211355 m .wg .Add (1 )
212356 source := & mixSource {
213357 it : ensureSourceIter (it ),
214- next : make (chan mixItem ),
358+ next : make (chan iteratorItem ),
215359 timeout : m .timeout ,
216360 }
217361 m .sources = append (m .sources , source )
@@ -239,7 +383,7 @@ func (m *FairMix) Close() {
239383
240384// Next returns a node from a random source.
241385func (m * FairMix ) Next () bool {
242- m .cur = mixItem {}
386+ m .cur = iteratorItem {}
243387
244388 for {
245389 source := m .pickSource ()
@@ -327,7 +471,7 @@ func (m *FairMix) runSource(closed chan struct{}, s *mixSource) {
327471 defer m .wg .Done ()
328472 defer close (s .next )
329473 for s .it .Next () {
330- item := mixItem {s .it .Node (), s .it .NodeSource ()}
474+ item := iteratorItem {s .it .Node (), s .it .NodeSource ()}
331475 select {
332476 case s .next <- item :
333477 case m .fromAny <- item :
0 commit comments