Skip to content

Commit e86c996

Browse files
authored
Merge pull request #52 from blinklabs-io/feat/plugin-state
feat: chainsync input plugin status update callback
2 parents 5abd83a + f6caee8 commit e86c996

File tree

2 files changed

+41
-11
lines changed

2 files changed

+41
-11
lines changed

input/chainsync/chainsync.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,36 @@ import (
2828
)
2929

3030
type ChainSync struct {
31-
oConn *ouroboros.Connection
32-
network string
33-
networkMagic uint32
34-
address string
35-
socketPath string
36-
ntcTcp bool
37-
intersectTip bool
38-
intersectPoints []ocommon.Point
39-
includeCbor bool
40-
errorChan chan error
41-
eventChan chan event.Event
31+
oConn *ouroboros.Connection
32+
network string
33+
networkMagic uint32
34+
address string
35+
socketPath string
36+
ntcTcp bool
37+
intersectTip bool
38+
intersectPoints []ocommon.Point
39+
includeCbor bool
40+
statusUpdateFunc StatusUpdateFunc
41+
status *ChainSyncStatus
42+
errorChan chan error
43+
eventChan chan event.Event
4244
}
4345

46+
type ChainSyncStatus struct {
47+
SlotNumber uint64
48+
BlockNumber uint64
49+
BlockHash string
50+
}
51+
52+
type StatusUpdateFunc func(ChainSyncStatus)
53+
4454
// New returns a new ChainSync object with the specified options applied
4555
func New(options ...ChainSyncOptionFunc) *ChainSync {
4656
c := &ChainSync{
4757
errorChan: make(chan error),
4858
eventChan: make(chan event.Event, 10),
4959
intersectPoints: []ocommon.Point{},
60+
status: &ChainSyncStatus{},
5061
}
5162
for _, option := range options {
5263
option(c)
@@ -176,6 +187,7 @@ func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip
176187
case ledger.Block:
177188
evt := event.New("chainsync.block", time.Now(), NewBlockEvent(v, c.includeCbor))
178189
c.eventChan <- evt
190+
c.updateStatus(v.SlotNumber(), v.BlockNumber(), v.Hash())
179191
case ledger.BlockHeader:
180192
blockSlot := v.SlotNumber()
181193
blockHash, _ := hex.DecodeString(v.Hash())
@@ -189,6 +201,16 @@ func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip
189201
txEvt := event.New("chainsync.transaction", time.Now(), NewTransactionEvent(block, transaction, c.includeCbor))
190202
c.eventChan <- txEvt
191203
}
204+
c.updateStatus(v.SlotNumber(), v.BlockNumber(), v.Hash())
192205
}
193206
return nil
194207
}
208+
209+
func (c *ChainSync) updateStatus(slotNumber uint64, blockNumber uint64, blockHash string) {
210+
c.status.SlotNumber = slotNumber
211+
c.status.BlockNumber = blockNumber
212+
c.status.BlockHash = blockHash
213+
if c.statusUpdateFunc != nil {
214+
c.statusUpdateFunc(*(c.status))
215+
}
216+
}

input/chainsync/options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,11 @@ func WithIncludeCbor(includeCbor bool) ChainSyncOptionFunc {
7575
c.includeCbor = includeCbor
7676
}
7777
}
78+
79+
// WithStatusUpdateFunc specifies a callback function for status updates. This is useful for tracking the chain-sync status
80+
// to be able to resume a sync at a later time, especially when any filtering could prevent you from getting all block update events
81+
func WithStatusUpdateFunc(statusUpdateFunc StatusUpdateFunc) ChainSyncOptionFunc {
82+
return func(c *ChainSync) {
83+
c.statusUpdateFunc = statusUpdateFunc
84+
}
85+
}

0 commit comments

Comments
 (0)