From 3e27f25c21903452166ab383f5453e347f312fb1 Mon Sep 17 00:00:00 2001 From: Aurora Gaffney Date: Mon, 27 Jan 2025 17:15:17 -0500 Subject: [PATCH] refactor: send RequestNext in batches for chainsync pipelining We now send pipelined RequestNext messages in batches instead of an initial batch and subsequent single messages. This potentially improves performance and reduces pressure on mini-protocol buffers --- protocol/chainsync/client.go | 53 +++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/protocol/chainsync/client.go b/protocol/chainsync/client.go index 5755614a..3e474564 100644 --- a/protocol/chainsync/client.go +++ b/protocol/chainsync/client.go @@ -28,12 +28,13 @@ import ( // Client implements the ChainSync client type Client struct { *protocol.Protocol - config *Config - callbackContext CallbackContext - busyMutex sync.Mutex - readyForNextBlockChan chan bool - onceStart sync.Once - onceStop sync.Once + config *Config + callbackContext CallbackContext + busyMutex sync.Mutex + readyForNextBlockChan chan bool + onceStart sync.Once + onceStop sync.Once + syncPipelinedRequestNext int // waitingForCurrentTipChan will process all the requests for the current tip until the channel // is empty. @@ -404,8 +405,8 @@ func (c *Client) Sync(intersectPoints []common.Point) error { } intersectResultChan, cancel := c.wantIntersectFound() - msg := NewMsgFindIntersect(intersectPoints) - if err := c.SendMessage(msg); err != nil { + msgFindIntersect := NewMsgFindIntersect(intersectPoints) + if err := c.SendMessage(msgFindIntersect); err != nil { cancel() return err } @@ -418,14 +419,14 @@ func (c *Client) Sync(intersectPoints []common.Point) error { } } - // Pipeline the initial block requests to speed things up a bit - // Using a value higher than 10 seems to cause problems with NtN - for i := 0; i <= c.config.PipelineLimit; i++ { - msg := NewMsgRequestNext() - if err := c.SendMessage(msg); err != nil { - return err - } + // Send initial RequestNext + msgRequestNext := NewMsgRequestNext() + if err := c.SendMessage(msgRequestNext); err != nil { + return err } + // Reset pipelined message counter + c.syncPipelinedRequestNext = 0 + // Start sync loop go c.syncLoop() return nil } @@ -441,15 +442,23 @@ func (c *Client) syncLoop() { return } c.busyMutex.Lock() - // Request the next block - // In practice we already have multiple block requests pipelined - // and this just adds another one to the pile - msg := NewMsgRequestNext() - if err := c.SendMessage(msg); err != nil { - c.SendError(err) + // Wait for next block if we have pipelined messages + if c.syncPipelinedRequestNext > 0 { + c.syncPipelinedRequestNext-- c.busyMutex.Unlock() - return + continue + } + // Request the next block(s) + msgCount := max(c.config.PipelineLimit, 1) + for i := 0; i < msgCount; i++ { + msg := NewMsgRequestNext() + if err := c.SendMessage(msg); err != nil { + c.SendError(err) + c.busyMutex.Unlock() + return + } } + c.syncPipelinedRequestNext = msgCount - 1 c.busyMutex.Unlock() } }