Skip to content

Commit 9c13274

Browse files
committed
fix(*): rollback when target chain is not available
1 parent 8ad5cfc commit 9c13274

File tree

25 files changed

+1075
-406
lines changed

25 files changed

+1075
-406
lines changed

go.mod

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ go 1.13
44

55
require (
66
github.com/Rican7/retry v0.1.0
7-
github.com/btcsuite/btcd v0.20.1-beta
7+
github.com/btcsuite/btcd v0.21.0-beta
88
github.com/cbergoon/merkletree v0.2.0
99
github.com/fatih/color v1.9.0
10-
github.com/fsnotify/fsnotify v1.4.7
10+
github.com/fsnotify/fsnotify v1.4.9
1111
github.com/gin-gonic/gin v1.6.3
1212
github.com/gobuffalo/packd v1.0.0
1313
github.com/gobuffalo/packr v1.30.1
@@ -18,18 +18,16 @@ require (
1818
github.com/ipfs/go-cid v0.0.7
1919
github.com/lestrrat-go/strftime v1.0.3 // indirect
2020
github.com/libp2p/go-libp2p-core v0.6.1
21-
github.com/meshplus/bitxhub-core v0.1.0-rc1.0.20210330035001-b327cf056572
21+
github.com/meshplus/bitxhub-core v0.1.0-rc1.0.20210514085603-7495e962da7b
2222
github.com/meshplus/bitxhub-kit v1.1.2-0.20201203072410-8a0383a6870d
23-
github.com/meshplus/bitxhub-model v1.1.2-0.20210409071219-0526019e06c4
23+
github.com/meshplus/bitxhub-model v1.1.2-0.20210513074749-d31e04a9f41d
2424
github.com/meshplus/go-bitxhub-client v1.0.0-rc4.0.20210416022059-22729ce4c0f2
2525
github.com/meshplus/go-lightp2p v0.0.0-20200817105923-6b3aee40fa54
2626
github.com/mitchellh/go-homedir v1.1.0
2727
github.com/multiformats/go-multiaddr v0.3.0
2828
github.com/sirupsen/logrus v1.6.0
29-
github.com/spf13/pflag v1.0.5 // indirect
30-
github.com/spf13/viper v1.6.1
31-
github.com/stretchr/objx v0.2.0 // indirect
32-
github.com/stretchr/testify v1.6.0
29+
github.com/spf13/viper v1.7.0
30+
github.com/stretchr/testify v1.7.0
3331
github.com/tidwall/gjson v1.6.8
3432
github.com/urfave/cli v1.22.1
3533
github.com/wonderivan/logger v1.0.0

go.sum

Lines changed: 293 additions & 13 deletions
Large diffs are not rendered by default.

internal/agent/client.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,14 @@ func (client *BxhClient) CommitCallback(ibtp *pb.IBTP) error {
159159
return nil
160160
}
161161

162+
func (client *BxhClient) RollbackIBTP(ibtp *pb.IBTP, isSrcChain bool) (*pb.RollbackIBTPResponse, error) {
163+
return nil, nil
164+
}
165+
166+
func (client *BxhClient) IncreaseInMeta(ibtp *pb.IBTP) (*pb.IBTP, error) {
167+
return nil, nil
168+
}
169+
162170
func (client *BxhClient) GetReceipt(ibtp *pb.IBTP) (*pb.IBTP, error) {
163171
return nil, nil
164172
}

internal/exchanger/direct_handler.go

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,52 +6,54 @@ import (
66
"time"
77

88
"github.com/meshplus/bitxhub-model/pb"
9+
"github.com/meshplus/pier/pkg/model"
910
"github.com/sirupsen/logrus"
1011
)
1112

1213
type Pool struct {
1314
ibtps *sync.Map
14-
ch chan *pb.IBTP
15+
ch chan *model.WrappedIBTP
1516
}
1617

1718
func NewPool() *Pool {
1819
return &Pool{
1920
ibtps: &sync.Map{},
20-
ch: make(chan *pb.IBTP, 40960),
21+
ch: make(chan *model.WrappedIBTP, 40960),
2122
}
2223
}
2324

24-
func (pool *Pool) feed(ibtp *pb.IBTP) {
25+
func (pool *Pool) feed(ibtp *model.WrappedIBTP) {
2526
pool.ch <- ibtp
2627
}
2728

28-
func (pool *Pool) put(ibtp *pb.IBTP) {
29-
pool.ibtps.Store(ibtp.Index, ibtp)
29+
func (pool *Pool) put(ibtp *model.WrappedIBTP) {
30+
pool.ibtps.Store(ibtp.Ibtp.Index, ibtp)
3031
}
3132

3233
func (pool *Pool) delete(idx uint64) {
3334
pool.ibtps.Delete(idx)
3435
}
3536

36-
func (pool *Pool) get(index uint64) *pb.IBTP {
37+
func (pool *Pool) get(index uint64) *model.WrappedIBTP {
3738
ibtp, ok := pool.ibtps.Load(index)
3839
if !ok {
3940
return nil
4041
}
4142

42-
return ibtp.(*pb.IBTP)
43+
return ibtp.(*model.WrappedIBTP)
4344
}
4445

45-
func (ex *Exchanger) feedIBTP(ibtp *pb.IBTP) {
46+
func (ex *Exchanger) feedIBTP(wIbtp *model.WrappedIBTP) {
4647
var pool *Pool
48+
ibtp := wIbtp.Ibtp
4749
act, loaded := ex.ibtps.Load(ibtp.From)
4850
if !loaded {
4951
pool = NewPool()
5052
ex.ibtps.Store(ibtp.From, pool)
5153
} else {
5254
pool = act.(*Pool)
5355
}
54-
pool.feed(ibtp)
56+
pool.feed(wIbtp)
5557

5658
if !loaded {
5759
go func(pool *Pool) {
@@ -61,39 +63,40 @@ func (ex *Exchanger) feedIBTP(ibtp *pb.IBTP) {
6163
}
6264
}()
6365
inMeta := ex.exec.QueryInterchainMeta()
64-
for ibtp := range pool.ch {
66+
for wIbtp := range pool.ch {
67+
ibtp := wIbtp.Ibtp
6568
idx := inMeta[ibtp.From]
6669
if ibtp.Index <= idx {
6770
pool.delete(ibtp.Index)
6871
ex.logger.Warnf("ignore ibtp with invalid index: %d", ibtp.Index)
6972
continue
7073
}
7174
if idx+1 == ibtp.Index {
72-
ex.processIBTP(ibtp)
75+
ex.processIBTP(wIbtp)
7376
pool.delete(ibtp.Index)
7477
index := ibtp.Index + 1
75-
ibtp := pool.get(index)
76-
for ibtp != nil {
77-
ex.processIBTP(ibtp)
78-
pool.delete(ibtp.Index)
78+
wIbtp := pool.get(index)
79+
for wIbtp != nil {
80+
ex.processIBTP(wIbtp)
81+
pool.delete(wIbtp.Ibtp.Index)
7982
index++
80-
ibtp = pool.get(index)
83+
wIbtp = pool.get(index)
8184
}
8285
} else {
83-
pool.put(ibtp)
86+
pool.put(wIbtp)
8487
}
8588
}
8689
}(pool)
8790
}
8891
}
8992

90-
func (ex *Exchanger) processIBTP(ibtp *pb.IBTP) {
91-
receipt, err := ex.exec.ExecuteIBTP(ibtp)
93+
func (ex *Exchanger) processIBTP(wIbtp *model.WrappedIBTP) {
94+
receipt, err := ex.exec.ExecuteIBTP(wIbtp)
9295
if err != nil {
9396
ex.logger.Errorf("Execute ibtp error: %s", err.Error())
9497
return
9598
}
96-
ex.postHandleIBTP(ibtp.From, receipt)
99+
ex.postHandleIBTP(wIbtp.Ibtp.From, receipt)
97100
ex.sendIBTPCounter.Inc()
98101
}
99102

@@ -106,7 +109,7 @@ func (ex *Exchanger) feedReceipt(receipt *pb.IBTP) {
106109
} else {
107110
pool = act.(*Pool)
108111
}
109-
pool.feed(receipt)
112+
pool.feed(&model.WrappedIBTP{Ibtp: receipt, IsValid: true})
110113

111114
if !loaded {
112115
go func(pool *Pool) {
@@ -116,26 +119,28 @@ func (ex *Exchanger) feedReceipt(receipt *pb.IBTP) {
116119
}
117120
}()
118121
callbackMeta := ex.exec.QueryCallbackMeta()
119-
for ibtp := range pool.ch {
122+
for wIbtp := range pool.ch {
123+
ibtp := wIbtp.Ibtp
120124
if ibtp.Index <= callbackMeta[ibtp.To] {
121125
pool.delete(ibtp.Index)
122126
ex.logger.Warn("ignore ibtp with invalid index")
123127
continue
124128
}
125129
if callbackMeta[ibtp.To]+1 == ibtp.Index {
126-
ex.processIBTP(ibtp)
130+
ex.processIBTP(wIbtp)
127131
pool.delete(ibtp.Index)
128132
index := ibtp.Index + 1
129-
ibtp := pool.get(index)
130-
for ibtp != nil {
131-
receipt, _ := ex.exec.ExecuteIBTP(ibtp)
133+
wIbtp := pool.get(index)
134+
for wIbtp != nil {
135+
ibtp := wIbtp.Ibtp
136+
receipt, _ := ex.exec.ExecuteIBTP(wIbtp)
132137
ex.postHandleIBTP(ibtp.From, receipt)
133138
pool.delete(ibtp.Index)
134139
index++
135-
ibtp = pool.get(index)
140+
wIbtp = pool.get(index)
136141
}
137142
} else {
138-
pool.put(ibtp)
143+
pool.put(wIbtp)
139144
}
140145
}
141146
}(pool)

internal/exchanger/exchanger.go

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"sync"
88
"time"
99

10+
"github.com/Rican7/retry/backoff"
11+
1012
"github.com/Rican7/retry"
1113
"github.com/Rican7/retry/strategy"
1214
"github.com/meshplus/bitxhub-kit/storage"
@@ -137,6 +139,9 @@ func (ex *Exchanger) startWithDirectMode() error {
137139
}
138140

139141
func (ex *Exchanger) startWithRelayMode() error {
142+
if err := ex.syncer.RegisterRollbackHandler(ex.handleRollback); err != nil {
143+
return fmt.Errorf("register router handler: %w", err)
144+
}
140145
// syncer should be started first in case to recover ibtp from monitor
141146
if err := ex.syncer.Start(); err != nil {
142147
return fmt.Errorf("syncer start: %w", err)
@@ -204,10 +209,26 @@ func (ex *Exchanger) listenAndSendIBTPFromMnt() {
204209
}
205210
}
206211

207-
ex.interchainCounter[ibtp.To] = ibtp.Index
208-
209-
if err := ex.sendIBTP(ibtp); err != nil {
210-
ex.logger.Infof("Send ibtp: %s", err.Error())
212+
if err := retry.Retry(func(attempt uint) error {
213+
if err := ex.sendIBTP(ibtp); err != nil {
214+
ex.logger.Errorf("Send ibtp: %s", err.Error())
215+
// if err occurs, try to get new ibtp and resend
216+
if err := retry.Retry(func(attempt uint) error {
217+
ibtp, err = ex.mnt.QueryIBTP(ibtp.ID())
218+
if err != nil {
219+
ex.logger.Errorf("Query ibtp %s from appchain: %s", ibtp.ID(), err.Error())
220+
return err
221+
}
222+
return nil
223+
}, strategy.Backoff(backoff.Fibonacci(500*time.Millisecond))); err != nil {
224+
ex.logger.Panic(err)
225+
}
226+
return fmt.Errorf("retry sending ibtp")
227+
}
228+
ex.interchainCounter[ibtp.To] = ibtp.Index
229+
return nil
230+
}, strategy.Backoff(backoff.Fibonacci(500*time.Millisecond))); err != nil {
231+
ex.logger.Panic(err)
211232
}
212233
}
213234
}
@@ -219,18 +240,18 @@ func (ex *Exchanger) listenAndSendIBTPFromSyncer() {
219240
select {
220241
case <-ex.ctx.Done():
221242
return
222-
case ibtp, ok := <-ch:
243+
case wIbtp, ok := <-ch:
223244
if !ok {
224245
ex.logger.Warn("Unexpected closed channel while listening on interchain ibtp")
225246
return
226247
}
227-
entry := ex.logger.WithFields(logrus.Fields{"type": ibtp.Type, "id": ibtp.ID()})
228-
switch ibtp.Type {
248+
entry := ex.logger.WithFields(logrus.Fields{"type": wIbtp.Ibtp.Type, "id": wIbtp.Ibtp.ID()})
249+
switch wIbtp.Ibtp.Type {
229250
case pb.IBTP_INTERCHAIN, pb.IBTP_ASSET_EXCHANGE_INIT,
230251
pb.IBTP_ASSET_EXCHANGE_REDEEM, pb.IBTP_ASSET_EXCHANGE_REFUND:
231-
ex.applyInterchain(ibtp, entry)
252+
ex.applyInterchain(wIbtp, entry)
232253
case pb.IBTP_RECEIPT_SUCCESS, pb.IBTP_RECEIPT_FAILURE, pb.IBTP_ASSET_EXCHANGE_RECEIPT:
233-
ex.applyReceipt(ibtp, entry)
254+
ex.applyReceipt(wIbtp)
234255
default:
235256
entry.Errorf("wrong type of ibtp")
236257
}
@@ -279,7 +300,7 @@ func (ex *Exchanger) sendIBTP(ibtp *pb.IBTP) error {
279300
case repo.RelayMode:
280301
err := ex.syncer.SendIBTP(ibtp)
281302
if err != nil {
282-
entry.Errorf("send ibtp to bitxhub: %s", err.Error())
303+
entry.Errorf("Send ibtp to bitxhub: %s", err.Error())
283304
return fmt.Errorf("send ibtp to bitxhub: %s", err.Error())
284305
}
285306
case repo.DirectMode:
@@ -312,45 +333,48 @@ func (ex *Exchanger) sendIBTP(ibtp *pb.IBTP) error {
312333
return nil
313334
}
314335

315-
func (ex *Exchanger) queryIBTP(from string, idx uint64) (*pb.IBTP, error) {
316-
ibtp := &pb.IBTP{}
317-
id := fmt.Sprintf("%s-%s-%d", from, ex.pierID, idx)
318-
336+
func (ex *Exchanger) queryIBTP(id, target string) (*pb.IBTP, bool, error) {
337+
verifiedTx := &pb.VerifiedTx{}
319338
v := ex.store.Get(model.IBTPKey(id))
320339
if v != nil {
321-
if err := ibtp.Unmarshal(v); err != nil {
322-
return nil, err
340+
if err := verifiedTx.Unmarshal(v); err != nil {
341+
return nil, false, err
323342
}
324-
return ibtp, nil
343+
return verifiedTx.Tx.GetIBTP(), verifiedTx.Valid, nil
325344
}
326345

327346
// query ibtp from counterpart chain
328-
var err error
347+
var (
348+
ibtp *pb.IBTP
349+
isValid bool
350+
err error
351+
)
329352
switch ex.mode {
330353
case repo.RelayMode:
331-
ibtp, err = ex.syncer.QueryIBTP(id)
354+
ibtp, isValid, err = ex.syncer.QueryIBTP(id)
332355
if err != nil {
333356
if errors.Is(err, syncer.ErrIBTPNotFound) {
334357
ex.logger.Panicf("query ibtp by id %s from bitxhub: %s", id, err.Error())
335358
}
336-
return nil, fmt.Errorf("query ibtp from bitxhub: %s", err.Error())
359+
return nil, false, fmt.Errorf("query ibtp from bitxhub: %s", err.Error())
337360
}
338361
case repo.DirectMode:
339362
// query ibtp from another pier
340363
msg := peermgr.Message(peerMsg.Message_IBTP_GET, true, []byte(id))
341-
result, err := ex.peerMgr.Send(from, msg)
364+
result, err := ex.peerMgr.Send(target, msg)
342365
if err != nil {
343-
return nil, err
366+
return nil, false, err
344367
}
345368

369+
ibtp := &pb.IBTP{}
346370
if err := ibtp.Unmarshal(result.Payload.Data); err != nil {
347-
return nil, err
371+
return nil, false, err
348372
}
349373
default:
350-
return nil, fmt.Errorf("unsupported pier mode")
374+
return nil, false, fmt.Errorf("unsupported pier mode")
351375
}
352376

353-
return ibtp, nil
377+
return ibtp, isValid, nil
354378
}
355379

356380
func copyCounterMap(original map[string]uint64) map[string]uint64 {

0 commit comments

Comments
 (0)