@@ -33,7 +33,7 @@ func (s *SequencerSyncer) Sync(ctx context.Context, block uint64) error {
33
33
if err == pgx .ErrNoRows {
34
34
start = 0
35
35
} else {
36
- start = uint64 (syncedUntilBlock ) + 1
36
+ start = uint64 (syncedUntilBlock + 1 )
37
37
}
38
38
39
39
log .Debug ().
@@ -69,19 +69,56 @@ func (s *SequencerSyncer) Sync(ctx context.Context, block uint64) error {
69
69
if it .Error () != nil {
70
70
return errors .Wrap (it .Error (), "failed to iterate transaction submitted events" )
71
71
}
72
-
73
72
if len (events ) == 0 {
74
73
log .Debug ().
75
74
Uint64 ("start-block" , start ).
76
75
Uint64 ("end-block" , block ).
77
76
Msg ("no transaction submitted events found" )
78
- return nil
79
77
}
80
78
81
79
return s .DBPool .BeginFunc (ctx , func (tx pgx.Tx ) error {
80
+ err = s .insertTransactionSubmittedEvents (ctx , tx , events )
81
+ if err != nil {
82
+ return err
83
+ }
84
+
85
+ newSyncedUntilBlock , err := medley .Uint64ToInt64Safe (block )
86
+ if err != nil {
87
+ return err
88
+ }
89
+ err = queries .SetTransactionSubmittedEventsSyncedUntil (ctx , newSyncedUntilBlock )
90
+ if err != nil {
91
+ return err
92
+ }
93
+ return nil
94
+ })
95
+ }
96
+
97
+ // insertTransactionSubmittedEvents inserts the given events into the database and updates the
98
+ // transaction submitted event number accordingly.
99
+ func (s * SequencerSyncer ) insertTransactionSubmittedEvents (
100
+ ctx context.Context ,
101
+ tx pgx.Tx ,
102
+ events []* sequencerBindings.SequencerTransactionSubmitted ,
103
+ ) error {
104
+ if len (events ) > 0 {
82
105
queries := database .New (tx )
106
+ nextEventIndices := make (map [uint64 ]int64 )
83
107
for _ , event := range events {
108
+ nextEventIndex , ok := nextEventIndices [event .Eon ]
109
+ if ! ok {
110
+ nextEventIndexFromDB , err := queries .GetTransactionSubmittedEventCount (ctx , int64 (event .Eon ))
111
+ if err == pgx .ErrNoRows {
112
+ nextEventIndexFromDB = 0
113
+ } else if err != nil {
114
+ return errors .Wrapf (err , "failed to query count of transaction submitted events for eon %d" , event .Eon )
115
+ }
116
+ nextEventIndices [event .Eon ] = nextEventIndexFromDB
117
+ nextEventIndex = nextEventIndexFromDB
118
+ }
119
+
84
120
_ , err := queries .InsertTransactionSubmittedEvent (ctx , database.InsertTransactionSubmittedEventParams {
121
+ Index : nextEventIndex ,
85
122
BlockNumber : int64 (event .Raw .BlockNumber ),
86
123
BlockHash : event .Raw .BlockHash [:],
87
124
TxIndex : int64 (event .Raw .TxIndex ),
@@ -94,18 +131,25 @@ func (s *SequencerSyncer) Sync(ctx context.Context, block uint64) error {
94
131
if err != nil {
95
132
return errors .Wrap (err , "failed to insert transaction submitted event into db" )
96
133
}
134
+ nextEventIndices [event .Eon ]++
97
135
log .Debug ().
136
+ Int64 ("index" , nextEventIndex ).
98
137
Uint64 ("block" , event .Raw .BlockNumber ).
99
138
Uint64 ("eon" , event .Eon ).
100
139
Hex ("identityPrefix" , event .IdentityPrefix [:]).
101
140
Hex ("sender" , event .Sender .Bytes ()).
102
141
Uint64 ("gasLimit" , event .GasLimit .Uint64 ()).
103
142
Msg ("synced new transaction submitted event" )
104
143
}
105
- newSyncedUntilBlock , err := medley .Uint64ToInt64Safe (block )
106
- if err != nil {
107
- return err
144
+ for eon , nextEventIndex := range nextEventIndices {
145
+ err := queries .SetTransactionSubmittedEventCount (ctx , database.SetTransactionSubmittedEventCountParams {
146
+ Eon : int64 (eon ),
147
+ EventCount : nextEventIndex ,
148
+ })
149
+ if err != nil {
150
+ return err
151
+ }
108
152
}
109
- return queries . SetTransactionSubmittedEventsSyncedUntil ( ctx , newSyncedUntilBlock )
110
- })
153
+ }
154
+ return nil
111
155
}
0 commit comments