Skip to content

Commit c22dd18

Browse files
authored
Merge pull request #224 from meshplus/fix/rollback-ibtp
fix(*): recover pool index on pier restart
2 parents a61de47 + 80e31ab commit c22dd18

File tree

2 files changed

+25
-10
lines changed

2 files changed

+25
-10
lines changed

internal/exchanger/handler.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,16 @@ func (ex *Exchanger) applyReceipt(wIbtp *model.WrappedIBTP, entry logrus.FieldLo
8888

8989
func (ex *Exchanger) applyInterchain(wIbtp *model.WrappedIBTP, entry logrus.FieldLogger) {
9090
ibtp := wIbtp.Ibtp
91+
_, ok := ex.serviceMeta[ibtp.To]
92+
if !ok {
93+
ex.serviceMeta[ibtp.To] = &pb.Interchain{
94+
ID: ibtp.To,
95+
InterchainCounter: make(map[string]uint64),
96+
ReceiptCounter: make(map[string]uint64),
97+
SourceInterchainCounter: make(map[string]uint64),
98+
SourceReceiptCounter: make(map[string]uint64),
99+
}
100+
}
91101
index := ex.serviceMeta[ibtp.To].SourceInterchainCounter[ibtp.From]
92102
if index >= ibtp.Index {
93103
entry.Infof("Ignore ibtp, expected %d", index+1)

internal/exchanger/receipt.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,7 @@ import (
88
)
99

1010
func (ex *Exchanger) feedIBTPReceipt(receipt *model.WrappedIBTP) {
11-
var pool *Pool
12-
servicePair := fmt.Sprintf("%s-%s", receipt.Ibtp.From, receipt.Ibtp.To)
13-
act, loaded := ex.receipts.Load(servicePair)
14-
if !loaded {
15-
ex.logger.Infof("new pool for service pair %s at begin index %d", servicePair, ex.serviceMeta[receipt.Ibtp.From].ReceiptCounter[receipt.Ibtp.To]+1)
16-
pool = NewPool(ex.serviceMeta[receipt.Ibtp.From].ReceiptCounter[receipt.Ibtp.To] + 1)
17-
ex.receipts.Store(servicePair, pool)
18-
} else {
19-
pool = act.(*Pool)
20-
}
11+
pool, loaded := ex.loadPool(receipt.Ibtp.From, receipt.Ibtp.To)
2112
pool.feed(receipt)
2213

2314
if !loaded {
@@ -70,3 +61,17 @@ func (ex *Exchanger) feedIBTPReceipt(receipt *model.WrappedIBTP) {
7061
}(pool)
7162
}
7263
}
64+
65+
func (ex *Exchanger) loadPool(from, to string) (*Pool, bool) {
66+
servicePair := fmt.Sprintf("%s-%s", from, to)
67+
beginIndex := ex.serviceMeta[from].ReceiptCounter[to] + 1
68+
if ex.callbackMeta[servicePair]+1 > beginIndex {
69+
beginIndex = ex.callbackMeta[servicePair] + 1
70+
}
71+
72+
ex.logger.Infof("new pool for service pair %s at begin index %d", servicePair, beginIndex)
73+
pool := NewPool(beginIndex)
74+
act, loaded := ex.receipts.LoadOrStore(servicePair, pool)
75+
76+
return act.(*Pool), loaded
77+
}

0 commit comments

Comments
 (0)