11package chainsync
22
33import (
4+ "encoding/hex"
45 "fmt"
56 "sync"
67
@@ -18,6 +19,8 @@ type Client struct {
1819 readyForNextBlockChan chan bool
1920 wantCurrentTip bool
2021 currentTipChan chan Tip
22+ wantFirstBlock bool
23+ firstBlockChan chan common.Point
2124}
2225
2326// NewClient returns a new ChainSync client object
@@ -39,6 +42,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
3942 intersectResultChan : make (chan error ),
4043 readyForNextBlockChan : make (chan bool ),
4144 currentTipChan : make (chan Tip ),
45+ firstBlockChan : make (chan common.Point ),
4246 }
4347 // Update state map with timeouts
4448 stateMap := StateMap .Copy ()
@@ -72,6 +76,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
7276 close (c .intersectResultChan )
7377 close (c .readyForNextBlockChan )
7478 close (c .currentTipChan )
79+ close (c .firstBlockChan )
7580 }()
7681 return c
7782}
@@ -120,6 +125,51 @@ func (c *Client) GetCurrentTip() (*Tip, error) {
120125 return & tip , nil
121126}
122127
128+ // GetAvailableBlockRange returns the start and end of the range of available blocks given the provided intersect
129+ // point(s).
130+ func (c * Client ) GetAvailableBlockRange (intersectPoints []common.Point ) (common.Point , common.Point , error ) {
131+ c .busyMutex .Lock ()
132+ defer c .busyMutex .Unlock ()
133+ c .wantCurrentTip = true
134+ c .wantFirstBlock = true
135+ var start , end common.Point
136+ msgFindIntersect := NewMsgFindIntersect (intersectPoints )
137+ if err := c .SendMessage (msgFindIntersect ); err != nil {
138+ return start , end , err
139+ }
140+ select {
141+ case tip := <- c .currentTipChan :
142+ end = tip .Point
143+ // Clear out intersect result channel to prevent blocking
144+ <- c .intersectResultChan
145+ case err := <- c .intersectResultChan :
146+ return start , end , err
147+ }
148+ c .wantCurrentTip = false
149+ // Request the next block. This should result in a rollback
150+ msgRequestNext := NewMsgRequestNext ()
151+ if err := c .SendMessage (msgRequestNext ); err != nil {
152+ return start , end , err
153+ }
154+ for {
155+ select {
156+ case point := <- c .firstBlockChan :
157+ start = point
158+ c .wantFirstBlock = false
159+ case <- c .readyForNextBlockChan :
160+ // Request the next block
161+ msg := NewMsgRequestNext ()
162+ if err := c .SendMessage (msg ); err != nil {
163+ return start , end , err
164+ }
165+ }
166+ if ! c .wantFirstBlock {
167+ break
168+ }
169+ }
170+ return start , end , nil
171+ }
172+
123173// Sync begins a chain-sync operation using the provided intersect point(s). Incoming blocks will be delivered
124174// via the RollForward callback function specified in the protocol config
125175func (c * Client ) Sync (intersectPoints []common.Point ) error {
@@ -172,13 +222,13 @@ func (c *Client) handleAwaitReply() error {
172222}
173223
174224func (c * Client ) handleRollForward (msgGeneric protocol.Message ) error {
175- if c .config .RollForwardFunc == nil {
225+ if c .config .RollForwardFunc == nil && ! c . wantFirstBlock {
176226 return fmt .Errorf ("received chain-sync RollForward message but no callback function is defined" )
177227 }
178228 var callbackErr error
179229 if c .Mode () == protocol .ProtocolModeNodeToNode {
180230 msg := msgGeneric .(* MsgRollForwardNtN )
181- var blockHeader interface {}
231+ var blockHeader ledger. BlockHeader
182232 var blockType uint
183233 blockEra := msg .WrappedHeader .Era
184234 switch blockEra {
@@ -205,6 +255,15 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
205255 return err
206256 }
207257 }
258+ if c .wantFirstBlock {
259+ blockHash , err := hex .DecodeString (blockHeader .Hash ())
260+ if err != nil {
261+ return err
262+ }
263+ point := common .NewPoint (blockHeader .SlotNumber (), blockHash )
264+ c .firstBlockChan <- point
265+ return nil
266+ }
208267 // Call the user callback function
209268 callbackErr = c .config .RollForwardFunc (blockType , blockHeader , msg .Tip )
210269 } else {
@@ -213,32 +272,46 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
213272 if err != nil {
214273 return err
215274 }
275+ if c .wantFirstBlock {
276+ blockHash , err := hex .DecodeString (blk .Hash ())
277+ if err != nil {
278+ return err
279+ }
280+ point := common .NewPoint (blk .SlotNumber (), blockHash )
281+ c .firstBlockChan <- point
282+ return nil
283+ }
216284 // Call the user callback function
217285 callbackErr = c .config .RollForwardFunc (msg .BlockType (), blk , msg .Tip )
218286 }
219- if callbackErr == StopSyncProcessError {
220- // Signal that we're cancelling the sync
221- c .readyForNextBlockChan <- false
287+ if callbackErr != nil {
288+ if callbackErr == StopSyncProcessError {
289+ // Signal that we're cancelling the sync
290+ c .readyForNextBlockChan <- false
291+ } else {
292+ return callbackErr
293+ }
222294 }
223295 // Signal that we're ready for the next block
224296 c .readyForNextBlockChan <- true
225297 return nil
226298}
227299
228300func (c * Client ) handleRollBackward (msgGeneric protocol.Message ) error {
229- if c .config .RollBackwardFunc == nil {
230- return fmt .Errorf ("received chain-sync RollBackward message but no callback function is defined" )
231- }
232- msg := msgGeneric .(* MsgRollBackward )
233- // Signal that we're ready for the next block after we finish handling the rollback
234- defer func () {
235- c .readyForNextBlockChan <- true
236- }()
237- // Call the user callback function
238- callbackErr := c .config .RollBackwardFunc (msg .Point , msg .Tip )
239- if callbackErr == StopSyncProcessError {
240- // Signal that we're cancelling the sync
241- c .readyForNextBlockChan <- false
301+ if ! c .wantFirstBlock {
302+ if c .config .RollBackwardFunc == nil {
303+ return fmt .Errorf ("received chain-sync RollBackward message but no callback function is defined" )
304+ }
305+ msg := msgGeneric .(* MsgRollBackward )
306+ // Call the user callback function
307+ if callbackErr := c .config .RollBackwardFunc (msg .Point , msg .Tip ); callbackErr != nil {
308+ if callbackErr == StopSyncProcessError {
309+ // Signal that we're cancelling the sync
310+ c .readyForNextBlockChan <- false
311+ } else {
312+ return callbackErr
313+ }
314+ }
242315 }
243316 // Signal that we're ready for the next block
244317 c .readyForNextBlockChan <- true
@@ -249,18 +322,16 @@ func (c *Client) handleIntersectFound(msgGeneric protocol.Message) error {
249322 if c .wantCurrentTip {
250323 msgIntersectFound := msgGeneric .(* MsgIntersectFound )
251324 c .currentTipChan <- msgIntersectFound .Tip
252- } else {
253- c .intersectResultChan <- nil
254325 }
326+ c .intersectResultChan <- nil
255327 return nil
256328}
257329
258330func (c * Client ) handleIntersectNotFound (msgGeneric protocol.Message ) error {
259331 if c .wantCurrentTip {
260332 msgIntersectNotFound := msgGeneric .(* MsgIntersectNotFound )
261333 c .currentTipChan <- msgIntersectNotFound .Tip
262- } else {
263- c .intersectResultChan <- IntersectNotFoundError {}
264334 }
335+ c .intersectResultChan <- IntersectNotFoundError {}
265336 return nil
266337}
0 commit comments