Skip to content

Commit 89794cf

Browse files
committed
Add basic reorg handling
Reorgs are detected by comparing each incoming header with the current sync head. If the incoming header is supposed to be the child of the sync head according to its block number, but the parent hash is different, a reorg is detected.
1 parent e3e19bb commit 89794cf

File tree

3 files changed

+99
-0
lines changed

3 files changed

+99
-0
lines changed

rolling-shutter/keyperimpl/gnosis/database/gnosiskeyper.sqlc.gen.go

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rolling-shutter/keyperimpl/gnosis/database/sql/queries/gnosiskeyper.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ SELECT event_count FROM transaction_submitted_event_count
3838
WHERE eon = $1
3939
LIMIT 1;
4040

41+
-- name: DeleteTransactionSubmittedEventsFromBlockNumber :exec
42+
DELETE FROM transaction_submitted_event WHERE block_number >= $1;
43+
4144
-- name: GetTxPointer :one
4245
SELECT * FROM tx_pointer
4346
WHERE eon = $1;

rolling-shutter/keyperimpl/gnosis/sequencersyncer.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package gnosis
22

33
import (
4+
"bytes"
45
"context"
56
"fmt"
67
"math"
@@ -20,6 +21,8 @@ import (
2021
"github.com/shutter-network/rolling-shutter/rolling-shutter/shdb"
2122
)
2223

24+
const AssumedReorgDepth = 10
25+
2326
// SequencerSyncer inserts transaction submitted events from the sequencer contract into the database.
2427
type SequencerSyncer struct {
2528
Contract *sequencerBindings.Sequencer
@@ -30,10 +33,94 @@ type SequencerSyncer struct {
3033
SyncStartBlockNumber uint64
3134
}
3235

36+
// getNumReorgedBlocks returns the number of blocks that have already been synced, but are no
37+
// longer in the chain.
38+
func getNumReorgedBlocks(syncedUntil *database.TransactionSubmittedEventsSyncedUntil, header *types.Header) int {
39+
shouldBeParent := header.Number.Int64() == syncedUntil.BlockNumber+1
40+
isParent := bytes.Equal(header.ParentHash.Bytes(), syncedUntil.BlockHash)
41+
isReorg := shouldBeParent && !isParent
42+
if !isReorg {
43+
return 0
44+
}
45+
// We don't know how deep the reorg is, so we make a conservative guess. Assuming higher depths
46+
// is safer because it means we resync a little bit more.
47+
depth := AssumedReorgDepth
48+
if syncedUntil.BlockNumber < int64(depth) {
49+
return int(syncedUntil.BlockNumber)
50+
}
51+
return depth
52+
}
53+
54+
// resetSyncStatus clears the db from its recent history after a reorg of given depth.
55+
func (s *SequencerSyncer) resetSyncStatus(ctx context.Context, numReorgedBlocks int) error {
56+
if numReorgedBlocks == 0 {
57+
return nil
58+
}
59+
return s.DBPool.BeginFunc(ctx, func(tx pgx.Tx) error {
60+
queries := database.New(tx)
61+
62+
syncStatus, err := queries.GetTransactionSubmittedEventsSyncedUntil(ctx)
63+
if err != nil {
64+
return errors.Wrap(err, "failed to query sync status from db in order to reset it")
65+
}
66+
if syncStatus.BlockNumber < int64(numReorgedBlocks) {
67+
return errors.Wrapf(err, "detected reorg deeper (%d) than blocks synced (%d)", syncStatus.BlockNumber, numReorgedBlocks)
68+
}
69+
70+
deleteFromInclusive := syncStatus.BlockNumber - int64(numReorgedBlocks) + 1
71+
72+
err = queries.DeleteTransactionSubmittedEventsFromBlockNumber(ctx, deleteFromInclusive)
73+
if err != nil {
74+
return errors.Wrap(err, "failed to delete transaction submitted events from db")
75+
}
76+
// Currently, we don't have enough information in the db to populate block hash and slot.
77+
// However, using default values here is fine since the syncer is expected to resync
78+
// immediately after this function call which will set the correct values. When we do proper
79+
// reorg handling, we should store the full block data of the previous blocks so that we can
80+
// avoid this.
81+
newSyncedUntilBlockNumber := deleteFromInclusive - 1
82+
err = queries.SetTransactionSubmittedEventsSyncedUntil(ctx, database.SetTransactionSubmittedEventsSyncedUntilParams{
83+
BlockHash: []byte{},
84+
BlockNumber: newSyncedUntilBlockNumber,
85+
Slot: 0,
86+
})
87+
if err != nil {
88+
return errors.Wrap(err, "failed to reset transaction submitted event sync status in db")
89+
}
90+
log.Info().
91+
Int("depth", numReorgedBlocks).
92+
Int64("previous-synced-until", syncStatus.BlockNumber).
93+
Int64("new-synced-until", newSyncedUntilBlockNumber).
94+
Msg("sync status reset due to reorg")
95+
return nil
96+
})
97+
}
98+
99+
func (s *SequencerSyncer) handlePotentialReorg(ctx context.Context, header *types.Header) error {
100+
queries := database.New(s.DBPool)
101+
syncedUntil, err := queries.GetTransactionSubmittedEventsSyncedUntil(ctx)
102+
if err == pgx.ErrNoRows {
103+
return nil
104+
}
105+
if err != nil {
106+
return errors.Wrap(err, "failed to query transaction submitted events sync status")
107+
}
108+
109+
numReorgedBlocks := getNumReorgedBlocks(&syncedUntil, header)
110+
if numReorgedBlocks > 0 {
111+
return s.resetSyncStatus(ctx, numReorgedBlocks)
112+
}
113+
return nil
114+
}
115+
33116
// Sync fetches transaction submitted events from the sequencer contract and inserts them into the
34117
// database. It starts at the end point of the previous call to sync (or 0 if it is the first call)
35118
// and ends at the given block number.
36119
func (s *SequencerSyncer) Sync(ctx context.Context, header *types.Header) error {
120+
if err := s.handlePotentialReorg(ctx, header); err != nil {
121+
return err
122+
}
123+
37124
queries := database.New(s.DBPool)
38125
syncedUntil, err := queries.GetTransactionSubmittedEventsSyncedUntil(ctx)
39126
if err != nil && err != pgx.ErrNoRows {

0 commit comments

Comments
 (0)