Skip to content

Commit 2fd714a

Browse files
committed
fix(chainsync): sync all blocks from start to latest head
Before, the chainsync did not fetch all subscribed events from the provided 'WithSyncStartBlock' to the current latest head. This is now fixed with a mechanism where the chainsync is progressing a sync-head in tandem with the incoming updates for the latest head. The syncer will catch up with polling for all older blocks until the current latest head is reached.
1 parent ede0821 commit 2fd714a

File tree

6 files changed

+409
-443
lines changed

6 files changed

+409
-443
lines changed

rolling-shutter/medley/chainsync/options.go

Lines changed: 66 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -39,132 +39,119 @@ type options struct {
3939

4040
func (o *options) verify() error {
4141
if o.clientURL != "" && o.ethClient != nil {
42-
// TODO: error message
43-
return errors.New("can't use client and client url")
42+
return errors.New("'WithClient' and 'WithClientURL' options are mutually exclusive")
4443
}
4544
if o.clientURL == "" && o.ethClient == nil {
46-
// TODO: error message
47-
return errors.New("have to provide either url or client")
45+
return errors.New("either 'WithClient' or 'WithClientURL' options are expected")
4846
}
4947
// TODO: check for the existence of the contract addresses depending on
5048
// what handlers are not nil
5149
return nil
5250
}
5351

54-
// initialize the shutter client and apply the options.
55-
// the context is only the initialisation context,
56-
// and should not be considered to handle the lifecycle
57-
// of shutter clients background workers.
58-
func (o *options) apply(ctx context.Context, c *Client) error {
59-
var (
60-
client syncclient.EthereumClient
61-
err error
62-
)
63-
if o.clientURL != "" {
64-
o.ethClient, err = ethclient.DialContext(ctx, o.clientURL)
65-
if err != nil {
66-
return err
67-
}
68-
}
69-
client = o.ethClient
70-
71-
c.EthereumClient = client
72-
73-
if o.logger != nil {
74-
c.log = o.logger
75-
// NOCHECKIN:
76-
c.log.Info("got logger in options")
77-
}
78-
52+
func (o *options) applyHandler(c *Client) error {
53+
var err error
7954
syncedServices := []syncer.ManualFilterHandler{}
80-
// the nil passthrough will use "latest" for each call,
81-
// but we want to harmonize and fix the sync start to a specific block.
82-
if o.syncStart.IsLatest() {
83-
latestBlock, err := c.EthereumClient.BlockNumber(ctx)
84-
if err != nil {
85-
return errors.Wrap(err, "polling latest block")
86-
}
87-
o.syncStart = number.NewBlockNumber(&latestBlock)
88-
}
8955

90-
c.KeyperSetManager, err = bindings.NewKeyperSetManager(*o.keyperSetManagerAddress, client)
56+
c.KeyperSetManager, err = bindings.NewKeyperSetManager(*o.keyperSetManagerAddress, o.ethClient)
9157
if err != nil {
9258
return err
9359
}
9460
c.kssync = &syncer.KeyperSetSyncer{
95-
Client: client,
96-
Contract: c.KeyperSetManager,
97-
Log: c.log,
98-
StartBlock: o.syncStart,
99-
Handler: o.handlerKeyperSet,
100-
FetchActiveAtStartBlock: o.fetchActivesAtSyncStart,
101-
DisableEventWatcher: true,
61+
Client: o.ethClient,
62+
Contract: c.KeyperSetManager,
63+
Log: c.log,
64+
Handler: o.handlerKeyperSet,
10265
}
10366
if o.handlerKeyperSet != nil {
104-
c.services = append(c.services, c.kssync)
10567
syncedServices = append(syncedServices, c.kssync)
10668
}
10769

108-
c.KeyBroadcast, err = bindings.NewKeyBroadcastContract(*o.keyBroadcastContractAddress, client)
70+
c.KeyBroadcast, err = bindings.NewKeyBroadcastContract(*o.keyBroadcastContractAddress, o.ethClient)
10971
if err != nil {
11072
return err
11173
}
11274
c.epksync = &syncer.EonPubKeySyncer{
113-
Client: client,
114-
Log: c.log,
115-
KeyBroadcast: c.KeyBroadcast,
116-
KeyperSetManager: c.KeyperSetManager,
117-
Handler: o.handlerEonPublicKey,
118-
StartBlock: o.syncStart,
119-
FetchActiveAtStartBlock: o.fetchActivesAtSyncStart,
120-
DisableEventWatcher: true,
75+
Client: o.ethClient,
76+
Log: c.log,
77+
KeyBroadcast: c.KeyBroadcast,
78+
KeyperSetManager: c.KeyperSetManager,
79+
Handler: o.handlerEonPublicKey,
12180
}
12281
if o.handlerEonPublicKey != nil {
123-
c.services = append(c.services, c.epksync)
12482
syncedServices = append(syncedServices, c.epksync)
12583
}
126-
12784
c.sssync = &syncer.ShutterStateSyncer{
128-
Client: client,
129-
Contract: c.KeyperSetManager,
130-
Log: c.log,
131-
Handler: o.handlerShutterState,
132-
StartBlock: o.syncStart,
133-
FetchActiveAtStartBlock: o.fetchActivesAtSyncStart,
134-
DisableEventWatcher: true,
85+
Client: o.ethClient,
86+
Contract: c.KeyperSetManager,
87+
Log: c.log,
88+
Handler: o.handlerShutterState,
13589
}
13690
if o.handlerShutterState != nil {
137-
c.services = append(c.services, c.sssync)
13891
syncedServices = append(syncedServices, c.sssync)
13992
}
14093

14194
if o.handlerBlock == nil {
142-
// NOOP - but we need to run the UnsafeHeadSyncer.
143-
// This is to keep the inner workings consisten,
144-
// we use the DisableEventWatcher mechanism in combination
145-
// with the UnsafeHeadSyncer instead of the streaming
146-
// Watch... subscription on events.
147-
// TODO: think about allowing the streaming events,
148-
// when guaranteed event order (event1,event2,new-block-event)
149-
// is not required
95+
// Even if the user is not interested in handling new block events,
96+
// the streaming block handler must be running in order to
97+
// synchronize polling of new contract events.
98+
// Since the handler function is always called, we need to
99+
// inject a noop-handler
150100
o.handlerBlock = func(ctx context.Context, lb *event.LatestBlock) error {
151101
return nil
152102
}
153103
}
154104

155105
c.uhsync = &syncer.UnsafeHeadSyncer{
156-
Client: client,
157-
Log: c.log,
158-
Handler: o.handlerBlock,
159-
SyncedHandler: syncedServices,
106+
Client: o.ethClient,
107+
Log: c.log,
108+
Handler: o.handlerBlock,
109+
SyncedHandler: syncedServices,
110+
FetchActiveAtStart: o.fetchActivesAtSyncStart,
111+
SyncStartBlock: o.syncStart,
160112
}
161113
if o.handlerBlock != nil {
162114
c.services = append(c.services, c.uhsync)
163115
}
164-
c.privKey = o.privKey
165116
return nil
166117
}
167118

119+
// initialize the shutter client and apply the options.
120+
// the context is only the initialisation context,
121+
// and should not be considered to handle the lifecycle
122+
// of shutter clients background workers.
123+
func (o *options) apply(ctx context.Context, c *Client) error {
124+
var (
125+
client syncclient.EthereumClient
126+
err error
127+
)
128+
if o.clientURL != "" {
129+
o.ethClient, err = ethclient.DialContext(ctx, o.clientURL)
130+
if err != nil {
131+
return err
132+
}
133+
}
134+
client = o.ethClient
135+
c.EthereumClient = client
136+
137+
// the nil passthrough will use "latest" for each call,
138+
// but we want to harmonize and fix the sync start to a specific block.
139+
if o.syncStart.IsLatest() {
140+
latestBlock, err := c.EthereumClient.BlockNumber(ctx)
141+
if err != nil {
142+
return errors.Wrap(err, "polling latest block")
143+
}
144+
o.syncStart = number.NewBlockNumber(&latestBlock)
145+
}
146+
147+
if o.logger != nil {
148+
c.log = o.logger
149+
}
150+
151+
c.privKey = o.privKey
152+
return o.applyHandler(c)
153+
}
154+
168155
func defaultOptions() *options {
169156
return &options{
170157
keyperSetManagerAddress: &predeploy.KeyperSetManagerAddr,

rolling-shutter/medley/chainsync/syncer/eonpubkey.go

Lines changed: 34 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,16 @@ import (
1212
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client"
1313
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/event"
1414
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/number"
15-
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
1615
)
1716

1817
var _ ManualFilterHandler = &EonPubKeySyncer{}
1918

2019
type EonPubKeySyncer struct {
21-
Client client.EthereumClient
22-
Log log.Logger
23-
KeyBroadcast *bindings.KeyBroadcastContract
24-
KeyperSetManager *bindings.KeyperSetManager
25-
StartBlock *number.BlockNumber
26-
Handler event.EonPublicKeyHandler
27-
FetchActiveAtStartBlock bool
28-
DisableEventWatcher bool
29-
30-
keyBroadcastCh chan *bindings.KeyBroadcastContractEonKeyBroadcast
20+
Client client.EthereumClient
21+
Log log.Logger
22+
KeyBroadcast *bindings.KeyBroadcastContract
23+
KeyperSetManager *bindings.KeyperSetManager
24+
Handler event.EonPublicKeyHandler
3125
}
3226

3327
func (s *EonPubKeySyncer) QueryAndHandle(ctx context.Context, block uint64) error {
@@ -48,10 +42,13 @@ func (s *EonPubKeySyncer) QueryAndHandle(ctx context.Context, block uint64) erro
4842
defer iter.Close()
4943

5044
for iter.Next() {
51-
select {
52-
case s.keyBroadcastCh <- iter.Event:
53-
case <-ctx.Done():
54-
return ctx.Err()
45+
err := s.handle(ctx, iter.Event)
46+
if err != nil {
47+
s.Log.Error(
48+
"handler for `NewKeyperSet` errored",
49+
"error",
50+
err.Error(),
51+
)
5552
}
5653
}
5754
if err := iter.Error(); err != nil {
@@ -60,73 +57,49 @@ func (s *EonPubKeySyncer) QueryAndHandle(ctx context.Context, block uint64) erro
6057
return nil
6158
}
6259

63-
func (s *EonPubKeySyncer) Start(ctx context.Context, runner service.Runner) error {
64-
s.Log.Info(
65-
"pubsyncer loop started",
66-
)
67-
if s.Handler == nil {
68-
return errors.New("no handler registered")
69-
}
70-
// the latest block still has to be fixed.
71-
// otherwise we could skip some block events
72-
// between the initial poll and the subscription.
73-
if s.StartBlock.IsLatest() {
74-
latest, err := s.Client.BlockNumber(ctx)
75-
if err != nil {
76-
return err
77-
}
78-
s.StartBlock.SetUint64(latest)
79-
}
80-
81-
if s.FetchActiveAtStartBlock {
82-
pubKs, err := s.getInitialPubKeys(ctx)
83-
if err != nil {
84-
return err
85-
}
86-
for _, k := range pubKs {
87-
err := s.Handler(ctx, k)
88-
if err != nil {
89-
return err
90-
}
91-
}
60+
func (s *EonPubKeySyncer) handle(ctx context.Context, newEonKey *bindings.KeyBroadcastContractEonKeyBroadcast) error {
61+
pubk := newEonKey.Key
62+
bn := newEonKey.Raw.BlockNumber
63+
ev := &event.EonPublicKey{
64+
Eon: newEonKey.Eon,
65+
Key: pubk,
66+
AtBlockNumber: number.NewBlockNumber(&bn),
67+
}
68+
err := s.Handler(ctx, ev)
69+
if err != nil {
70+
return err
9271
}
72+
return nil
73+
}
9374

94-
watchOpts := &bind.WatchOpts{
95-
Start: s.StartBlock.ToUInt64Ptr(),
96-
Context: ctx,
75+
func (s *EonPubKeySyncer) HandleVirtualEvent(ctx context.Context, block *number.BlockNumber) error {
76+
pubKs, err := s.getInitialPubKeys(ctx, block)
77+
if err != nil {
78+
return err
9779
}
98-
s.keyBroadcastCh = make(chan *bindings.KeyBroadcastContractEonKeyBroadcast, channelSize)
99-
runner.Defer(func() {
100-
close(s.keyBroadcastCh)
101-
})
102-
if !s.DisableEventWatcher {
103-
subs, err := s.KeyBroadcast.WatchEonKeyBroadcast(watchOpts, s.keyBroadcastCh)
104-
// FIXME: what to do on subs.Error()
80+
for _, k := range pubKs {
81+
err := s.Handler(ctx, k)
10582
if err != nil {
10683
return err
10784
}
108-
runner.Defer(subs.Unsubscribe)
10985
}
110-
runner.Go(func() error {
111-
return s.watchNewEonPubkey(ctx)
112-
})
11386
return nil
11487
}
11588

116-
func (s *EonPubKeySyncer) getInitialPubKeys(ctx context.Context) ([]*event.EonPublicKey, error) {
89+
func (s *EonPubKeySyncer) getInitialPubKeys(ctx context.Context, block *number.BlockNumber) ([]*event.EonPublicKey, error) {
11790
// This blocknumber specifies AT what state
11891
// the contract is called
11992
opts := &bind.CallOpts{
12093
Context: ctx,
121-
BlockNumber: s.StartBlock.Int,
94+
BlockNumber: block.Int,
12295
}
12396
numKS, err := s.KeyperSetManager.GetNumKeyperSets(opts)
12497
if err != nil {
12598
return nil, err
12699
}
127100
// this blocknumber specifies the argument to the contract
128101
// getter
129-
activeEon, err := s.KeyperSetManager.GetKeyperSetIndexByBlock(opts, s.StartBlock.Uint64())
102+
activeEon, err := s.KeyperSetManager.GetKeyperSetIndexByBlock(opts, block.Uint64())
130103
if err != nil {
131104
return nil, err
132105
}
@@ -180,31 +153,3 @@ func (s *EonPubKeySyncer) GetEonPubKeyForEon(ctx context.Context, opts *bind.Cal
180153
AtBlockNumber: number.BigToBlockNumber(opts.BlockNumber),
181154
}, nil
182155
}
183-
184-
func (s *EonPubKeySyncer) watchNewEonPubkey(ctx context.Context) error {
185-
for {
186-
select {
187-
case newEonKey, ok := <-s.keyBroadcastCh:
188-
if !ok {
189-
return nil
190-
}
191-
pubk := newEonKey.Key
192-
bn := newEonKey.Raw.BlockNumber
193-
ev := &event.EonPublicKey{
194-
Eon: newEonKey.Eon,
195-
Key: pubk,
196-
AtBlockNumber: number.NewBlockNumber(&bn),
197-
}
198-
err := s.Handler(ctx, ev)
199-
if err != nil {
200-
s.Log.Error(
201-
"handler for `NewKeyperSet` errored",
202-
"error",
203-
err.Error(),
204-
)
205-
}
206-
case <-ctx.Done():
207-
return ctx.Err()
208-
}
209-
}
210-
}

0 commit comments

Comments
 (0)