1
1
package gnosis
2
2
3
3
import (
4
+ "bytes"
4
5
"context"
5
6
"fmt"
6
7
"math"
@@ -20,6 +21,8 @@ import (
20
21
"github.com/shutter-network/rolling-shutter/rolling-shutter/shdb"
21
22
)
22
23
24
+ const AssumedReorgDepth = 10
25
+
23
26
// SequencerSyncer inserts transaction submitted events from the sequencer contract into the database.
24
27
type SequencerSyncer struct {
25
28
Contract * sequencerBindings.Sequencer
@@ -30,10 +33,94 @@ type SequencerSyncer struct {
30
33
SyncStartBlockNumber uint64
31
34
}
32
35
36
+ // getNumReorgedBlocks returns the number of blocks that have already been synced, but are no
37
+ // longer in the chain.
38
+ func getNumReorgedBlocks (syncedUntil * database.TransactionSubmittedEventsSyncedUntil , header * types.Header ) int {
39
+ shouldBeParent := header .Number .Int64 () == syncedUntil .BlockNumber + 1
40
+ isParent := bytes .Equal (header .ParentHash .Bytes (), syncedUntil .BlockHash )
41
+ isReorg := shouldBeParent && ! isParent
42
+ if ! isReorg {
43
+ return 0
44
+ }
45
+ // We don't know how deep the reorg is, so we make a conservative guess. Assuming higher depths
46
+ // is safer because it means we resync a little bit more.
47
+ depth := AssumedReorgDepth
48
+ if syncedUntil .BlockNumber < int64 (depth ) {
49
+ return int (syncedUntil .BlockNumber )
50
+ }
51
+ return depth
52
+ }
53
+
54
+ // resetSyncStatus clears the db from its recent history after a reorg of given depth.
55
+ func (s * SequencerSyncer ) resetSyncStatus (ctx context.Context , numReorgedBlocks int ) error {
56
+ if numReorgedBlocks == 0 {
57
+ return nil
58
+ }
59
+ return s .DBPool .BeginFunc (ctx , func (tx pgx.Tx ) error {
60
+ queries := database .New (tx )
61
+
62
+ syncStatus , err := queries .GetTransactionSubmittedEventsSyncedUntil (ctx )
63
+ if err != nil {
64
+ return errors .Wrap (err , "failed to query sync status from db in order to reset it" )
65
+ }
66
+ if syncStatus .BlockNumber < int64 (numReorgedBlocks ) {
67
+ return errors .Wrapf (err , "detected reorg deeper (%d) than blocks synced (%d)" , syncStatus .BlockNumber , numReorgedBlocks )
68
+ }
69
+
70
+ deleteFromInclusive := syncStatus .BlockNumber - int64 (numReorgedBlocks ) + 1
71
+
72
+ err = queries .DeleteTransactionSubmittedEventsFromBlockNumber (ctx , deleteFromInclusive )
73
+ if err != nil {
74
+ return errors .Wrap (err , "failed to delete transaction submitted events from db" )
75
+ }
76
+ // Currently, we don't have enough information in the db to populate block hash and slot.
77
+ // However, using default values here is fine since the syncer is expected to resync
78
+ // immediately after this function call which will set the correct values. When we do proper
79
+ // reorg handling, we should store the full block data of the previous blocks so that we can
80
+ // avoid this.
81
+ newSyncedUntilBlockNumber := deleteFromInclusive - 1
82
+ err = queries .SetTransactionSubmittedEventsSyncedUntil (ctx , database.SetTransactionSubmittedEventsSyncedUntilParams {
83
+ BlockHash : []byte {},
84
+ BlockNumber : newSyncedUntilBlockNumber ,
85
+ Slot : 0 ,
86
+ })
87
+ if err != nil {
88
+ return errors .Wrap (err , "failed to reset transaction submitted event sync status in db" )
89
+ }
90
+ log .Info ().
91
+ Int ("depth" , numReorgedBlocks ).
92
+ Int64 ("previous-synced-until" , syncStatus .BlockNumber ).
93
+ Int64 ("new-synced-until" , newSyncedUntilBlockNumber ).
94
+ Msg ("sync status reset due to reorg" )
95
+ return nil
96
+ })
97
+ }
98
+
99
+ func (s * SequencerSyncer ) handlePotentialReorg (ctx context.Context , header * types.Header ) error {
100
+ queries := database .New (s .DBPool )
101
+ syncedUntil , err := queries .GetTransactionSubmittedEventsSyncedUntil (ctx )
102
+ if err == pgx .ErrNoRows {
103
+ return nil
104
+ }
105
+ if err != nil {
106
+ return errors .Wrap (err , "failed to query transaction submitted events sync status" )
107
+ }
108
+
109
+ numReorgedBlocks := getNumReorgedBlocks (& syncedUntil , header )
110
+ if numReorgedBlocks > 0 {
111
+ return s .resetSyncStatus (ctx , numReorgedBlocks )
112
+ }
113
+ return nil
114
+ }
115
+
33
116
// Sync fetches transaction submitted events from the sequencer contract and inserts them into the
34
117
// database. It starts at the end point of the previous call to sync (or 0 if it is the first call)
35
118
// and ends at the given block number.
36
119
func (s * SequencerSyncer ) Sync (ctx context.Context , header * types.Header ) error {
120
+ if err := s .handlePotentialReorg (ctx , header ); err != nil {
121
+ return err
122
+ }
123
+
37
124
queries := database .New (s .DBPool )
38
125
syncedUntil , err := queries .GetTransactionSubmittedEventsSyncedUntil (ctx )
39
126
if err != nil && err != pgx .ErrNoRows {
0 commit comments