Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions protocol/blockfetch/blockfetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type Config struct {
RequestRangeFunc RequestRangeFunc
BatchStartTimeout time.Duration
BlockTimeout time.Duration
RecvQueueSize int
}

// Callback context
Expand Down Expand Up @@ -168,3 +169,11 @@ func WithBlockTimeout(timeout time.Duration) BlockFetchOptionFunc {
c.BlockTimeout = timeout
}
}

// WithRecvQueueSize specifies the size of the received messages queue. This is useful to adjust
// the number of blocks that can be fetched at once when acting as a client
func WithRecvQueueSize(size int) BlockFetchOptionFunc {
return func(c *Config) {
c.RecvQueueSize = size
}
}
3 changes: 3 additions & 0 deletions protocol/blockfetch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
StateMap: stateMap,
InitialState: StateIdle,
}
if c.config != nil {
protoConfig.RecvQueueSize = c.config.RecvQueueSize
}
c.Protocol = protocol.New(protoConfig)
return c
}
Expand Down
3 changes: 3 additions & 0 deletions protocol/blockfetch/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func (s *Server) initProtocol() {
StateMap: StateMap,
InitialState: StateIdle,
}
if s.config != nil {
protoConfig.RecvQueueSize = s.config.RecvQueueSize
}
s.Protocol = protocol.New(protoConfig)
}

Expand Down
9 changes: 9 additions & 0 deletions protocol/chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ type Config struct {
IntersectTimeout time.Duration
BlockTimeout time.Duration
PipelineLimit int
RecvQueueSize int
}

// Callback context
Expand Down Expand Up @@ -320,3 +321,11 @@ func WithPipelineLimit(limit int) ChainSyncOptionFunc {
c.PipelineLimit = limit
}
}

// WithRecvQueueSize specifies the size of the received messages queue. This is useful to adjust
// the number of pipelined messages that can be supported when acting as a server
func WithRecvQueueSize(size int) ChainSyncOptionFunc {
return func(c *Config) {
c.RecvQueueSize = size
}
}
3 changes: 3 additions & 0 deletions protocol/chainsync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ func NewClient(
StateContext: stateContext,
InitialState: stateIdle,
}
if c.config != nil {
protoConfig.RecvQueueSize = c.config.RecvQueueSize
}
c.Protocol = protocol.New(protoConfig)
return c
}
Expand Down
3 changes: 3 additions & 0 deletions protocol/chainsync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func (s *Server) initProtocol() {
StateContext: s.stateContext,
InitialState: stateIdle,
}
if s.config != nil {
protoConfig.RecvQueueSize = s.config.RecvQueueSize
}
s.Protocol = protocol.New(protoConfig)
}

Expand Down
Loading