Skip to content

Commit 2c8842e

Browse files
committed
feat(chainsync): sync contract event handler with latest head
1 parent acb6f1b commit 2c8842e

File tree

9 files changed

+317
-108
lines changed

9 files changed

+317
-108
lines changed

rolling-shutter/keyperimpl/optimism/keyper.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,14 @@ func (kpr *Keyper) newEonPublicKey(ctx context.Context, pubKey keyper.EonPublicK
144144
log.Info().
145145
Uint64("eon", pubKey.Eon).
146146
Uint64("activation-block", pubKey.ActivationBlock).
147+
Bytes("pub-key", pubKey.PublicKey).
147148
Msg("new eon pk")
148149
// Currently all keypers call this and race to call this function first.
149150
// For now this is fine, but a keyper should only send a transaction if
150151
// the key is not set yet.
151152
// Best would be a coordinatated leader election who will broadcast the key.
153+
// FIXME: the syncer receives an empty key byte.
154+
// Is this already
152155
tx, err := kpr.l2Client.BroadcastEonKey(ctx, pubKey.Eon, pubKey.PublicKey)
153156
if err != nil {
154157
log.Error().Err(err).Msg("error broadcasting eon public key")

rolling-shutter/medley/chainsync/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ var noopLogger = &logger.NoopLogger{}
2424
var ErrServiceNotInstantiated = errors.New("service is not instantiated, pass a handler function option")
2525

2626
type Client struct {
27-
client.Client
27+
client.EthereumClient
2828
log log.Logger
2929

3030
options *options
@@ -136,7 +136,7 @@ func (s *Client) BroadcastEonKey(ctx context.Context, eon uint64, eonPubKey []by
136136
// This value is cached, since it is not expected to change.
137137
func (s *Client) ChainID(ctx context.Context) (*big.Int, error) {
138138
if s.chainID == nil {
139-
cid, err := s.Client.ChainID(ctx)
139+
cid, err := s.EthereumClient.ChainID(ctx)
140140
if err != nil {
141141
return nil, err
142142
}

rolling-shutter/medley/chainsync/client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"github.com/ethereum/go-ethereum/core/types"
1010
)
1111

12-
type Client interface {
12+
type EthereumClient interface {
1313
Close()
1414
ChainID(ctx context.Context) (*big.Int, error)
1515
BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)

rolling-shutter/medley/chainsync/options.go

Lines changed: 58 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type options struct {
2424
keyperSetManagerAddress *common.Address
2525
keyBroadcastContractAddress *common.Address
2626
clientURL string
27-
client syncclient.Client
27+
ethClient syncclient.EthereumClient
2828
logger log.Logger
2929
runner service.Runner
3030
syncStart *number.BlockNumber
@@ -37,11 +37,11 @@ type options struct {
3737
}
3838

3939
func (o *options) verify() error {
40-
if o.clientURL != "" && o.client != nil {
40+
if o.clientURL != "" && o.ethClient != nil {
4141
// TODO: error message
4242
return errors.New("can't use client and client url")
4343
}
44-
if o.clientURL == "" && o.client == nil {
44+
if o.clientURL == "" && o.ethClient == nil {
4545
// TODO: error message
4646
return errors.New("have to provide either url or client")
4747
}
@@ -56,24 +56,30 @@ func (o *options) verify() error {
5656
// of shutter clients background workers.
5757
func (o *options) apply(ctx context.Context, c *Client) error {
5858
var (
59-
client syncclient.Client
59+
client syncclient.EthereumClient
6060
err error
6161
)
6262
if o.clientURL != "" {
63-
o.client, err = ethclient.DialContext(ctx, o.clientURL)
63+
o.ethClient, err = ethclient.DialContext(ctx, o.clientURL)
6464
if err != nil {
6565
return err
6666
}
6767
}
68-
client = o.client
69-
c.log = o.logger
68+
client = o.ethClient
7069

71-
c.Client = client
70+
c.EthereumClient = client
7271

72+
if o.logger != nil {
73+
c.log = o.logger
74+
// NOCHECKIN:
75+
c.log.Info("got logger in options")
76+
}
77+
78+
syncedServices := []syncer.ManualFilterHandler{}
7379
// the nil passthrough will use "latest" for each call,
7480
// but we want to harmonize and fix the sync start to a specific block.
7581
if o.syncStart.IsLatest() {
76-
latestBlock, err := c.Client.BlockNumber(ctx)
82+
latestBlock, err := c.EthereumClient.BlockNumber(ctx)
7783
if err != nil {
7884
return errors.Wrap(err, "polling latest block")
7985
}
@@ -85,50 +91,69 @@ func (o *options) apply(ctx context.Context, c *Client) error {
8591
return err
8692
}
8793
c.kssync = &syncer.KeyperSetSyncer{
88-
Client: client,
89-
Contract: c.KeyperSetManager,
90-
Log: c.log,
91-
StartBlock: o.syncStart,
92-
Handler: o.handlerKeyperSet,
94+
Client: client,
95+
Contract: c.KeyperSetManager,
96+
Log: c.log,
97+
StartBlock: o.syncStart,
98+
Handler: o.handlerKeyperSet,
99+
DisableEventWatcher: true,
93100
}
94101
if o.handlerKeyperSet != nil {
95102
c.services = append(c.services, c.kssync)
103+
syncedServices = append(syncedServices, c.kssync)
96104
}
97105

98106
c.KeyBroadcast, err = bindings.NewKeyBroadcastContract(*o.keyBroadcastContractAddress, client)
99107
if err != nil {
100108
return err
101109
}
102110
c.epksync = &syncer.EonPubKeySyncer{
103-
Client: client,
104-
Log: c.log,
105-
KeyBroadcast: c.KeyBroadcast,
106-
KeyperSetManager: c.KeyperSetManager,
107-
Handler: o.handlerEonPublicKey,
108-
StartBlock: o.syncStart,
111+
Client: client,
112+
Log: c.log,
113+
KeyBroadcast: c.KeyBroadcast,
114+
KeyperSetManager: c.KeyperSetManager,
115+
Handler: o.handlerEonPublicKey,
116+
StartBlock: o.syncStart,
117+
DisableEventWatcher: true,
109118
}
110119
if o.handlerEonPublicKey != nil {
111120
c.services = append(c.services, c.epksync)
121+
syncedServices = append(syncedServices, c.epksync)
112122
}
113123

114124
c.sssync = &syncer.ShutterStateSyncer{
115-
Client: client,
116-
Contract: c.KeyperSetManager,
117-
Log: c.log,
118-
Handler: o.handlerShutterState,
119-
StartBlock: o.syncStart,
125+
Client: client,
126+
Contract: c.KeyperSetManager,
127+
Log: c.log,
128+
Handler: o.handlerShutterState,
129+
StartBlock: o.syncStart,
130+
DisableEventWatcher: true,
120131
}
121132
if o.handlerShutterState != nil {
122133
c.services = append(c.services, c.sssync)
134+
syncedServices = append(syncedServices, c.sssync)
123135
}
124136

125-
if o.handlerBlock != nil {
126-
c.uhsync = &syncer.UnsafeHeadSyncer{
127-
Client: client,
128-
Log: c.log,
129-
Handler: o.handlerBlock,
137+
if o.handlerBlock == nil {
138+
// NOOP - but we need to run the UnsafeHeadSyncer.
139+
// This is to keep the inner workings consisten,
140+
// we use the DisableEventWatcher mechanism in combination
141+
// with the UnsafeHeadSyncer instead of the streaming
142+
// Watch... subscription on events.
143+
// TODO: think about allowing the streaming events,
144+
// when guaranteed event order (event1,event2,new-block-event)
145+
// is not required
146+
o.handlerBlock = func(ctx context.Context, lb *event.LatestBlock) error {
147+
return nil
130148
}
131149
}
150+
151+
c.uhsync = &syncer.UnsafeHeadSyncer{
152+
Client: client,
153+
Log: c.log,
154+
Handler: o.handlerBlock,
155+
SyncedHandler: syncedServices,
156+
}
132157
if o.handlerBlock != nil {
133158
c.services = append(c.services, c.uhsync)
134159
}
@@ -141,7 +166,7 @@ func defaultOptions() *options {
141166
keyperSetManagerAddress: &predeploy.KeyperSetManagerAddr,
142167
keyBroadcastContractAddress: &predeploy.KeyBroadcastContractAddr,
143168
clientURL: "",
144-
client: nil,
169+
ethClient: nil,
145170
logger: noopLogger,
146171
runner: nil,
147172
syncStart: number.NewBlockNumber(nil),
@@ -193,9 +218,9 @@ func WithLogger(l log.Logger) Option {
193218
}
194219
}
195220

196-
func WithClient(client syncclient.Client) Option {
221+
func WithClient(client syncclient.EthereumClient) Option {
197222
return func(o *options) error {
198-
o.client = client
223+
o.ethClient = client
199224
return nil
200225
}
201226
}

rolling-shutter/medley/chainsync/syncer/eonpubkey.go

Lines changed: 68 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ package syncer
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
76

87
"github.com/ethereum/go-ethereum/accounts/abi/bind"
98
"github.com/ethereum/go-ethereum/log"
9+
"github.com/pkg/errors"
1010
"github.com/shutter-network/shop-contracts/bindings"
1111

1212
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client"
@@ -15,18 +15,54 @@ import (
1515
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
1616
)
1717

18+
var _ ManualFilterHandler = &EonPubKeySyncer{}
19+
1820
type EonPubKeySyncer struct {
19-
Client client.Client
20-
Log log.Logger
21-
KeyBroadcast *bindings.KeyBroadcastContract
22-
KeyperSetManager *bindings.KeyperSetManager
23-
StartBlock *number.BlockNumber
24-
Handler event.EonPublicKeyHandler
21+
Client client.EthereumClient
22+
Log log.Logger
23+
KeyBroadcast *bindings.KeyBroadcastContract
24+
KeyperSetManager *bindings.KeyperSetManager
25+
StartBlock *number.BlockNumber
26+
Handler event.EonPublicKeyHandler
27+
DisableEventWatcher bool
2528

2629
keyBroadcastCh chan *bindings.KeyBroadcastContractEonKeyBroadcast
2730
}
2831

32+
func (s *EonPubKeySyncer) QueryAndHandle(ctx context.Context, block uint64) error {
33+
s.Log.Info(
34+
"pubsyncer query and handle called",
35+
"block",
36+
block,
37+
)
38+
opts := &bind.FilterOpts{
39+
Start: block,
40+
End: &block,
41+
Context: ctx,
42+
}
43+
iter, err := s.KeyBroadcast.FilterEonKeyBroadcast(opts)
44+
if err != nil {
45+
return err
46+
}
47+
defer iter.Close()
48+
49+
for iter.Next() {
50+
select {
51+
case s.keyBroadcastCh <- iter.Event:
52+
case <-ctx.Done():
53+
return ctx.Err()
54+
}
55+
}
56+
if err := iter.Error(); err != nil {
57+
return errors.Wrap(err, "filter iterator error")
58+
}
59+
return nil
60+
}
61+
2962
func (s *EonPubKeySyncer) Start(ctx context.Context, runner service.Runner) error {
63+
s.Log.Info(
64+
"pubsyncer loop started",
65+
)
3066
if s.Handler == nil {
3167
return errors.New("no handler registered")
3268
}
@@ -59,12 +95,14 @@ func (s *EonPubKeySyncer) Start(ctx context.Context, runner service.Runner) erro
5995
runner.Defer(func() {
6096
close(s.keyBroadcastCh)
6197
})
62-
subs, err := s.KeyBroadcast.WatchEonKeyBroadcast(watchOpts, s.keyBroadcastCh)
63-
// FIXME: what to do on subs.Error()
64-
if err != nil {
65-
return err
98+
if !s.DisableEventWatcher {
99+
subs, err := s.KeyBroadcast.WatchEonKeyBroadcast(watchOpts, s.keyBroadcastCh)
100+
// FIXME: what to do on subs.Error()
101+
if err != nil {
102+
return err
103+
}
104+
runner.Defer(subs.Unsubscribe)
66105
}
67-
runner.Defer(subs.Unsubscribe)
68106
runner.Go(func() error {
69107
return s.watchNewEonPubkey(ctx)
70108
})
@@ -88,17 +126,23 @@ func (s *EonPubKeySyncer) getInitialPubKeys(ctx context.Context) ([]*event.EonPu
88126
if err != nil {
89127
return nil, err
90128
}
91-
92129
initialPubKeys := []*event.EonPublicKey{}
130+
// NOTE: These are pubkeys that at the state of s.StartBlock
131+
// are known to the contracts.
132+
// That way we recreate older broadcast publickey events.
133+
// We are only interested for keys that belong to keyper-set
134+
// that are currently active or will become active in
135+
// the future:
93136
for i := activeEon; i < numKS; i++ {
94137
e, err := s.GetEonPubKeyForEon(ctx, opts, i)
95-
// FIXME: translate the error that there is no key
96-
// to a continue of the loop
97-
// (key not in mapping error, how can we catch that?)
98138
if err != nil {
99139
return nil, err
100140
}
101-
initialPubKeys = append(initialPubKeys, e)
141+
// if e == nil, this means the keyperset did not broadcast a
142+
// key (yet)
143+
if e != nil {
144+
initialPubKeys = append(initialPubKeys, e)
145+
}
102146
}
103147
return initialPubKeys, nil
104148
}
@@ -118,11 +162,14 @@ func (s *EonPubKeySyncer) GetEonPubKeyForEon(ctx context.Context, opts *bind.Cal
118162
return nil, err
119163
}
120164
key, err := s.KeyBroadcast.GetEonKey(opts, eon)
121-
// XXX: can the key be a null byte?
122-
// I think we rather get a index out of bounds error.
123165
if err != nil {
124166
return nil, err
125167
}
168+
// NOTE: Solidity returns the null value whenever
169+
// one tries to access a key in mapping that doesn't exist
170+
if len(key) == 0 {
171+
return nil, nil
172+
}
126173
return &event.EonPublicKey{
127174
Eon: eon,
128175
Key: key,
@@ -137,10 +184,11 @@ func (s *EonPubKeySyncer) watchNewEonPubkey(ctx context.Context) error {
137184
if !ok {
138185
return nil
139186
}
187+
pubk := newEonKey.Key
140188
bn := newEonKey.Raw.BlockNumber
141189
ev := &event.EonPublicKey{
142190
Eon: newEonKey.Eon,
143-
Key: newEonKey.Key,
191+
Key: pubk,
144192
AtBlockNumber: number.NewBlockNumber(&bn),
145193
}
146194
err := s.Handler(ctx, ev)

0 commit comments

Comments
 (0)