Skip to content

Commit ca187b3

Browse files
committed
Improve contract syncing
Instead of syncing the whole block range in a single request to the execution node, split it up into multiple. This is important at initial sync and after a long period of being offline. Also, perform an initial sync at startup, so that this potentially long operation does not happen during normal slot processing.
1 parent 5c981ef commit ca187b3

File tree

5 files changed

+264
-106
lines changed

5 files changed

+264
-106
lines changed

rolling-shutter/keyperimpl/gnosis/keyper.go

Lines changed: 59 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -100,28 +100,6 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error {
100100
return errors.Wrap(err, "failed to initialize beacon API client")
101101
}
102102

103-
validatorSyncerClient, err := ethclient.DialContext(ctx, kpr.config.Gnosis.Node.EthereumURL)
104-
if err != nil {
105-
return errors.Wrap(err, "failed to dial ethereum node")
106-
}
107-
chainID, err := validatorSyncerClient.ChainID(ctx)
108-
if err != nil {
109-
return errors.Wrap(err, "failed to get chain ID")
110-
}
111-
validatorRegistryContract, err := validatorRegistryBindings.NewValidatorregistry(
112-
kpr.config.Gnosis.Contracts.ValidatorRegistry,
113-
validatorSyncerClient,
114-
)
115-
if err != nil {
116-
return errors.Wrap(err, "failed to instantiate validator registry contract")
117-
}
118-
kpr.validatorSyncer = &ValidatorSyncer{
119-
Contract: validatorRegistryContract,
120-
DBPool: kpr.dbpool,
121-
BeaconAPIClient: kpr.beaconAPIClient,
122-
ChainID: chainID.Uint64(),
123-
}
124-
125103
messageSender, err := p2p.New(kpr.config.P2P)
126104
if err != nil {
127105
return errors.Wrap(err, "failed to initialize p2p messaging")
@@ -183,6 +161,10 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error {
183161
if err != nil {
184162
return err
185163
}
164+
err = kpr.initValidatorSyncer(ctx)
165+
if err != nil {
166+
return err
167+
}
186168

187169
runner.Go(func() error { return kpr.processInputs(ctx) })
188170
return runner.StartService(kpr.core, kpr.chainSyncClient, kpr.slotTicker, kpr.eonKeyPublisher)
@@ -223,22 +205,24 @@ func (kpr *Keyper) initSequencerSyncer(ctx context.Context) error {
223205
}
224206

225207
func (kpr *Keyper) ensureSequencerSyncing(ctx context.Context, eon uint64) error {
208+
client, err := ethclient.DialContext(ctx, kpr.config.Gnosis.Node.ContractsURL)
209+
if err != nil {
210+
return errors.Wrap(err, "failed to dial Ethereum execution node")
211+
}
212+
226213
if kpr.sequencerSyncer == nil {
227214
log.Info().
228215
Uint64("eon", eon).
229216
Str("contract-address", kpr.config.Gnosis.Contracts.KeyperSetManager.Hex()).
230217
Msg("initializing sequencer syncer")
231-
client, err := ethclient.DialContext(ctx, kpr.config.Gnosis.Node.ContractsURL)
232-
if err != nil {
233-
return err
234-
}
235218
contract, err := sequencerBindings.NewSequencer(kpr.config.Gnosis.Contracts.Sequencer, client)
236219
if err != nil {
237220
return err
238221
}
239222
kpr.sequencerSyncer = &SequencerSyncer{
240223
Contract: contract,
241224
DBPool: kpr.dbpool,
225+
ExecutionClient: client,
242226
StartEon: eon,
243227
GenesisSlotTimestamp: kpr.config.Gnosis.GenesisSlotTimestamp,
244228
SecondsPerSlot: kpr.config.Gnosis.SecondsPerSlot,
@@ -252,6 +236,55 @@ func (kpr *Keyper) ensureSequencerSyncing(ctx context.Context, eon uint64) error
252236
Msg("decreasing sequencer syncing start eon")
253237
kpr.sequencerSyncer.StartEon = eon
254238
}
239+
240+
// Perform an initial sync now because it might take some time and doing so during regular
241+
// slot processing might hold up things
242+
latestHeader, err := client.HeaderByNumber(ctx, nil)
243+
if err != nil {
244+
return errors.Wrap(err, "failed to get latest block header")
245+
}
246+
err = kpr.sequencerSyncer.Sync(ctx, latestHeader)
247+
if err != nil {
248+
return err
249+
}
250+
251+
return nil
252+
}
253+
254+
func (kpr *Keyper) initValidatorSyncer(ctx context.Context) error {
255+
validatorSyncerClient, err := ethclient.DialContext(ctx, kpr.config.Gnosis.Node.EthereumURL)
256+
if err != nil {
257+
return errors.Wrap(err, "failed to dial ethereum node")
258+
}
259+
chainID, err := validatorSyncerClient.ChainID(ctx)
260+
if err != nil {
261+
return errors.Wrap(err, "failed to get chain ID")
262+
}
263+
validatorRegistryContract, err := validatorRegistryBindings.NewValidatorregistry(
264+
kpr.config.Gnosis.Contracts.ValidatorRegistry,
265+
validatorSyncerClient,
266+
)
267+
if err != nil {
268+
return errors.Wrap(err, "failed to instantiate validator registry contract")
269+
}
270+
kpr.validatorSyncer = &ValidatorSyncer{
271+
Contract: validatorRegistryContract,
272+
DBPool: kpr.dbpool,
273+
BeaconAPIClient: kpr.beaconAPIClient,
274+
ExecutionClient: validatorSyncerClient,
275+
ChainID: chainID.Uint64(),
276+
}
277+
278+
// Perform an initial sync now because it might take some time and doing so during regular
279+
// slot processing might hold up things
280+
latestHeader, err := validatorSyncerClient.HeaderByNumber(ctx, nil)
281+
if err != nil {
282+
return errors.Wrap(err, "failed to get latest block header")
283+
}
284+
err = kpr.validatorSyncer.Sync(ctx, latestHeader)
285+
if err != nil {
286+
return err
287+
}
255288
return nil
256289
}
257290

rolling-shutter/keyperimpl/gnosis/sequencersyncer.go

Lines changed: 81 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package gnosis
33
import (
44
"context"
55
"math"
6+
"math/big"
67

78
"github.com/ethereum/go-ethereum/accounts/abi/bind"
89
"github.com/ethereum/go-ethereum/core/types"
10+
"github.com/ethereum/go-ethereum/ethclient"
911
"github.com/jackc/pgx/v4"
1012
"github.com/jackc/pgx/v4/pgxpool"
1113
"github.com/pkg/errors"
@@ -21,6 +23,7 @@ import (
2123
type SequencerSyncer struct {
2224
Contract *sequencerBindings.Sequencer
2325
DBPool *pgxpool.Pool
26+
ExecutionClient *ethclient.Client
2427
StartEon uint64
2528
GenesisSlotTimestamp uint64
2629
SecondsPerSlot uint64
@@ -41,69 +44,103 @@ func (s *SequencerSyncer) Sync(ctx context.Context, header *types.Header) error
4144
} else {
4245
start = uint64(syncedUntil.BlockNumber + 1)
4346
}
44-
47+
endBlock := header.Number.Uint64()
4548
log.Debug().
4649
Uint64("start-block", start).
47-
Uint64("end-block", header.Number.Uint64()).
50+
Uint64("end-block", endBlock).
4851
Msg("syncing sequencer contract")
4952

50-
endBlock := header.Number.Uint64()
53+
syncRanges := medley.GetSyncRanges(start, endBlock, maxRequestBlockRange)
54+
for _, r := range syncRanges {
55+
err = s.syncRange(ctx, r[0], r[1])
56+
if err != nil {
57+
return err
58+
}
59+
}
60+
return nil
61+
}
62+
63+
func (s *SequencerSyncer) syncRange(
64+
ctx context.Context,
65+
start,
66+
end uint64,
67+
) error {
68+
events, err := s.fetchEvents(ctx, start, end)
69+
if err != nil {
70+
return err
71+
}
72+
filteredEvents := s.filterEvents(events)
73+
74+
header, err := s.ExecutionClient.HeaderByNumber(ctx, new(big.Int).SetUint64(end))
75+
if err != nil {
76+
return errors.Wrap(err, "failed to get execution block header by number")
77+
}
78+
err = s.DBPool.BeginFunc(ctx, func(tx pgx.Tx) error {
79+
err = s.insertTransactionSubmittedEvents(ctx, tx, filteredEvents)
80+
if err != nil {
81+
return err
82+
}
83+
84+
slot := medley.BlockTimestampToSlot(header.Time, s.GenesisSlotTimestamp, s.SecondsPerSlot)
85+
return database.New(tx).SetTransactionSubmittedEventsSyncedUntil(ctx, database.SetTransactionSubmittedEventsSyncedUntilParams{
86+
BlockNumber: int64(end),
87+
BlockHash: header.Hash().Bytes(),
88+
Slot: int64(slot),
89+
})
90+
})
91+
log.Info().
92+
Uint64("start-block", start).
93+
Uint64("end-block", end).
94+
Int("num-inserted-events", len(filteredEvents)).
95+
Int("num-discarded-events", len(events)-len(filteredEvents)).
96+
Msg("synced sequencer contract")
97+
return nil
98+
}
99+
100+
func (s *SequencerSyncer) fetchEvents(
101+
ctx context.Context,
102+
start,
103+
end uint64,
104+
) ([]*sequencerBindings.SequencerTransactionSubmitted, error) {
51105
opts := bind.FilterOpts{
52106
Start: start,
53-
End: &endBlock,
107+
End: &end,
54108
Context: ctx,
55109
}
56110
it, err := s.Contract.SequencerFilterer.FilterTransactionSubmitted(&opts)
57111
if err != nil {
58-
return errors.Wrap(err, "failed to query transaction submitted events")
112+
return nil, errors.Wrap(err, "failed to query transaction submitted events")
59113
}
60114
events := []*sequencerBindings.SequencerTransactionSubmitted{}
61115
for it.Next() {
62-
if it.Event.Eon < s.StartEon ||
63-
it.Event.Eon > math.MaxInt64 ||
64-
!it.Event.GasLimit.IsInt64() {
65-
log.Debug().
66-
Uint64("eon", it.Event.Eon).
67-
Uint64("block-number", it.Event.Raw.BlockNumber).
68-
Str("block-hash", it.Event.Raw.BlockHash.Hex()).
69-
Uint("tx-index", it.Event.Raw.TxIndex).
70-
Uint("log-index", it.Event.Raw.Index).
71-
Msg("ignoring transaction submitted event")
72-
continue
73-
}
74116
events = append(events, it.Event)
75117
}
76118
if it.Error() != nil {
77-
return errors.Wrap(it.Error(), "failed to iterate transaction submitted events")
78-
}
79-
if len(events) == 0 {
80-
log.Debug().
81-
Uint64("start-block", start).
82-
Uint64("end-block", endBlock).
83-
Msg("no transaction submitted events found")
119+
return nil, errors.Wrap(it.Error(), "failed to iterate transaction submitted events")
84120
}
121+
return events, nil
122+
}
85123

86-
return s.DBPool.BeginFunc(ctx, func(tx pgx.Tx) error {
87-
err = s.insertTransactionSubmittedEvents(ctx, tx, events)
88-
if err != nil {
89-
return err
90-
}
91-
92-
newSyncedUntilBlock, err := medley.Uint64ToInt64Safe(endBlock)
93-
if err != nil {
94-
return err
95-
}
96-
slot := medley.BlockTimestampToSlot(header.Time, s.GenesisSlotTimestamp, s.SecondsPerSlot)
97-
err = queries.SetTransactionSubmittedEventsSyncedUntil(ctx, database.SetTransactionSubmittedEventsSyncedUntilParams{
98-
BlockNumber: newSyncedUntilBlock,
99-
BlockHash: header.Hash().Bytes(),
100-
Slot: int64(slot),
101-
})
102-
if err != nil {
103-
return err
124+
func (s *SequencerSyncer) filterEvents(
125+
events []*sequencerBindings.SequencerTransactionSubmitted,
126+
) []*sequencerBindings.SequencerTransactionSubmitted {
127+
filteredEvents := []*sequencerBindings.SequencerTransactionSubmitted{}
128+
for _, event := range events {
129+
if event.Eon < s.StartEon ||
130+
event.Eon > math.MaxInt64 ||
131+
!event.GasLimit.IsInt64() {
132+
log.Debug().
133+
Uint64("eon", event.Eon).
134+
Uint64("block-number", event.Raw.BlockNumber).
135+
Str("block-hash", event.Raw.BlockHash.Hex()).
136+
Uint("tx-index", event.Raw.TxIndex).
137+
Uint("log-index", event.Raw.Index).
138+
Msg("ignoring transaction submitted event")
139+
continue
104140
}
105-
return nil
106-
})
141+
filteredEvents = append(filteredEvents, event)
142+
}
143+
return filteredEvents
107144
}
108145

109146
// insertTransactionSubmittedEvents inserts the given events into the database and updates the

0 commit comments

Comments
 (0)