Skip to content

Commit 1c5e84d

Browse files
committed
feat: make chainsync pipelining limit configurable
Fixes #145
1 parent 0700b1a commit 1c5e84d

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

protocol/chainsync/chainsync.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ type Config struct {
102102
RollForwardFunc RollForwardFunc
103103
IntersectTimeout time.Duration
104104
BlockTimeout time.Duration
105+
PipelineLimit int
105106
}
106107

107108
// Callback function types
@@ -123,6 +124,7 @@ type ChainSyncOptionFunc func(*Config)
123124
// NewConfig returns a new ChainSync config object with the provided options
124125
func NewConfig(options ...ChainSyncOptionFunc) Config {
125126
c := Config{
127+
PipelineLimit: 0,
126128
IntersectTimeout: 5 * time.Second,
127129
// We should really use something more useful like 30-60s, but we've seen 55s between blocks
128130
// in the preview network
@@ -164,3 +166,10 @@ func WithBlockTimeout(timeout time.Duration) ChainSyncOptionFunc {
164166
c.BlockTimeout = timeout
165167
}
166168
}
169+
170+
// WithPipelineLimit specifies the maximum number of block requests to pipeline
171+
func WithPipelineLimit(limit int) ChainSyncOptionFunc {
172+
return func(c *Config) {
173+
c.PipelineLimit = limit
174+
}
175+
}

protocol/chainsync/client.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package chainsync
22

33
import (
44
"fmt"
5+
"sync"
6+
57
"github.com/blinklabs-io/gouroboros/ledger"
68
"github.com/blinklabs-io/gouroboros/protocol"
79
"github.com/blinklabs-io/gouroboros/protocol/common"
8-
"sync"
910
)
1011

1112
// Client implements the ChainSync client
@@ -133,7 +134,7 @@ func (c *Client) Sync(intersectPoints []common.Point) error {
133134
}
134135
// Pipeline the initial block requests to speed things up a bit
135136
// Using a value higher than 10 seems to cause problems with NtN
136-
for i := 0; i < 10; i++ {
137+
for i := 0; i <= c.config.PipelineLimit; i++ {
137138
msg := NewMsgRequestNext()
138139
if err := c.SendMessage(msg); err != nil {
139140
return err
@@ -222,6 +223,10 @@ func (c *Client) handleRollBackward(msgGeneric protocol.Message) error {
222223
return fmt.Errorf("received chain-sync RollBackward message but no callback function is defined")
223224
}
224225
msg := msgGeneric.(*MsgRollBackward)
226+
// Signal that we're ready for the next block after we finish handling the rollback
227+
defer func() {
228+
c.readyForNextBlockChan <- true
229+
}()
225230
// Call the user callback function
226231
return c.config.RollBackwardFunc(msg.Point, msg.Tip)
227232
}

0 commit comments

Comments
 (0)