Skip to content

Commit 261b268

Browse files
authored
Merge pull request #237 from blinklabs-io/feat/stop-chainsync
feat: method to stop chainsync process
2 parents 57e9c4e + 91d9678 commit 261b268

File tree

2 files changed

+30
-8
lines changed

2 files changed

+30
-8
lines changed

protocol/chainsync/client.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,12 @@ func (c *Client) Sync(intersectPoints []common.Point) error {
147147
func (c *Client) syncLoop() {
148148
for {
149149
// Wait for a block to be received
150-
if _, ok := <-c.readyForNextBlockChan; !ok {
150+
if ready, ok := <-c.readyForNextBlockChan; !ok {
151151
// Channel is closed, which means we're shutting down
152152
return
153+
} else if !ready {
154+
// Sync was cancelled
155+
return
153156
}
154157
c.busyMutex.Lock()
155158
// Request the next block
@@ -172,10 +175,7 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
172175
if c.config.RollForwardFunc == nil {
173176
return fmt.Errorf("received chain-sync RollForward message but no callback function is defined")
174177
}
175-
// Signal that we're ready for the next block after we finish handling this one
176-
defer func() {
177-
c.readyForNextBlockChan <- true
178-
}()
178+
var callbackErr error
179179
if c.Mode() == protocol.ProtocolModeNodeToNode {
180180
msg := msgGeneric.(*MsgRollForwardNtN)
181181
var blockHeader interface{}
@@ -206,16 +206,23 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
206206
}
207207
}
208208
// Call the user callback function
209-
return c.config.RollForwardFunc(blockType, blockHeader, msg.Tip)
209+
callbackErr = c.config.RollForwardFunc(blockType, blockHeader, msg.Tip)
210210
} else {
211211
msg := msgGeneric.(*MsgRollForwardNtC)
212212
blk, err := ledger.NewBlockFromCbor(msg.BlockType(), msg.BlockCbor())
213213
if err != nil {
214214
return err
215215
}
216216
// Call the user callback function
217-
return c.config.RollForwardFunc(msg.BlockType(), blk, msg.Tip)
217+
callbackErr = c.config.RollForwardFunc(msg.BlockType(), blk, msg.Tip)
218218
}
219+
if callbackErr == StopSyncProcessError {
220+
// Signal that we're cancelling the sync
221+
c.readyForNextBlockChan <- false
222+
}
223+
// Signal that we're ready for the next block
224+
c.readyForNextBlockChan <- true
225+
return nil
219226
}
220227

221228
func (c *Client) handleRollBackward(msgGeneric protocol.Message) error {
@@ -228,7 +235,14 @@ func (c *Client) handleRollBackward(msgGeneric protocol.Message) error {
228235
c.readyForNextBlockChan <- true
229236
}()
230237
// Call the user callback function
231-
return c.config.RollBackwardFunc(msg.Point, msg.Tip)
238+
callbackErr := c.config.RollBackwardFunc(msg.Point, msg.Tip)
239+
if callbackErr == StopSyncProcessError {
240+
// Signal that we're cancelling the sync
241+
c.readyForNextBlockChan <- false
242+
}
243+
// Signal that we're ready for the next block
244+
c.readyForNextBlockChan <- true
245+
return nil
232246
}
233247

234248
func (c *Client) handleIntersectFound(msgGeneric protocol.Message) error {

protocol/chainsync/error.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
package chainsync
22

3+
import (
4+
"fmt"
5+
)
6+
37
// IntersectNotFoundError represents a failure to find a chain intersection
48
type IntersectNotFoundError struct {
59
}
610

711
func (e IntersectNotFoundError) Error() string {
812
return "chain intersection not found"
913
}
14+
15+
// StopChainSync is used as a special return value from a RollForward or RollBackward handler function
16+
// to signify that the sync process should be stopped
17+
var StopSyncProcessError = fmt.Errorf("stop sync process")

0 commit comments

Comments
 (0)