Skip to content

Commit c39959c

Browse files
authored
refactor(chain): event system (#1439)
Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
1 parent 0a363c9 commit c39959c

File tree

1 file changed

+41
-58
lines changed

1 file changed

+41
-58
lines changed

chain/chain.go

Lines changed: 41 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -125,43 +125,42 @@ func (c *Chain) AddBlock(
125125
block ledger.Block,
126126
txn *database.Txn,
127127
) error {
128-
if c == nil {
129-
return errors.New("chain is nil")
130-
}
131-
// Perform all state mutations under locks, collect event to publish
132-
// outside locks to prevent deadlock if subscribers access chain state.
133-
pendingEvent, err := c.addBlockLocked(block, txn)
128+
evt, err := c.addBlockInternal(block, txn)
134129
if err != nil {
135130
return err
136131
}
137-
// Publish event outside locks to prevent deadlock if subscribers
138-
// call back into chain or manager state
139-
if c.eventBus != nil && pendingEvent != nil {
140-
c.eventBus.Publish(pendingEvent.Type, *pendingEvent)
132+
// Publish event immediately for standalone (non-batched) calls
133+
if c.eventBus != nil {
134+
c.eventBus.Publish(ChainUpdateEventType, evt)
141135
}
142136
return nil
143137
}
144138

145-
// addBlockLocked performs AddBlock state mutations under locks and returns
146-
// any event that should be published after locks are released.
147-
func (c *Chain) addBlockLocked(
139+
// addBlockInternal performs all block-adding logic but returns the event
140+
// instead of publishing it. This allows AddBlocks to defer event
141+
// publication until the entire batch transaction has committed, preventing
142+
// subscribers from observing data that may be rolled back.
143+
func (c *Chain) addBlockInternal(
148144
block ledger.Block,
149145
txn *database.Txn,
150-
) (*event.Event, error) {
146+
) (event.Event, error) {
147+
if c == nil {
148+
return event.Event{}, errors.New("chain is nil")
149+
}
151150
c.mutex.Lock()
152151
defer c.mutex.Unlock()
153152
// We get a write lock on the manager to cover the integrity checks and adding the block below
154153
c.manager.mutex.Lock()
155154
defer c.manager.mutex.Unlock()
156155
// Verify chain integrity
157156
if err := c.reconcile(); err != nil {
158-
return nil, fmt.Errorf("reconcile chain: %w", err)
157+
return event.Event{}, fmt.Errorf("reconcile chain: %w", err)
159158
}
160159
// Check that the new block matches our first header, if any
161160
if len(c.headers) > 0 {
162161
firstHeader := c.headers[0]
163162
if block.Hash().String() != firstHeader.Hash().String() {
164-
return nil, NewBlockNotMatchHeaderError(
163+
return event.Event{}, NewBlockNotMatchHeaderError(
165164
block.Hash().String(),
166165
firstHeader.Hash().String(),
167166
)
@@ -170,7 +169,7 @@ func (c *Chain) addBlockLocked(
170169
// Check that this block fits on the current chain tip
171170
if c.tipBlockIndex >= initialBlockIndex {
172171
if string(block.PrevHash().Bytes()) != string(c.currentTip.Point.Hash) {
173-
return nil, NewBlockNotFitChainTipError(
172+
return event.Event{}, NewBlockNotFitChainTipError(
174173
block.Hash().String(),
175174
block.PrevHash().String(),
176175
hex.EncodeToString(c.currentTip.Point.Hash),
@@ -193,7 +192,7 @@ func (c *Chain) addBlockLocked(
193192
Cbor: block.Cbor(),
194193
}
195194
if err := c.manager.addBlock(tmpBlock, txn, c.persistent); err != nil {
196-
return nil, fmt.Errorf("store block: %w", err)
195+
return event.Event{}, fmt.Errorf("store block: %w", err)
197196
}
198197
if !c.persistent {
199198
c.blocks = append(c.blocks, tmpPoint)
@@ -215,18 +214,15 @@ func (c *Chain) addBlockLocked(
215214
c.waitingChan = nil
216215
}
217216
c.waitingChanMutex.Unlock()
218-
// Build event for deferred publication
219-
if c.eventBus != nil {
220-
evt := event.NewEvent(
221-
ChainUpdateEventType,
222-
ChainBlockEvent{
223-
Point: tmpPoint,
224-
Block: tmpBlock,
225-
},
226-
)
227-
return &evt, nil
228-
}
229-
return nil, nil
217+
// Build event for caller to publish after transaction commit
218+
evt := event.NewEvent(
219+
ChainUpdateEventType,
220+
ChainBlockEvent{
221+
Point: tmpPoint,
222+
Block: tmpBlock,
223+
},
224+
)
225+
return evt, nil
230226
}
231227

232228
func (c *Chain) AddBlocks(blocks []ledger.Block) error {
@@ -243,33 +239,30 @@ func (c *Chain) AddBlocks(blocks []ledger.Block) error {
243239
if batchSize == 0 {
244240
break
245241
}
246-
// Collect events inside the transaction callback and
247-
// publish them only after the transaction commits
248-
// successfully. This prevents subscribers from reacting
249-
// to events whose underlying data has not yet been
250-
// persisted.
242+
// Collect events during the transaction so they can be
243+
// published only after the transaction commits successfully.
244+
// This prevents subscribers from observing rolled-back data
245+
// when a later block in the batch fails.
251246
var pendingEvents []event.Event
252247
txn := c.manager.db.BlobTxn(true)
253248
err := txn.Do(func(txn *database.Txn) error {
254249
pendingEvents = pendingEvents[:0]
255250
for _, tmpBlock := range blocks[batchOffset : batchOffset+batchSize] {
256-
evt, err := c.addBlockLocked(tmpBlock, txn)
251+
evt, err := c.addBlockInternal(tmpBlock, txn)
257252
if err != nil {
258253
return err
259254
}
260-
if evt != nil {
261-
pendingEvents = append(pendingEvents, *evt)
262-
}
255+
pendingEvents = append(pendingEvents, evt)
263256
}
264257
return nil
265258
})
266259
if err != nil {
267260
return err
268261
}
269-
// Publish events after the transaction has committed
262+
// Transaction committed successfully; publish all events
270263
if c.eventBus != nil {
271264
for _, evt := range pendingEvents {
272-
c.eventBus.Publish(evt.Type, evt)
265+
c.eventBus.Publish(ChainUpdateEventType, evt)
273266
}
274267
}
275268
batchOffset += batchSize
@@ -281,14 +274,12 @@ func (c *Chain) Rollback(point ocommon.Point) error {
281274
if c == nil {
282275
return errors.New("chain is nil")
283276
}
284-
// Perform all state mutations under locks, collect events to publish
285-
// outside locks to prevent deadlock if subscribers access chain state.
286277
pendingEvents, err := c.rollbackLocked(point)
287278
if err != nil {
288279
return err
289280
}
290-
// Publish events outside locks to prevent deadlock if subscribers
291-
// call back into chain or manager state
281+
// Publish events after locks are released to prevent deadlocks
282+
// when subscribers call back into chain/manager state.
292283
if c.eventBus != nil {
293284
for _, evt := range pendingEvents {
294285
c.eventBus.Publish(evt.Type, evt)
@@ -297,8 +288,8 @@ func (c *Chain) Rollback(point ocommon.Point) error {
297288
return nil
298289
}
299290

300-
// rollbackLocked performs Rollback state mutations under locks and returns
301-
// any events that should be published after locks are released.
291+
// rollbackLocked performs all rollback logic under locks and returns
292+
// events to be published by the caller after locks are released.
302293
func (c *Chain) rollbackLocked(
303294
point ocommon.Point,
304295
) ([]event.Event, error) {
@@ -344,14 +335,6 @@ func (c *Chain) rollbackLocked(
344335
}
345336
rollbackBlockIndex = tmpBlock.ID
346337
}
347-
// Guard against uint64 underflow from corrupt/stale data
348-
if rollbackBlockIndex > c.tipBlockIndex {
349-
return nil, fmt.Errorf(
350-
"rollback block index %d exceeds tip block index %d",
351-
rollbackBlockIndex,
352-
c.tipBlockIndex,
353-
)
354-
}
355338
// Calculate fork depth before deleting blocks
356339
forkDepth := c.tipBlockIndex - rollbackBlockIndex
357340
// Reject rollbacks that exceed the security parameter K on
@@ -434,10 +417,10 @@ func (c *Chain) rollbackLocked(
434417
iter.needsRollback = true
435418
}
436419
}
437-
// Build events for deferred publication
420+
// Build events for caller to publish after locks are released
438421
var pendingEvents []event.Event
439-
if c.eventBus != nil && len(rolledBackBlocks) > 0 {
440-
// Rollback event only emit when blocks were actually removed
422+
if len(rolledBackBlocks) > 0 {
423+
// Rollback event - only emit when blocks were actually removed
441424
pendingEvents = append(
442425
pendingEvents,
443426
event.NewEvent(

0 commit comments

Comments
 (0)