Skip to content

Commit bed6960

Browse files
committed
gnosis-keyper: add synchandler for new chainsyncer
1 parent 21315f5 commit bed6960

File tree

5 files changed

+500
-41
lines changed

5 files changed

+500
-41
lines changed

rolling-shutter/keyperimpl/gnosis/database/gnosiskeyper.sqlc.gen.go

Lines changed: 32 additions & 33 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rolling-shutter/keyperimpl/gnosis/database/sql/queries/gnosiskeyper.sql

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,10 @@ WHERE eon = $1 AND index >= $2 AND index < $2 + $3
2626
ORDER BY index ASC
2727
LIMIT $3;
2828

29-
-- name: SetTransactionSubmittedEventsSyncedUntil :exec
30-
INSERT INTO transaction_submitted_events_synced_until (block_hash, block_number, slot) VALUES ($1, $2, $3)
31-
ON CONFLICT (enforce_one_row) DO UPDATE
32-
SET block_hash = $1, block_number = $2, slot = $3;
33-
34-
-- name: GetTransactionSubmittedEventsSyncedUntil :one
35-
SELECT * FROM transaction_submitted_events_synced_until LIMIT 1;
29+
-- name: GetLatestTransactionSubmittedEvent :one
30+
SELECT * FROM transaction_submitted_event
31+
ORDER BY block_number DESC
32+
LIMIT 1;
3633

3734
-- name: GetTransactionSubmittedEventCount :one
3835
SELECT max(index) + 1 FROM transaction_submitted_event
@@ -41,6 +38,9 @@ WHERE eon = $1;
4138
-- name: DeleteTransactionSubmittedEventsFromBlockNumber :exec
4239
DELETE FROM transaction_submitted_event WHERE block_number >= $1;
4340

41+
-- name: DeleteTransactionSubmittedEventsFromBlockHash :exec
42+
DELETE FROM transaction_submitted_event WHERE block_hash == $1;
43+
4444
-- name: GetTxPointer :one
4545
SELECT * FROM tx_pointer
4646
WHERE eon = $1;
@@ -119,4 +119,4 @@ ORDER BY block_number DESC, tx_index DESC, log_index DESC
119119
LIMIT 1;
120120

121121
-- name: GetNumValidatorRegistrations :one
122-
SELECT COUNT(*) FROM validator_registrations;
122+
SELECT COUNT(*) FROM validator_registrations;
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package synchandler
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/ethereum/go-ethereum/core/types"
8+
"github.com/hashicorp/go-multierror"
9+
10+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/syncer"
11+
)
12+
13+
var _ syncer.ChainUpdateHandler = &DecryptionChainUpdateHandler{}
14+
15+
type DecryptionFunction = func(context.Context, *types.Header) error
16+
17+
func NewDecryptionChainUpdateHandler(fn DecryptionFunction) *DecryptionChainUpdateHandler {
18+
return &DecryptionChainUpdateHandler{
19+
decrypt: fn,
20+
}
21+
}
22+
23+
type DecryptionChainUpdateHandler struct {
24+
decrypt DecryptionFunction
25+
}
26+
27+
func (cu *DecryptionChainUpdateHandler) Handle(
28+
ctx context.Context,
29+
update syncer.ChainUpdateContext,
30+
) (result error) {
31+
// in case of a reorg (non-nil update.Remove segment) we can't roll back any
32+
// changes, since the keys have been release already publicly.
33+
if update.Append != nil {
34+
for _, header := range update.Append.Get() {
35+
// We can call the decrypt function with all updated headers,
36+
// even if this was a reorg.
37+
// This is because the downstream function is expected to keep track of
38+
// what slots have already been sent out and decide on itself wether
39+
// to re-release keys.
40+
err := cu.decrypt(ctx, header)
41+
if err != nil {
42+
result = multierror.Append(result,
43+
fmt.Errorf("failed to decrypt for block %s (num=%d): %w",
44+
header.Hash().String(),
45+
header.Number.Uint64(),
46+
err))
47+
}
48+
}
49+
}
50+
return nil
51+
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package synchandler
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math"
7+
8+
"github.com/ethereum/go-ethereum/accounts/abi"
9+
"github.com/ethereum/go-ethereum/common"
10+
"github.com/ethereum/go-ethereum/core/types"
11+
"github.com/jackc/pgx/v4"
12+
"github.com/jackc/pgx/v4/pgxpool"
13+
"github.com/pkg/errors"
14+
"github.com/rs/zerolog/log"
15+
bindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/sequencer"
16+
17+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database"
18+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/metrics"
19+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/syncer"
20+
"github.com/shutter-network/rolling-shutter/rolling-shutter/shdb"
21+
)
22+
23+
func init() {
24+
var err error
25+
SequencerContractABI, err = bindings.SequencerMetaData.GetAbi()
26+
if err != nil {
27+
panic(err)
28+
}
29+
}
30+
31+
var SequencerContractABI *abi.ABI
32+
33+
func NewSequencerTransactionSubmitted(dbPool *pgxpool.Pool, address common.Address) (syncer.ContractEventHandler, error) {
34+
return syncer.WrapHandler(
35+
&SequencerTransactionSubmitted{
36+
evABI: SequencerContractABI,
37+
address: address,
38+
dbPool: dbPool,
39+
})
40+
}
41+
42+
type SequencerTransactionSubmitted struct {
43+
evABI *abi.ABI
44+
address common.Address
45+
46+
dbPool *pgxpool.Pool
47+
}
48+
49+
func (sts *SequencerTransactionSubmitted) Address() common.Address {
50+
return sts.address
51+
}
52+
53+
func (*SequencerTransactionSubmitted) Event() string {
54+
return "TransactionSubmitted"
55+
}
56+
57+
func (sts *SequencerTransactionSubmitted) ABI() abi.ABI {
58+
return *sts.evABI
59+
}
60+
61+
func (sts *SequencerTransactionSubmitted) Accept(
62+
ctx context.Context,
63+
header types.Header,
64+
ev bindings.SequencerTransactionSubmitted,
65+
) (bool, error) {
66+
return true, nil
67+
}
68+
func (sts *SequencerTransactionSubmitted) Handle(
69+
ctx context.Context,
70+
update syncer.ChainUpdateContext,
71+
events []bindings.SequencerTransactionSubmitted,
72+
) error {
73+
numInsertedEvents := 0
74+
numDiscardedEvents := 0
75+
err := sts.dbPool.BeginFunc(ctx, func(tx pgx.Tx) error {
76+
db := database.New(tx)
77+
if update.Remove != nil {
78+
for _, header := range update.Remove.Get() {
79+
if err := db.DeleteTransactionSubmittedEventsFromBlockHash(ctx, header.Hash().Bytes()); err != nil {
80+
return errors.Wrap(err, "failed to delete transaction submitted events from db")
81+
}
82+
}
83+
log.Info().
84+
Int("depth", update.Remove.Len()).
85+
Int64("previous-synced-until", update.Remove.Latest().Number.Int64()).
86+
Int64("new-synced-until", update.Append.Latest().Number.Int64()).
87+
Msg("sync status reset due to reorg")
88+
}
89+
filteredEvents := sts.filterEvents(events)
90+
numDiscardedEvents = len(events) - len(filteredEvents)
91+
for _, event := range filteredEvents {
92+
_, err := db.InsertTransactionSubmittedEvent(ctx, database.InsertTransactionSubmittedEventParams{
93+
Index: int64(event.TxIndex),
94+
BlockNumber: int64(event.Raw.BlockNumber),
95+
BlockHash: event.Raw.BlockHash[:],
96+
TxIndex: int64(event.Raw.TxIndex),
97+
LogIndex: int64(event.Raw.Index),
98+
Eon: int64(event.Eon),
99+
IdentityPrefix: event.IdentityPrefix[:],
100+
Sender: shdb.EncodeAddress(event.Sender),
101+
GasLimit: event.GasLimit.Int64(),
102+
})
103+
if err != nil {
104+
return errors.Wrap(err, "failed to insert transaction submitted event into db")
105+
}
106+
numInsertedEvents++
107+
metrics.LatestTxSubmittedEventIndex.WithLabelValues(fmt.Sprint(event.Eon)).Set(float64(event.TxIndex))
108+
log.Debug().
109+
Uint64("index", event.TxIndex).
110+
Uint64("block", event.Raw.BlockNumber).
111+
Uint64("eon", event.Eon).
112+
Hex("identityPrefix", event.IdentityPrefix[:]).
113+
Hex("sender", event.Sender.Bytes()).
114+
Uint64("gasLimit", event.GasLimit.Uint64()).
115+
Msg("synced new transaction submitted event")
116+
}
117+
return nil
118+
})
119+
log.Info().
120+
Uint64("start-block", update.Append.Earliest().Number.Uint64()).
121+
Uint64("end-block", update.Append.Latest().Number.Uint64()).
122+
Int("num-inserted-events", numInsertedEvents).
123+
Int("num-discarded-events", numDiscardedEvents).
124+
Msg("synced sequencer contract")
125+
metrics.TxSubmittedEventsSyncedUntil.Set(float64(update.Append.Latest().Number.Uint64()))
126+
return err
127+
}
128+
129+
func (sts *SequencerTransactionSubmitted) filterEvents(
130+
events []bindings.SequencerTransactionSubmitted,
131+
) []bindings.SequencerTransactionSubmitted {
132+
filteredEvents := []bindings.SequencerTransactionSubmitted{}
133+
for _, event := range events {
134+
if event.Eon > math.MaxInt64 ||
135+
!event.GasLimit.IsInt64() {
136+
log.Debug().
137+
Uint64("eon", event.Eon).
138+
Uint64("block-number", event.Raw.BlockNumber).
139+
Str("block-hash", event.Raw.BlockHash.Hex()).
140+
Uint("tx-index", event.Raw.TxIndex).
141+
Uint("log-index", event.Raw.Index).
142+
Msg("ignoring transaction submitted event with high eon")
143+
continue
144+
}
145+
filteredEvents = append(filteredEvents, event)
146+
}
147+
return filteredEvents
148+
}

0 commit comments

Comments
 (0)