diff --git a/protocol/blockfetch/blockfetch.go b/protocol/blockfetch/blockfetch.go index dab95c12..e82b4b7c 100644 --- a/protocol/blockfetch/blockfetch.go +++ b/protocol/blockfetch/blockfetch.go @@ -92,6 +92,7 @@ type Config struct { RequestRangeFunc RequestRangeFunc BatchStartTimeout time.Duration BlockTimeout time.Duration + RecvQueueSize int } // Callback context @@ -168,3 +169,11 @@ func WithBlockTimeout(timeout time.Duration) BlockFetchOptionFunc { c.BlockTimeout = timeout } } + +// WithRecvQueueSize specifies the size of the received messages queue. This is useful to adjust +// the number of blocks that can be fetched at once when acting as a client +func WithRecvQueueSize(size int) BlockFetchOptionFunc { + return func(c *Config) { + c.RecvQueueSize = size + } +} diff --git a/protocol/blockfetch/client.go b/protocol/blockfetch/client.go index 832f9b1e..113246d6 100644 --- a/protocol/blockfetch/client.go +++ b/protocol/blockfetch/client.go @@ -75,6 +75,9 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { StateMap: stateMap, InitialState: StateIdle, } + if c.config != nil { + protoConfig.RecvQueueSize = c.config.RecvQueueSize + } c.Protocol = protocol.New(protoConfig) return c } diff --git a/protocol/blockfetch/server.go b/protocol/blockfetch/server.go index 979dae81..c33df4ec 100644 --- a/protocol/blockfetch/server.go +++ b/protocol/blockfetch/server.go @@ -57,6 +57,9 @@ func (s *Server) initProtocol() { StateMap: StateMap, InitialState: StateIdle, } + if s.config != nil { + protoConfig.RecvQueueSize = s.config.RecvQueueSize + } s.Protocol = protocol.New(protoConfig) } diff --git a/protocol/chainsync/chainsync.go b/protocol/chainsync/chainsync.go index 52227087..bfc313f3 100644 --- a/protocol/chainsync/chainsync.go +++ b/protocol/chainsync/chainsync.go @@ -206,6 +206,7 @@ type Config struct { IntersectTimeout time.Duration BlockTimeout time.Duration PipelineLimit int + RecvQueueSize int } // Callback context @@ -320,3 +321,11 @@ func WithPipelineLimit(limit int) ChainSyncOptionFunc { c.PipelineLimit = limit } } + +// WithRecvQueueSize specifies the size of the received messages queue. This is useful to adjust +// the number of pipelined messages that can be supported when acting as a server +func WithRecvQueueSize(size int) ChainSyncOptionFunc { + return func(c *Config) { + c.RecvQueueSize = size + } +} diff --git a/protocol/chainsync/client.go b/protocol/chainsync/client.go index 3e7236cc..81486462 100644 --- a/protocol/chainsync/client.go +++ b/protocol/chainsync/client.go @@ -112,6 +112,9 @@ func NewClient( StateContext: stateContext, InitialState: stateIdle, } + if c.config != nil { + protoConfig.RecvQueueSize = c.config.RecvQueueSize + } c.Protocol = protocol.New(protoConfig) return c } diff --git a/protocol/chainsync/server.go b/protocol/chainsync/server.go index e20f4739..b04f796f 100644 --- a/protocol/chainsync/server.go +++ b/protocol/chainsync/server.go @@ -75,6 +75,9 @@ func (s *Server) initProtocol() { StateContext: s.stateContext, InitialState: stateIdle, } + if s.config != nil { + protoConfig.RecvQueueSize = s.config.RecvQueueSize + } s.Protocol = protocol.New(protoConfig) }