Skip to content

Commit 98c6510

Browse files
committed
feat: make recv queue size configurable for chainsync/blockfetch
1 parent 6aadb95 commit 98c6510

File tree

6 files changed

+22
-0
lines changed

6 files changed

+22
-0
lines changed

protocol/blockfetch/blockfetch.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ type Config struct {
9292
RequestRangeFunc RequestRangeFunc
9393
BatchStartTimeout time.Duration
9494
BlockTimeout time.Duration
95+
RecvQueueSize int
9596
}
9697

9798
// Callback context
@@ -168,3 +169,11 @@ func WithBlockTimeout(timeout time.Duration) BlockFetchOptionFunc {
168169
c.BlockTimeout = timeout
169170
}
170171
}
172+
173+
// WithRecvQueueSize specifies the size of the received messages queue. This is useful to adjust
174+
// the number of blocks that can be fetched at once when acting as a client
175+
func WithRecvQueueSize(size int) BlockFetchOptionFunc {
176+
return func(c *Config) {
177+
c.RecvQueueSize = size
178+
}
179+
}

protocol/blockfetch/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
7474
MessageFromCborFunc: NewMsgFromCbor,
7575
StateMap: stateMap,
7676
InitialState: StateIdle,
77+
RecvQueueSize: c.config.RecvQueueSize,
7778
}
7879
c.Protocol = protocol.New(protoConfig)
7980
return c

protocol/blockfetch/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func (s *Server) initProtocol() {
5656
MessageFromCborFunc: NewMsgFromCbor,
5757
StateMap: StateMap,
5858
InitialState: StateIdle,
59+
RecvQueueSize: s.config.RecvQueueSize,
5960
}
6061
s.Protocol = protocol.New(protoConfig)
6162
}

protocol/chainsync/chainsync.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ type Config struct {
206206
IntersectTimeout time.Duration
207207
BlockTimeout time.Duration
208208
PipelineLimit int
209+
RecvQueueSize int
209210
}
210211

211212
// Callback context
@@ -320,3 +321,11 @@ func WithPipelineLimit(limit int) ChainSyncOptionFunc {
320321
c.PipelineLimit = limit
321322
}
322323
}
324+
325+
// WithRecvQueueSize specifies the size of the received messages queue. This is useful to adjust
326+
// the number of pipelined messages that can be supported when acting as a server
327+
func WithRecvQueueSize(size int) ChainSyncOptionFunc {
328+
return func(c *Config) {
329+
c.RecvQueueSize = size
330+
}
331+
}

protocol/chainsync/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ func NewClient(
111111
StateMap: stateMap,
112112
StateContext: stateContext,
113113
InitialState: stateIdle,
114+
RecvQueueSize: c.config.RecvQueueSize,
114115
}
115116
c.Protocol = protocol.New(protoConfig)
116117
return c

protocol/chainsync/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ func (s *Server) initProtocol() {
7474
StateMap: StateMap,
7575
StateContext: s.stateContext,
7676
InitialState: stateIdle,
77+
RecvQueueSize: s.config.RecvQueueSize,
7778
}
7879
s.Protocol = protocol.New(protoConfig)
7980
}

0 commit comments

Comments
 (0)