Skip to content

Commit 21315f5

Browse files
committed
feat: add new chainsync mechanism
1 parent 0f09d21 commit 21315f5

File tree

22 files changed

+2104
-1016
lines changed

22 files changed

+2104
-1016
lines changed
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
package chainsegment
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"math/big"
8+
9+
"github.com/ethereum/go-ethereum/common"
10+
"github.com/ethereum/go-ethereum/core/types"
11+
12+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client"
13+
)
14+
15+
const MaxNumPollBlocks = 50
16+
17+
var (
18+
ErrReorg = errors.New("detected reorg in updated chain-segment")
19+
ErrEmpty = errors.New("empty chain-segment")
20+
ErrUpdateBlockTooFarInPast = errors.New("the updated block reaches too far in the past for the chain-segment")
21+
ErrOverlapTooBig = errors.New("chain-segment overlap too big")
22+
)
23+
24+
type UpdateLatestResult struct {
25+
// the full new segment with the reorg applied
26+
FullSegment *ChainSegment
27+
// the removed segment that is not part of the new full segment anymore
28+
// (reorged blocks)
29+
RemovedSegment *ChainSegment
30+
// the updated segment of new blocks that were not part of the old chain
31+
// (new blocks including the replacement blocks from a reorg)
32+
UpdatedSegment *ChainSegment
33+
}
34+
35+
func capNumPollBlocks(num int) int {
36+
if num > MaxNumPollBlocks {
37+
return MaxNumPollBlocks
38+
} else if num < 1 {
39+
return 1
40+
}
41+
return num
42+
}
43+
44+
type ChainSegment struct {
45+
chain []*types.Header
46+
}
47+
48+
// assumes reverse order, and that it is
49+
// a connected chain-segment.
50+
func NewChainSegment(chain ...*types.Header) *ChainSegment {
51+
bc := &ChainSegment{
52+
chain: chain,
53+
}
54+
return bc
55+
}
56+
57+
func (bc *ChainSegment) GetHeaderByHash(h common.Hash) *types.Header {
58+
// OPTIM: this should be implemented more efficiently
59+
// with a hash-map
60+
for _, header := range bc.chain {
61+
if header.Hash().Cmp(h) == 0 {
62+
return header
63+
}
64+
}
65+
return nil
66+
}
67+
68+
func (bc *ChainSegment) Len() int {
69+
return len(bc.chain)
70+
}
71+
72+
func (bc *ChainSegment) Earliest() *types.Header {
73+
if len(bc.chain) == 0 {
74+
return nil
75+
}
76+
return bc.chain[0]
77+
}
78+
79+
func (bc *ChainSegment) Latest() *types.Header {
80+
if len(bc.chain) == 0 {
81+
return nil
82+
}
83+
return bc.chain[len(bc.chain)-1]
84+
}
85+
86+
func (bc *ChainSegment) Get() []*types.Header {
87+
return bc.chain
88+
}
89+
90+
func (bc *ChainSegment) Copy() *ChainSegment {
91+
return NewChainSegment(bc.chain...)
92+
}
93+
94+
func (bc *ChainSegment) UpdateLatest(ctx context.Context, c client.Sync, update *ChainSegment) (UpdateLatestResult, error) {
95+
update = update.Copy()
96+
if bc.Len() == 0 {
97+
return UpdateLatestResult{}, ErrEmpty
98+
}
99+
100+
if bc.Earliest().Number.Cmp(update.Earliest().Number) == 1 {
101+
// We don't reach so far in the past.
102+
// This only happens when the cache of used blocks in
103+
// doesn't reach so far.
104+
return UpdateLatestResult{}, fmt.Errorf(
105+
"segment earliest=%d, update earliest=%d: %w",
106+
bc.Earliest().Number.Int64(), update.Earliest().Number.Int64(),
107+
ErrUpdateBlockTooFarInPast,
108+
)
109+
}
110+
overlapBig := new(big.Int).Add(
111+
new(big.Int).Sub(bc.Latest().Number, update.Earliest().Number),
112+
// both being the same height means one block overlap, so add 1
113+
big.NewInt(1),
114+
)
115+
if !overlapBig.IsInt64() {
116+
// this should never happen, this would be too large of a gap
117+
return UpdateLatestResult{}, ErrOverlapTooBig
118+
}
119+
120+
overlap := int(overlapBig.Int64())
121+
if overlap < 0 {
122+
// overlap is negative, this means we have a gap:
123+
extendedUpdate, err := update.ExtendLeft(ctx, c, capNumPollBlocks(-overlap))
124+
if err != nil {
125+
return UpdateLatestResult{}, fmt.Errorf("failed to extend left gap: %w", err)
126+
}
127+
return bc.UpdateLatest(ctx, c, extendedUpdate)
128+
} else if overlap == 0 {
129+
if update.Earliest().ParentHash.Cmp(bc.Latest().Hash()) == 0 {
130+
// the new segment extends the old one perfectly
131+
return UpdateLatestResult{
132+
FullSegment: bc.Copy().AddRight(update),
133+
RemovedSegment: nil,
134+
UpdatedSegment: update,
135+
}, nil
136+
}
137+
// the block-numbers align, but the new segment
138+
// seems to be from a reorg that branches off within the old segment
139+
_, err := update.ExtendLeft(ctx, c, capNumPollBlocks(bc.Len()))
140+
if err != nil {
141+
return UpdateLatestResult{}, fmt.Errorf("failed to extend into reorg: %w", err)
142+
}
143+
return bc.UpdateLatest(ctx, c, update)
144+
}
145+
// implicit case - overlap > 0:
146+
removed, updated := bc.GetLatest(overlap).DiffLeftAligned(update)
147+
full := bc
148+
if removed != nil {
149+
full = full.GetEarliest(full.Len() - removed.Len())
150+
}
151+
if updated != nil {
152+
full.AddRight(updated)
153+
}
154+
return UpdateLatestResult{
155+
FullSegment: full,
156+
RemovedSegment: removed,
157+
UpdatedSegment: updated,
158+
}, nil
159+
}
160+
161+
func (bc *ChainSegment) AddRight(add *ChainSegment) *ChainSegment {
162+
bc.chain = append(bc.chain, add.chain...)
163+
return bc
164+
}
165+
166+
func (bc *ChainSegment) DiffLeftAligned(other *ChainSegment) (remove, update *ChainSegment) {
167+
// 1) assumes both segments start at the same block height (earliest block at index 0 with same blocknum)
168+
// 2) assumes the other.Len() >= bc.Len()
169+
170+
// Compare the two and see if we have to reorg based on the hashes
171+
removed := []*types.Header{}
172+
updated := []*types.Header{}
173+
oldChain := bc.Get()
174+
newChain := other.Get()
175+
176+
for i := 0; i < len(newChain); i++ {
177+
var oldHeader *types.Header
178+
newHeader := newChain[i]
179+
if len(oldChain) > i {
180+
oldHeader = oldChain[i]
181+
}
182+
if oldHeader == nil {
183+
updated = append(updated, newHeader)
184+
// XXX: is just checking the hash sufficient?
185+
// sanity check also the blocknum + parent hash chain?
186+
} else if oldHeader.Hash().Cmp(newHeader.Hash()) != 0 {
187+
removed = append(removed, oldHeader)
188+
updated = append(updated, newHeader)
189+
}
190+
}
191+
var removedSegment, updatedSegment *ChainSegment
192+
if len(removed) > 0 {
193+
removedSegment = NewChainSegment(removed...)
194+
}
195+
if len(updated) > 0 {
196+
updatedSegment = NewChainSegment(updated...)
197+
}
198+
return removedSegment, updatedSegment
199+
}
200+
201+
// GetLatest retrieves the "n" latest blocks from this
202+
// ChainSegment.
203+
// If the segment is shorter than n, the whole segment gets returned.
204+
func (bc *ChainSegment) GetLatest(n int) *ChainSegment {
205+
if n > bc.Len() {
206+
n = bc.Len()
207+
}
208+
return NewChainSegment(bc.chain[len(bc.chain)-n : len(bc.chain)]...)
209+
}
210+
211+
// GetLatest retrieves the "n" earliest blocks from this
212+
// ChainSegment.
213+
// If the segment is shorter than n, the whole segment gets returned.
214+
func (bc *ChainSegment) GetEarliest(n int) *ChainSegment {
215+
if n > bc.Len() {
216+
n = bc.Len()
217+
}
218+
return NewChainSegment(bc.chain[:n]...)
219+
}
220+
221+
func (bc *ChainSegment) NewSegmentRight(ctx context.Context, c client.Sync, num int) (*ChainSegment, error) {
222+
rightMost := bc.Latest()
223+
if rightMost == nil {
224+
return nil, ErrEmpty
225+
}
226+
chain := []*types.Header{}
227+
previous := rightMost
228+
for i := 1; i <= num; i++ {
229+
blockNum := new(big.Int).Sub(rightMost.Number, big.NewInt(int64(i)))
230+
h, err := c.HeaderByNumber(ctx, blockNum)
231+
if err != nil {
232+
return nil, err
233+
}
234+
if h.Hash().Cmp(previous.ParentHash) != 0 {
235+
// the server has a different chain state than this segment,
236+
// so it is part of a reorged away chain-segment
237+
return nil, ErrReorg
238+
}
239+
chain = append(chain, h)
240+
previous = h
241+
}
242+
return NewChainSegment(chain...), nil
243+
}
244+
245+
func (bc *ChainSegment) ExtendLeft(ctx context.Context, c client.Sync, num int) (*ChainSegment, error) {
246+
leftMost := bc.Earliest()
247+
if leftMost == nil {
248+
return nil, ErrEmpty
249+
}
250+
for num > 0 {
251+
blockNum := new(big.Int).Sub(leftMost.Number, big.NewInt(int64(1)))
252+
//OPTIM: we do cap the max poll number when calling this method,
253+
// but then we make one request per block anyways.
254+
// This doesn't make sense, but there currently is no batching
255+
// for retrieving ranges of headers.
256+
h, err := c.HeaderByNumber(ctx, blockNum)
257+
if err != nil {
258+
return nil, fmt.Errorf("failed to retrieve header by number (#%d): %w", blockNum.Uint64(), err)
259+
}
260+
if h.Hash().Cmp(leftMost.ParentHash) != 0 {
261+
// The server has a different chain state than this segment,
262+
// so it is part of a reorged away chain-segment.
263+
// This can also happen when the server reorged during this loop
264+
// and we now polled the parent with an unexpected hash.
265+
return nil, ErrReorg
266+
}
267+
bc.chain = append([]*types.Header{h}, bc.chain...)
268+
leftMost = h
269+
num--
270+
}
271+
return bc, nil
272+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package chainsync
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/ethereum/go-ethereum/common"
8+
"github.com/ethereum/go-ethereum/core/types"
9+
10+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/syncer"
11+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
12+
)
13+
14+
type Chainsync struct {
15+
options *options
16+
fetcher *syncer.Fetcher
17+
}
18+
19+
func New(options ...Option) (*Chainsync, error) {
20+
opts := defaultOptions()
21+
for _, o := range options {
22+
if err := o(opts); err != nil {
23+
return nil, fmt.Errorf("error applying option to Chainsync: %w", err)
24+
}
25+
}
26+
27+
if err := opts.verify(); err != nil {
28+
return nil, fmt.Errorf("error verifying options to Chainsync: %w", err)
29+
}
30+
return &Chainsync{
31+
options: opts,
32+
}, nil
33+
}
34+
35+
func (c *Chainsync) Start(ctx context.Context, runner service.Runner) error {
36+
var err error
37+
c.fetcher, err = c.options.initFetcher(ctx)
38+
if err != nil {
39+
return fmt.Errorf("error initializing Chainsync: %w", err)
40+
}
41+
return c.fetcher.Start(ctx, runner)
42+
}
43+
44+
func (c *Chainsync) GetHeaderByHash(ctx context.Context, h common.Hash) (*types.Header, error) {
45+
return c.fetcher.GetHeaderByHash(ctx, h)
46+
}

0 commit comments

Comments
 (0)