Skip to content

Commit 9059382

Browse files
committed
gnosis-keyper: add synchandler for new chainsyncer
1 parent 6d5dca7 commit 9059382

File tree

5 files changed

+512
-1
lines changed

5 files changed

+512
-1
lines changed

rolling-shutter/keyperimpl/gnosis/database/gnosiskeyper.sqlc.gen.go

Lines changed: 32 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rolling-shutter/keyperimpl/gnosis/database/sql/queries/gnosiskeyper.sql

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ SET block_hash = $1, block_number = $2, slot = $3;
3333

3434
-- name: GetTransactionSubmittedEventsSyncedUntil :one
3535
SELECT * FROM transaction_submitted_events_synced_until LIMIT 1;
36+
-- name: GetLatestTransactionSubmittedEvent :one
37+
SELECT * FROM transaction_submitted_event
38+
ORDER BY block_number DESC
39+
LIMIT 1;
3640

3741
-- name: GetTransactionSubmittedEventCount :one
3842
SELECT max(index) + 1 FROM transaction_submitted_event
@@ -41,6 +45,9 @@ WHERE eon = $1;
4145
-- name: DeleteTransactionSubmittedEventsFromBlockNumber :exec
4246
DELETE FROM transaction_submitted_event WHERE block_number >= $1;
4347

48+
-- name: DeleteTransactionSubmittedEventsFromBlockHash :exec
49+
DELETE FROM transaction_submitted_event WHERE block_hash == $1;
50+
4451
-- name: GetTxPointer :one
4552
SELECT * FROM tx_pointer
4653
WHERE eon = $1;
@@ -119,4 +126,4 @@ ORDER BY block_number DESC, tx_index DESC, log_index DESC
119126
LIMIT 1;
120127

121128
-- name: GetNumValidatorRegistrations :one
122-
SELECT COUNT(*) FROM validator_registrations;
129+
SELECT COUNT(*) FROM validator_registrations;
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package synchandler
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/ethereum/go-ethereum/core/types"
8+
"github.com/hashicorp/go-multierror"
9+
10+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/syncer"
11+
)
12+
13+
var _ syncer.ChainUpdateHandler = &DecryptOnChainUpdate{}
14+
15+
type DecryptionFunction = func(context.Context, *types.Header) error
16+
17+
func NewDecryptOnChainUpdate(fn DecryptionFunction) *DecryptOnChainUpdate {
18+
return &DecryptOnChainUpdate{
19+
decrypt: fn,
20+
}
21+
}
22+
23+
type DecryptOnChainUpdate struct {
24+
decrypt DecryptionFunction
25+
}
26+
27+
func (cu *DecryptOnChainUpdate) Handle(
28+
ctx context.Context,
29+
update syncer.ChainUpdateContext,
30+
) (result error) {
31+
// in case of a reorg (non-nil update.Remove segment) we can't roll back any
32+
// changes, since the keys have been release already publicly.
33+
if update.Append != nil {
34+
for _, header := range update.Append.Get() {
35+
// We can call the decrypt function with all updated headers,
36+
// even if this was a reorg.
37+
// This is because the downstream function is expected to keep track of
38+
// what slots have already been sent out and decide on itself wether
39+
// to re-release keys.
40+
err := cu.decrypt(ctx, header)
41+
if err != nil {
42+
result = multierror.Append(result,
43+
fmt.Errorf("failed to decrypt for block %s (num=%d): %w",
44+
header.Hash().String(),
45+
header.Number.Uint64(),
46+
err))
47+
}
48+
}
49+
}
50+
return nil
51+
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package synchandler
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math"
7+
8+
"github.com/ethereum/go-ethereum/accounts/abi"
9+
"github.com/ethereum/go-ethereum/common"
10+
"github.com/ethereum/go-ethereum/core/types"
11+
"github.com/jackc/pgx/v4"
12+
"github.com/jackc/pgx/v4/pgxpool"
13+
"github.com/pkg/errors"
14+
"github.com/rs/zerolog/log"
15+
bindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/sequencer"
16+
17+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database"
18+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/metrics"
19+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/syncer"
20+
"github.com/shutter-network/rolling-shutter/rolling-shutter/shdb"
21+
)
22+
23+
func init() {
24+
var err error
25+
SequencerContractABI, err = bindings.SequencerMetaData.GetAbi()
26+
if err != nil {
27+
panic(err)
28+
}
29+
}
30+
31+
var SequencerContractABI *abi.ABI
32+
33+
func NewSequencerTransactionSubmitted(dbPool *pgxpool.Pool, address common.Address) (syncer.ContractEventHandler, error) {
34+
return syncer.WrapHandler(
35+
&SequencerTransactionSubmitted{
36+
evABI: SequencerContractABI,
37+
address: address,
38+
dbPool: dbPool,
39+
})
40+
}
41+
42+
type SequencerTransactionSubmitted struct {
43+
evABI *abi.ABI
44+
address common.Address
45+
46+
dbPool *pgxpool.Pool
47+
}
48+
49+
func (sts *SequencerTransactionSubmitted) Address() common.Address {
50+
return sts.address
51+
}
52+
53+
func (*SequencerTransactionSubmitted) Event() string {
54+
return "TransactionSubmitted"
55+
}
56+
57+
func (sts *SequencerTransactionSubmitted) ABI() abi.ABI {
58+
return *sts.evABI
59+
}
60+
61+
func (sts *SequencerTransactionSubmitted) Accept(
62+
_ context.Context,
63+
_ types.Header,
64+
_ bindings.SequencerTransactionSubmitted,
65+
) (bool, error) {
66+
return true, nil
67+
}
68+
69+
func (sts *SequencerTransactionSubmitted) Handle(
70+
ctx context.Context,
71+
update syncer.ChainUpdateContext,
72+
events []bindings.SequencerTransactionSubmitted,
73+
) error {
74+
numInsertedEvents := 0
75+
numDiscardedEvents := 0
76+
err := sts.dbPool.BeginFunc(ctx, func(tx pgx.Tx) error {
77+
db := database.New(tx)
78+
if update.Remove != nil {
79+
for _, header := range update.Remove.Get() {
80+
if err := db.DeleteTransactionSubmittedEventsFromBlockHash(ctx, header.Hash().Bytes()); err != nil {
81+
return errors.Wrap(err, "failed to delete transaction submitted events from db")
82+
}
83+
}
84+
log.Info().
85+
Int("depth", update.Remove.Len()).
86+
Int64("previous-synced-until", update.Remove.Latest().Number.Int64()).
87+
Int64("new-synced-until", update.Append.Latest().Number.Int64()).
88+
Msg("sync status reset due to reorg")
89+
}
90+
filteredEvents := sts.filterEvents(events)
91+
numDiscardedEvents = len(events) - len(filteredEvents)
92+
for _, event := range filteredEvents {
93+
_, err := db.InsertTransactionSubmittedEvent(ctx, database.InsertTransactionSubmittedEventParams{
94+
Index: int64(event.TxIndex),
95+
BlockNumber: int64(event.Raw.BlockNumber),
96+
BlockHash: event.Raw.BlockHash[:],
97+
TxIndex: int64(event.Raw.TxIndex),
98+
LogIndex: int64(event.Raw.Index),
99+
Eon: int64(event.Eon),
100+
IdentityPrefix: event.IdentityPrefix[:],
101+
Sender: shdb.EncodeAddress(event.Sender),
102+
GasLimit: event.GasLimit.Int64(),
103+
})
104+
if err != nil {
105+
return errors.Wrap(err, "failed to insert transaction submitted event into db")
106+
}
107+
numInsertedEvents++
108+
metrics.LatestTxSubmittedEventIndex.WithLabelValues(fmt.Sprint(event.Eon)).Set(float64(event.TxIndex))
109+
log.Debug().
110+
Uint64("index", event.TxIndex).
111+
Uint64("block", event.Raw.BlockNumber).
112+
Uint64("eon", event.Eon).
113+
Hex("identityPrefix", event.IdentityPrefix[:]).
114+
Hex("sender", event.Sender.Bytes()).
115+
Uint64("gasLimit", event.GasLimit.Uint64()).
116+
Msg("synced new transaction submitted event")
117+
}
118+
return nil
119+
})
120+
log.Info().
121+
Uint64("start-block", update.Append.Earliest().Number.Uint64()).
122+
Uint64("end-block", update.Append.Latest().Number.Uint64()).
123+
Int("num-inserted-events", numInsertedEvents).
124+
Int("num-discarded-events", numDiscardedEvents).
125+
Msg("synced sequencer contract")
126+
metrics.TxSubmittedEventsSyncedUntil.Set(float64(update.Append.Latest().Number.Uint64()))
127+
return err
128+
}
129+
130+
func (sts *SequencerTransactionSubmitted) filterEvents(
131+
events []bindings.SequencerTransactionSubmitted,
132+
) []bindings.SequencerTransactionSubmitted {
133+
filteredEvents := []bindings.SequencerTransactionSubmitted{}
134+
// TODO: before the refactoring, a mux'ed field was read here
135+
// in order to filter out Eons that are before "our" first
136+
// keyper-sets eon.
137+
// Do this by direcly reading "our" keyper-sets from the DB
138+
// without manually iterating over all keyper-sets / addresses.
139+
// This requires persisting this information upon insertion
140+
// in the KeyperSetAdded event handler and thus a database migration.
141+
//
142+
// For now, insert all events into the DB.
143+
// We could clean up this from time to time as a temporary measure
144+
// to not make the DB grow unnecessarily.
145+
for _, event := range events {
146+
if event.Eon > math.MaxInt64 ||
147+
!event.GasLimit.IsInt64() {
148+
log.Debug().
149+
Uint64("eon", event.Eon).
150+
Uint64("block-number", event.Raw.BlockNumber).
151+
Str("block-hash", event.Raw.BlockHash.Hex()).
152+
Uint("tx-index", event.Raw.TxIndex).
153+
Uint("log-index", event.Raw.Index).
154+
Msg("ignoring transaction submitted event")
155+
continue
156+
}
157+
filteredEvents = append(filteredEvents, event)
158+
}
159+
return filteredEvents
160+
}

0 commit comments

Comments
 (0)