File tree Expand file tree Collapse file tree 2 files changed +16
-2
lines changed Expand file tree Collapse file tree 2 files changed +16
-2
lines changed Original file line number Diff line number Diff line change @@ -102,6 +102,7 @@ type Config struct {
102102 RollForwardFunc RollForwardFunc
103103 IntersectTimeout time.Duration
104104 BlockTimeout time.Duration
105+ PipelineLimit int
105106}
106107
107108// Callback function types
@@ -123,6 +124,7 @@ type ChainSyncOptionFunc func(*Config)
123124// NewConfig returns a new ChainSync config object with the provided options
124125func NewConfig (options ... ChainSyncOptionFunc ) Config {
125126 c := Config {
127+ PipelineLimit : 0 ,
126128 IntersectTimeout : 5 * time .Second ,
127129 // We should really use something more useful like 30-60s, but we've seen 55s between blocks
128130 // in the preview network
@@ -164,3 +166,10 @@ func WithBlockTimeout(timeout time.Duration) ChainSyncOptionFunc {
164166 c .BlockTimeout = timeout
165167 }
166168}
169+
170+ // WithPipelineLimit specifies the maximum number of block requests to pipeline
171+ func WithPipelineLimit (limit int ) ChainSyncOptionFunc {
172+ return func (c * Config ) {
173+ c .PipelineLimit = limit
174+ }
175+ }
Original file line number Diff line number Diff line change @@ -2,10 +2,11 @@ package chainsync
22
33import (
44 "fmt"
5+ "sync"
6+
57 "github.com/blinklabs-io/gouroboros/ledger"
68 "github.com/blinklabs-io/gouroboros/protocol"
79 "github.com/blinklabs-io/gouroboros/protocol/common"
8- "sync"
910)
1011
1112// Client implements the ChainSync client
@@ -133,7 +134,7 @@ func (c *Client) Sync(intersectPoints []common.Point) error {
133134 }
134135 // Pipeline the initial block requests to speed things up a bit
135136 // Using a value higher than 10 seems to cause problems with NtN
136- for i := 0 ; i < 10 ; i ++ {
137+ for i := 0 ; i <= c . config . PipelineLimit ; i ++ {
137138 msg := NewMsgRequestNext ()
138139 if err := c .SendMessage (msg ); err != nil {
139140 return err
@@ -222,6 +223,10 @@ func (c *Client) handleRollBackward(msgGeneric protocol.Message) error {
222223 return fmt .Errorf ("received chain-sync RollBackward message but no callback function is defined" )
223224 }
224225 msg := msgGeneric .(* MsgRollBackward )
226+ // Signal that we're ready for the next block after we finish handling the rollback
227+ defer func () {
228+ c .readyForNextBlockChan <- true
229+ }()
225230 // Call the user callback function
226231 return c .config .RollBackwardFunc (msg .Point , msg .Tip )
227232}
You can’t perform that action at this time.
0 commit comments