Skip to content

Commit 2643a0c

Browse files
committed
wip(chainsync): sync contract event handler with latest head
1 parent d6abdbb commit 2643a0c

File tree

7 files changed

+245
-57
lines changed

7 files changed

+245
-57
lines changed

rolling-shutter/keyperimpl/optimism/keyper.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,14 @@ func (kpr *Keyper) newEonPublicKey(ctx context.Context, pubKey keyper.EonPublicK
144144
log.Info().
145145
Uint64("eon", pubKey.Eon).
146146
Uint64("activation-block", pubKey.ActivationBlock).
147+
Bytes("pub-key", pubKey.PublicKey).
147148
Msg("new eon pk")
148149
// Currently all keypers call this and race to call this function first.
149150
// For now this is fine, but a keyper should only send a transaction if
150151
// the key is not set yet.
151152
// Best would be a coordinatated leader election who will broadcast the key.
153+
// FIXME: the syncer receives an empty key byte.
154+
// Is this already
152155
tx, err := kpr.l2Client.BroadcastEonKey(ctx, pubKey.Eon, pubKey.PublicKey)
153156
if err != nil {
154157
log.Error().Err(err).Msg("error broadcasting eon public key")

rolling-shutter/medley/chainsync/options.go

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func (o *options) apply(ctx context.Context, c *Client) error {
7070

7171
c.Client = client
7272

73+
syncedServices := []syncer.ManualFilterHandler{}
7374
// the nil passthrough will use "latest" for each call,
7475
// but we want to harmonize and fix the sync start to a specific block.
7576
if o.syncStart.IsLatest() {
@@ -85,50 +86,69 @@ func (o *options) apply(ctx context.Context, c *Client) error {
8586
return err
8687
}
8788
c.kssync = &syncer.KeyperSetSyncer{
88-
Client: client,
89-
Contract: c.KeyperSetManager,
90-
Log: c.log,
91-
StartBlock: o.syncStart,
92-
Handler: o.handlerKeyperSet,
89+
Client: client,
90+
Contract: c.KeyperSetManager,
91+
Log: c.log,
92+
StartBlock: o.syncStart,
93+
Handler: o.handlerKeyperSet,
94+
DisableEventWatcher: true,
9395
}
9496
if o.handlerKeyperSet != nil {
9597
c.services = append(c.services, c.kssync)
98+
syncedServices = append(syncedServices, c.kssync)
9699
}
97100

98101
c.KeyBroadcast, err = bindings.NewKeyBroadcastContract(*o.keyBroadcastContractAddress, client)
99102
if err != nil {
100103
return err
101104
}
102105
c.epksync = &syncer.EonPubKeySyncer{
103-
Client: client,
104-
Log: c.log,
105-
KeyBroadcast: c.KeyBroadcast,
106-
KeyperSetManager: c.KeyperSetManager,
107-
Handler: o.handlerEonPublicKey,
108-
StartBlock: o.syncStart,
106+
Client: client,
107+
Log: c.log,
108+
KeyBroadcast: c.KeyBroadcast,
109+
KeyperSetManager: c.KeyperSetManager,
110+
Handler: o.handlerEonPublicKey,
111+
StartBlock: o.syncStart,
112+
DisableEventWatcher: true,
109113
}
110114
if o.handlerEonPublicKey != nil {
111115
c.services = append(c.services, c.epksync)
116+
syncedServices = append(syncedServices, c.kssync)
112117
}
113118

114119
c.sssync = &syncer.ShutterStateSyncer{
115-
Client: client,
116-
Contract: c.KeyperSetManager,
117-
Log: c.log,
118-
Handler: o.handlerShutterState,
119-
StartBlock: o.syncStart,
120+
Client: client,
121+
Contract: c.KeyperSetManager,
122+
Log: c.log,
123+
Handler: o.handlerShutterState,
124+
StartBlock: o.syncStart,
125+
DisableEventWatcher: true,
120126
}
121127
if o.handlerShutterState != nil {
122128
c.services = append(c.services, c.sssync)
129+
syncedServices = append(syncedServices, c.kssync)
123130
}
124131

125-
if o.handlerBlock != nil {
126-
c.uhsync = &syncer.UnsafeHeadSyncer{
127-
Client: client,
128-
Log: c.log,
129-
Handler: o.handlerBlock,
132+
if o.handlerBlock == nil {
133+
// NOOP - but we need to run the UnsafeHeadSyncer.
134+
// This is to keep the inner workings consisten,
135+
// we use the DisableEventWatcher mechanism in combination
136+
// with the UnsafeHeadSyncer instead of the streaming
137+
// Watch... subscription on events.
138+
// TODO: think about allowing the streaming events,
139+
// when guaranteed event order (event1,event2,new-block-event)
140+
// is not required
141+
o.handlerBlock = func(ctx context.Context, lb *event.LatestBlock) error {
142+
return nil
130143
}
131144
}
145+
146+
c.uhsync = &syncer.UnsafeHeadSyncer{
147+
Client: client,
148+
Log: c.log,
149+
Handler: o.handlerBlock,
150+
SyncedHandler: syncedServices,
151+
}
132152
if o.handlerBlock != nil {
133153
c.services = append(c.services, c.uhsync)
134154
}

rolling-shutter/medley/chainsync/syncer/eonpubkey.go

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ package syncer
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
6+
"math/big"
77

88
"github.com/ethereum/go-ethereum/accounts/abi/bind"
99
"github.com/ethereum/go-ethereum/log"
10+
"github.com/pkg/errors"
1011
"github.com/shutter-network/shop-contracts/bindings"
1112

1213
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client"
@@ -15,17 +16,45 @@ import (
1516
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
1617
)
1718

19+
var _ ManualFilterHandler = &EonPubKeySyncer{}
20+
1821
type EonPubKeySyncer struct {
19-
Client client.Client
20-
Log log.Logger
21-
KeyBroadcast *bindings.KeyBroadcastContract
22-
KeyperSetManager *bindings.KeyperSetManager
23-
StartBlock *number.BlockNumber
24-
Handler event.EonPublicKeyHandler
22+
Client client.Client
23+
Log log.Logger
24+
KeyBroadcast *bindings.KeyBroadcastContract
25+
KeyperSetManager *bindings.KeyperSetManager
26+
StartBlock *number.BlockNumber
27+
Handler event.EonPublicKeyHandler
28+
DisableEventWatcher bool
2529

2630
keyBroadcastCh chan *bindings.KeyBroadcastContractEonKeyBroadcast
2731
}
2832

33+
func (s *EonPubKeySyncer) QueryAndHandle(ctx context.Context, block uint64) error {
34+
opts := &bind.FilterOpts{
35+
Start: block,
36+
End: &block,
37+
Context: ctx,
38+
}
39+
iter, err := s.KeyBroadcast.FilterEonKeyBroadcast(opts)
40+
if err != nil {
41+
return err
42+
}
43+
defer iter.Close()
44+
45+
for iter.Next() {
46+
select {
47+
case s.keyBroadcastCh <- iter.Event:
48+
case <-ctx.Done():
49+
return ctx.Err()
50+
}
51+
}
52+
if err := iter.Error(); err != nil {
53+
return errors.Wrap(err, "filter iterator error")
54+
}
55+
return nil
56+
}
57+
2958
func (s *EonPubKeySyncer) Start(ctx context.Context, runner service.Runner) error {
3059
if s.Handler == nil {
3160
return errors.New("no handler registered")
@@ -59,12 +88,14 @@ func (s *EonPubKeySyncer) Start(ctx context.Context, runner service.Runner) erro
5988
runner.Defer(func() {
6089
close(s.keyBroadcastCh)
6190
})
62-
subs, err := s.KeyBroadcast.WatchEonKeyBroadcast(watchOpts, s.keyBroadcastCh)
63-
// FIXME: what to do on subs.Error()
64-
if err != nil {
65-
return err
91+
if !s.DisableEventWatcher {
92+
subs, err := s.KeyBroadcast.WatchEonKeyBroadcast(watchOpts, s.keyBroadcastCh)
93+
// FIXME: what to do on subs.Error()
94+
if err != nil {
95+
return err
96+
}
97+
runner.Defer(subs.Unsubscribe)
6698
}
67-
runner.Defer(subs.Unsubscribe)
6899
runner.Go(func() error {
69100
return s.watchNewEonPubkey(ctx)
70101
})
@@ -137,10 +168,29 @@ func (s *EonPubKeySyncer) watchNewEonPubkey(ctx context.Context) error {
137168
if !ok {
138169
return nil
139170
}
171+
// FIXME: this happens, why?
172+
if len(newEonKey.Key) == 0 {
173+
opts := &bind.CallOpts{
174+
Context: ctx,
175+
BlockNumber: new(big.Int).SetUint64(newEonKey.Raw.BlockNumber),
176+
}
177+
k, err := s.GetEonPubKeyForEon(ctx, opts, newEonKey.Eon)
178+
s.Log.Error(
179+
"extra call for GetEonPubKeyForEon errored",
180+
"error",
181+
err.Error(),
182+
)
183+
s.Log.Info(
184+
"retrieved eon pubkey by getter",
185+
"eon",
186+
k,
187+
)
188+
}
189+
pubk := newEonKey.Key
140190
bn := newEonKey.Raw.BlockNumber
141191
ev := &event.EonPublicKey{
142192
Eon: newEonKey.Eon,
143-
Key: newEonKey.Key,
193+
Key: pubk,
144194
AtBlockNumber: number.NewBlockNumber(&bn),
145195
}
146196
err := s.Handler(ctx, ev)

rolling-shutter/medley/chainsync/syncer/keyperset.go

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,52 @@ func makeCallError(attrName string, err error) error {
2121

2222
const channelSize = 10
2323

24+
var _ ManualFilterHandler = &KeyperSetSyncer{}
25+
2426
type KeyperSetSyncer struct {
2527
Client client.Client
2628
Contract *bindings.KeyperSetManager
2729
Log log.Logger
2830
StartBlock *number.BlockNumber
2931
Handler event.KeyperSetHandler
32+
// disable this when the QueryAndHandle manual polling should be used:
33+
DisableEventWatcher bool
3034

3135
keyperAddedCh chan *bindings.KeyperSetManagerKeyperSetAdded
3236
}
3337

38+
func (s *KeyperSetSyncer) QueryAndHandle(ctx context.Context, block uint64) error {
39+
opts := &bind.FilterOpts{
40+
// FIXME: does this work, or do we need index -1 or something?
41+
Start: block,
42+
End: &block,
43+
Context: ctx,
44+
}
45+
iter, err := s.Contract.FilterKeyperSetAdded(opts)
46+
// TODO: what errors possible?
47+
if err != nil {
48+
return err
49+
}
50+
defer iter.Close()
51+
52+
for iter.Next() {
53+
select {
54+
// XXX: this can be nil during the handlers startup.
55+
// As far as I understand, a nil channel is never selected.
56+
// Will it be selected as soon as the channel is not nil anymore?
57+
case s.keyperAddedCh <- iter.Event:
58+
case <-ctx.Done():
59+
return ctx.Err()
60+
}
61+
}
62+
// XXX: it looks like this is nil when the iterator is
63+
// exhausted without failures
64+
if err := iter.Error(); err != nil {
65+
return errors.Wrap(err, "filter iterator error")
66+
}
67+
return nil
68+
}
69+
3470
func (s *KeyperSetSyncer) Start(ctx context.Context, runner service.Runner) error {
3571
if s.Handler == nil {
3672
return errors.New("no handler registered")
@@ -69,12 +105,14 @@ func (s *KeyperSetSyncer) Start(ctx context.Context, runner service.Runner) erro
69105
runner.Defer(func() {
70106
close(s.keyperAddedCh)
71107
})
72-
subs, err := s.Contract.WatchKeyperSetAdded(watchOpts, s.keyperAddedCh)
73-
// FIXME: what to do on subs.Error()
74-
if err != nil {
75-
return err
108+
if !s.DisableEventWatcher {
109+
subs, err := s.Contract.WatchKeyperSetAdded(watchOpts, s.keyperAddedCh)
110+
// FIXME: what to do on subs.Error()
111+
if err != nil {
112+
return err
113+
}
114+
runner.Defer(subs.Unsubscribe)
76115
}
77-
runner.Defer(subs.Unsubscribe)
78116
runner.Go(func() error {
79117
return s.watchNewKeypersService(ctx)
80118
})

0 commit comments

Comments
 (0)