Skip to content

Commit 680c975

Browse files
committed
feat: chainsync bulk mode
Fixes #18
1 parent fc87bcc commit 680c975

File tree

3 files changed

+104
-11
lines changed

3 files changed

+104
-11
lines changed

input/chainsync/chainsync.go

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
ouroboros "github.com/blinklabs-io/gouroboros"
2525
"github.com/blinklabs-io/gouroboros/ledger"
26+
"github.com/blinklabs-io/gouroboros/protocol/blockfetch"
2627
ochainsync "github.com/blinklabs-io/gouroboros/protocol/chainsync"
2728
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
2829
)
@@ -34,13 +35,16 @@ type ChainSync struct {
3435
address string
3536
socketPath string
3637
ntcTcp bool
38+
bulkMode bool
3739
intersectTip bool
3840
intersectPoints []ocommon.Point
3941
includeCbor bool
4042
statusUpdateFunc StatusUpdateFunc
4143
status *ChainSyncStatus
4244
errorChan chan error
4345
eventChan chan event.Event
46+
bulkRangeStart ocommon.Point
47+
bulkRangeEnd ocommon.Point
4448
}
4549

4650
type ChainSyncStatus struct {
@@ -76,15 +80,26 @@ func (c *ChainSync) Start() error {
7680
if c.oConn.BlockFetch() != nil {
7781
c.oConn.BlockFetch().Client.Start()
7882
}
79-
if c.intersectTip {
80-
tip, err := c.oConn.ChainSync().Client.GetCurrentTip()
83+
if c.bulkMode && !c.intersectTip && c.oConn.BlockFetch() != nil {
84+
var err error
85+
c.bulkRangeStart, c.bulkRangeEnd, err = c.oConn.ChainSync().Client.GetAvailableBlockRange(c.intersectPoints)
8186
if err != nil {
8287
return err
8388
}
84-
c.intersectPoints = []ocommon.Point{tip.Point}
85-
}
86-
if err := c.oConn.ChainSync().Client.Sync(c.intersectPoints); err != nil {
87-
return err
89+
if err := c.oConn.BlockFetch().Client.GetBlockRange(c.bulkRangeStart, c.bulkRangeEnd); err != nil {
90+
return err
91+
}
92+
} else {
93+
if c.intersectTip {
94+
tip, err := c.oConn.ChainSync().Client.GetCurrentTip()
95+
if err != nil {
96+
return err
97+
}
98+
c.intersectPoints = []ocommon.Point{tip.Point}
99+
}
100+
if err := c.oConn.ChainSync().Client.Sync(c.intersectPoints); err != nil {
101+
return err
102+
}
88103
}
89104
return nil
90105
}
@@ -158,6 +173,11 @@ func (c *ChainSync) setupConnection() error {
158173
ochainsync.WithRollBackwardFunc(c.handleRollBackward),
159174
),
160175
),
176+
ouroboros.WithBlockFetchConfig(
177+
blockfetch.NewConfig(
178+
blockfetch.WithBlockFunc(c.handleBlockFetchBlock),
179+
),
180+
),
161181
)
162182
if err != nil {
163183
return err
@@ -208,6 +228,19 @@ func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip
208228
return nil
209229
}
210230

231+
func (c *ChainSync) handleBlockFetchBlock(block ledger.Block) error {
232+
evt := event.New("chainsync.block", time.Now(), NewBlockEvent(block, c.includeCbor))
233+
c.eventChan <- evt
234+
c.updateStatus(block.SlotNumber(), block.BlockNumber(), block.Hash(), c.bulkRangeEnd.Slot, hex.EncodeToString(c.bulkRangeEnd.Hash))
235+
// Start normal chain-sync if we've reached the last block of our bulk range
236+
if block.SlotNumber() == c.bulkRangeEnd.Slot {
237+
if err := c.oConn.ChainSync().Client.Sync(c.intersectPoints); err != nil {
238+
return err
239+
}
240+
}
241+
return nil
242+
}
243+
211244
func (c *ChainSync) updateStatus(slotNumber uint64, blockNumber uint64, blockHash string, tipSlotNumber uint64, tipBlockHash string) {
212245
c.status.SlotNumber = slotNumber
213246
c.status.BlockNumber = blockNumber

input/chainsync/options.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,10 @@ func WithStatusUpdateFunc(statusUpdateFunc StatusUpdateFunc) ChainSyncOptionFunc
8383
c.statusUpdateFunc = statusUpdateFunc
8484
}
8585
}
86+
87+
// WithBulkMode specifies whether to use the "bulk" sync mode with NtN (node-to-node). This should only be used against your own nodes for resource usage reasons
88+
func WithBulkMode(bulkMode bool) ChainSyncOptionFunc {
89+
return func(c *ChainSync) {
90+
c.bulkMode = bulkMode
91+
}
92+
}

input/chainsync/plugin.go

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,13 @@
1515
package chainsync
1616

1717
import (
18+
"encoding/hex"
19+
"strconv"
20+
"strings"
21+
1822
"github.com/blinklabs-io/snek/plugin"
23+
24+
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
1925
)
2026

2127
var cmdlineOptions struct {
@@ -24,6 +30,7 @@ var cmdlineOptions struct {
2430
address string
2531
socketPath string
2632
ntcTcp bool
33+
bulkMode bool
2734
intersectTip bool
2835
intersectPoint string
2936
includeCbor bool
@@ -74,14 +81,27 @@ func init() {
7481
DefaultValue: false,
7582
Dest: &(cmdlineOptions.ntcTcp),
7683
},
84+
{
85+
Name: "bulk-mode",
86+
Type: plugin.PluginOptionTypeBool,
87+
Description: "use the 'bulk' sync mode with NtN (node-to-node). This should only be used against your own nodes for resource usage reasons",
88+
DefaultValue: false,
89+
Dest: &(cmdlineOptions.bulkMode),
90+
},
7791
{
7892
Name: "intersect-tip",
7993
Type: plugin.PluginOptionTypeBool,
8094
Description: "start syncing at the chain tip (defaults to chain genesis)",
8195
DefaultValue: true,
8296
Dest: &(cmdlineOptions.intersectTip),
8397
},
84-
// TODO: intersect-point
98+
{
99+
Name: "intersect-point",
100+
Type: plugin.PluginOptionTypeString,
101+
Description: "start syncing at the specified chain point(s) in '<slot>.<hash>' format",
102+
DefaultValue: "",
103+
Dest: &(cmdlineOptions.intersectPoint),
104+
},
85105
{
86106
Name: "include-cbor",
87107
Type: plugin.PluginOptionTypeBool,
@@ -95,15 +115,48 @@ func init() {
95115
}
96116

97117
func NewFromCmdlineOptions() plugin.Plugin {
98-
p := New(
118+
opts := []ChainSyncOptionFunc{
99119
WithNetwork(cmdlineOptions.network),
100120
WithNetworkMagic(uint32(cmdlineOptions.networkMagic)),
101121
WithAddress(cmdlineOptions.address),
102122
WithSocketPath(cmdlineOptions.socketPath),
103123
WithNtcTcp(cmdlineOptions.ntcTcp),
104-
WithIntersectTip(cmdlineOptions.intersectTip),
105-
// TODO: WithIntersectPoints
124+
WithBulkMode(cmdlineOptions.bulkMode),
106125
WithIncludeCbor(cmdlineOptions.includeCbor),
107-
)
126+
}
127+
if cmdlineOptions.intersectPoint != "" {
128+
intersectPoints := []ocommon.Point{}
129+
for _, point := range strings.Split(cmdlineOptions.intersectPoint, ",") {
130+
intersectPointParts := strings.Split(point, ".")
131+
if len(intersectPointParts) != 2 {
132+
panic("invalid intersect point format")
133+
}
134+
intersectSlot, err := strconv.ParseUint(intersectPointParts[0], 10, 64)
135+
if err != nil {
136+
panic("invalid intersect point format")
137+
}
138+
intersectHashBytes, err := hex.DecodeString(intersectPointParts[1])
139+
if err != nil {
140+
panic("invalid intersect point format")
141+
}
142+
intersectPoints = append(
143+
intersectPoints,
144+
ocommon.Point{
145+
Slot: intersectSlot,
146+
Hash: intersectHashBytes[:],
147+
},
148+
)
149+
}
150+
opts = append(
151+
opts,
152+
WithIntersectPoints(intersectPoints),
153+
)
154+
} else {
155+
opts = append(
156+
opts,
157+
WithIntersectTip(cmdlineOptions.intersectTip),
158+
)
159+
}
160+
p := New(opts...)
108161
return p
109162
}

0 commit comments

Comments
 (0)