@@ -54,6 +54,12 @@ const (
5454 PinnedSync
5555)
5656
57+ const (
58+ // defaultTimestampQueueSize is the size of the timestamp range queue
59+ // used.
60+ defaultTimestampQueueSize = 1
61+ )
62+
5763// String returns a human readable string describing the target SyncerType.
5864func (t SyncerType ) String () string {
5965 switch t {
@@ -285,6 +291,10 @@ type gossipSyncerCfg struct {
285291 // updates for a channel and returns true if the channel should be
286292 // considered a zombie based on these timestamps.
287293 isStillZombieChannel func (time.Time , time.Time ) bool
294+
295+ // timestampQueueSize is the size of the timestamp range queue. If not
296+ // set, defaults to the global timestampQueueSize constant.
297+ timestampQueueSize int
288298}
289299
290300// GossipSyncer is a struct that handles synchronizing the channel graph state
@@ -381,6 +391,16 @@ type GossipSyncer struct {
381391 // respond to gossip timestamp range messages.
382392 syncerSema chan struct {}
383393
394+ // timestampRangeQueue is a buffered channel for queuing timestamp range
395+ // messages that need to be processed asynchronously. This prevents the
396+ // gossiper from blocking when ApplyGossipFilter is called.
397+ timestampRangeQueue chan * lnwire.GossipTimestampRange
398+
399+ // isSendingBacklog is an atomic flag that indicates whether a goroutine
400+ // is currently sending the backlog of messages. This ensures only one
401+ // goroutine is active at a time.
402+ isSendingBacklog atomic.Bool
403+
384404 sync.Mutex
385405
386406 // cg is a helper that encapsulates a wait group and quit channel and
@@ -392,14 +412,23 @@ type GossipSyncer struct {
392412// newGossipSyncer returns a new instance of the GossipSyncer populated using
393413// the passed config.
394414func newGossipSyncer (cfg gossipSyncerCfg , sema chan struct {}) * GossipSyncer {
415+ // Use the configured queue size if set, otherwise use the default.
416+ queueSize := cfg .timestampQueueSize
417+ if queueSize == 0 {
418+ queueSize = defaultTimestampQueueSize
419+ }
420+
395421 return & GossipSyncer {
396422 cfg : cfg ,
397423 syncTransitionReqs : make (chan * syncTransitionReq ),
398424 historicalSyncReqs : make (chan * historicalSyncReq ),
399425 gossipMsgs : make (chan lnwire.Message , syncerBufferSize ),
400426 queryMsgs : make (chan lnwire.Message , syncerBufferSize ),
401- syncerSema : sema ,
402- cg : fn .NewContextGuard (),
427+ timestampRangeQueue : make (
428+ chan * lnwire.GossipTimestampRange , queueSize ,
429+ ),
430+ syncerSema : sema ,
431+ cg : fn .NewContextGuard (),
403432 }
404433}
405434
@@ -424,6 +453,13 @@ func (g *GossipSyncer) Start() {
424453 g .cg .WgAdd (1 )
425454 go g .replyHandler (ctx )
426455 }
456+
457+ // Start the timestamp range queue processor to handle gossip
458+ // filter applications asynchronously.
459+ if ! g .cfg .noTimestampQueryOption {
460+ g .cg .WgAdd (1 )
461+ go g .processTimestampRangeQueue (ctx )
462+ }
427463 })
428464}
429465
@@ -674,6 +710,63 @@ func (g *GossipSyncer) replyHandler(ctx context.Context) {
674710 }
675711}
676712
713+ // processTimestampRangeQueue handles timestamp range messages from the queue
714+ // asynchronously. This prevents blocking the gossiper when rate limiting is
715+ // active and multiple peers are trying to apply gossip filters.
716+ func (g * GossipSyncer ) processTimestampRangeQueue (ctx context.Context ) {
717+ defer g .cg .WgDone ()
718+
719+ for {
720+ select {
721+ case msg := <- g .timestampRangeQueue :
722+ // Process the timestamp range message. If we hit an
723+ // error, log it but continue processing to avoid
724+ // blocking the queue.
725+ err := g .ApplyGossipFilter (ctx , msg )
726+ switch {
727+ case errors .Is (err , ErrGossipSyncerExiting ):
728+ return
729+
730+ case errors .Is (err , lnpeer .ErrPeerExiting ):
731+ return
732+
733+ case err != nil :
734+ log .Errorf ("Unable to apply gossip filter: %v" ,
735+ err )
736+ }
737+
738+ case <- g .cg .Done ():
739+ return
740+
741+ case <- ctx .Done ():
742+ return
743+ }
744+ }
745+ }
746+
747+ // QueueTimestampRange attempts to queue a timestamp range message for
748+ // asynchronous processing. If the queue is full, it returns false to indicate
749+ // the message was dropped.
750+ func (g * GossipSyncer ) QueueTimestampRange (
751+ msg * lnwire.GossipTimestampRange ) bool {
752+
753+ // If timestamp queries are disabled, don't queue the message.
754+ if g .cfg .noTimestampQueryOption {
755+ return false
756+ }
757+
758+ select {
759+ case g .timestampRangeQueue <- msg :
760+ return true
761+
762+ // Queue is full, drop the message to prevent blocking.
763+ default :
764+ log .Warnf ("Timestamp range queue full for peer %x, " +
765+ "dropping message" , g .cfg .peerPub [:])
766+ return false
767+ }
768+ }
769+
677770// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
678771// syncer and sends it to the remote peer.
679772func (g * GossipSyncer ) sendGossipTimestampRange (ctx context.Context ,
@@ -1308,6 +1401,14 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
13081401 return nil
13091402 }
13101403
1404+ // Check if a goroutine is already sending the backlog. If so, return
1405+ // early without attempting to acquire the semaphore.
1406+ if g .isSendingBacklog .Load () {
1407+ log .Debugf ("GossipSyncer(%x): skipping ApplyGossipFilter, " +
1408+ "backlog send already in progress" , g .cfg .peerPub [:])
1409+ return nil
1410+ }
1411+
13111412 select {
13121413 case <- g .syncerSema :
13131414 case <- g .cg .Done ():
@@ -1342,11 +1443,23 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
13421443 return nil
13431444 }
13441445
1446+ // Set the atomic flag to indicate we're starting to send the backlog.
1447+ // If the swap fails, it means another goroutine is already active, so
1448+ // we return early.
1449+ if ! g .isSendingBacklog .CompareAndSwap (false , true ) {
1450+ returnSema ()
1451+ log .Debugf ("GossipSyncer(%x): another goroutine already " +
1452+ "sending backlog, skipping" , g .cfg .peerPub [:])
1453+
1454+ return nil
1455+ }
1456+
13451457 // We'll conclude by launching a goroutine to send out any updates.
13461458 g .cg .WgAdd (1 )
13471459 go func () {
13481460 defer g .cg .WgDone ()
13491461 defer returnSema ()
1462+ defer g .isSendingBacklog .Store (false )
13501463
13511464 for _ , msg := range newUpdatestoSend {
13521465 err := g .cfg .sendToPeerSync (ctx , msg )
0 commit comments