Skip to content

Commit 7eb8c9a

Browse files
authored
Merge pull request #204 from meshplus/fix/fix-direct-v1.6
fix(direct):fix start in direct mode
2 parents 8d397e8 + a0e1dd2 commit 7eb8c9a

File tree

7 files changed

+45
-64
lines changed

7 files changed

+45
-64
lines changed

api/server.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -159,23 +159,23 @@ func (g *Server) getAppchain(c *gin.Context) {
159159
return
160160
}
161161

162+
if !ackMsg.Payload.Ok {
163+
res.Data = ackMsg.Payload.Data
164+
c.JSON(http.StatusInternalServerError, res)
165+
return
166+
}
167+
162168
g.handleAckAppchain(c, ackMsg)
163169
}
164170

165171
func (g *Server) handleAckAppchain(c *gin.Context, msg *peerproto.Message) {
166-
app := appchainmgr.Appchain{}
167-
if err := json.Unmarshal(msg.Payload.Data, &app); err != nil {
168-
g.logger.Error(err)
169-
return
170-
}
171-
172172
res := &response{}
173173

174174
switch msg.Type {
175175
case peerproto.Message_APPCHAIN_REGISTER:
176-
res.Data = []byte(fmt.Sprintf("appchain register successfully, id is %s\n", app.ID))
176+
res.Data = []byte(fmt.Sprintf("appchain register successfully, appchain is %s\n", string(msg.Payload.Data)))
177177
case peerproto.Message_APPCHAIN_UPDATE:
178-
res.Data = []byte(fmt.Sprintf("appchain update successfully, id is %s\n", app.ID))
178+
res.Data = []byte(fmt.Sprintf("appchain update successfully, appchain is %s\n", string(msg.Payload.Data)))
179179
case peerproto.Message_APPCHAIN_GET:
180180
res.Data = msg.Payload.Data
181181
}

cmd/pier/client/appchain_pier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ func savePierAppchain(ctx *cli.Context, path string) error {
219219
validatorsPath := ctx.String("validators")
220220
consensusType := ctx.String("consensusType")
221221

222-
url, err := getURL(ctx, fmt.Sprintf("%s?pier_id=%s", path, pier))
222+
url, err := getURL(ctx, fmt.Sprintf("%s?pier_id=%s", GetAppchainUrl, pier))
223223
if err != nil {
224224
return err
225225
}

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ go 1.13
44

55
require (
66
github.com/Rican7/retry v0.1.0
7-
github.com/btcsuite/btcd v0.21.0-beta
7+
github.com/btcsuite/btcd v0.20.1-beta
88
github.com/cbergoon/merkletree v0.2.0
99
github.com/fatih/color v1.9.0
10-
github.com/fsnotify/fsnotify v1.4.9
10+
github.com/fsnotify/fsnotify v1.4.7
1111
github.com/gin-gonic/gin v1.6.3
1212
github.com/gobuffalo/packd v1.0.0
1313
github.com/gobuffalo/packr v1.30.1

internal/app/pier.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ func NewPier(repoRoot string, config *repo.Config) (*Pier, error) {
123123
if err != nil {
124124
return nil, fmt.Errorf("cryptor create: %w", err)
125125
}
126+
pierHAConstructor, err := agency.GetPierHAConstructor(config.HA.Mode)
127+
if err != nil {
128+
return nil, fmt.Errorf("pier ha constructor not found")
129+
}
130+
pierHA = pierHAConstructor(nil, addr.String())
126131

127132
meta = &pb.Interchain{}
128133
lite = &direct_lite.MockLite{}

internal/appchain/handle.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,18 +40,8 @@ func (mgr *Manager) handleMessage(s network.Stream, msg *peerproto.Message) {
4040
mgr.logger.Error(err)
4141
}
4242

43-
appchainRes := &appchainmgr.Appchain{}
44-
if err := json.Unmarshal(res, appchainRes); err != nil {
45-
mgr.logger.Error(err)
46-
return
47-
}
48-
4943
mgr.logger.WithFields(logrus.Fields{
50-
"type": msg.Type,
51-
"from_id": appchainRes.ID,
52-
"name": appchainRes.Name,
53-
"desc": appchainRes.Desc,
54-
"chain_type": appchainRes.ChainType,
55-
"consensus_type": appchainRes.ConsensusType,
44+
"type": msg.Type,
45+
"from_id": string(res),
5646
}).Info("Handle appchain message")
5747
}

internal/exchanger/direct_handler.go

Lines changed: 26 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,8 @@ func (pool *Pool) get(index uint64) *model.WrappedIBTP {
4444
}
4545

4646
func (ex *Exchanger) feedIBTP(wIbtp *model.WrappedIBTP) {
47-
var pool *Pool
48-
ibtp := wIbtp.Ibtp
49-
act, loaded := ex.ibtps.Load(ibtp.From)
50-
if !loaded {
51-
pool = NewPool()
52-
ex.ibtps.Store(ibtp.From, pool)
53-
} else {
54-
pool = act.(*Pool)
55-
}
47+
act, loaded := ex.ibtps.LoadOrStore(wIbtp.Ibtp.From, NewPool())
48+
pool := act.(*Pool)
5649
pool.feed(wIbtp)
5750

5851
if !loaded {
@@ -63,53 +56,47 @@ func (ex *Exchanger) feedIBTP(wIbtp *model.WrappedIBTP) {
6356
}
6457
}()
6558
inMeta := ex.exec.QueryInterchainMeta()
59+
idx := inMeta[wIbtp.Ibtp.From]
6660
for wIbtp := range pool.ch {
67-
ibtp := wIbtp.Ibtp
68-
idx := inMeta[ibtp.From]
69-
if ibtp.Index <= idx {
70-
pool.delete(ibtp.Index)
71-
ex.logger.Warnf("ignore ibtp with invalid index: %d", ibtp.Index)
61+
if wIbtp.Ibtp.Index <= idx {
62+
pool.delete(wIbtp.Ibtp.Index)
63+
ex.logger.Warnf("ignore ibtp with invalid index: %d", wIbtp.Ibtp.Index)
7264
continue
7365
}
74-
if idx+1 == ibtp.Index {
75-
ex.processIBTP(wIbtp)
76-
pool.delete(ibtp.Index)
77-
index := ibtp.Index + 1
66+
if idx+1 == wIbtp.Ibtp.Index {
67+
ex.processIBTP(wIbtp, pool)
68+
index := wIbtp.Ibtp.Index + 1
7869
wIbtp := pool.get(index)
7970
for wIbtp != nil {
80-
ex.processIBTP(wIbtp)
81-
pool.delete(wIbtp.Ibtp.Index)
71+
ex.processIBTP(wIbtp, pool)
8272
index++
8373
wIbtp = pool.get(index)
8474
}
75+
idx = index - 1
8576
} else {
8677
pool.put(wIbtp)
8778
}
79+
8880
}
8981
}(pool)
9082
}
9183
}
9284

93-
func (ex *Exchanger) processIBTP(wIbtp *model.WrappedIBTP) {
85+
func (ex *Exchanger) processIBTP(wIbtp *model.WrappedIBTP, pool *Pool) {
9486
receipt, err := ex.exec.ExecuteIBTP(wIbtp)
9587
if err != nil {
9688
ex.logger.Errorf("Execute ibtp error: %s", err.Error())
9789
return
9890
}
9991
ex.postHandleIBTP(wIbtp.Ibtp.From, receipt)
10092
ex.sendIBTPCounter.Inc()
93+
pool.delete(wIbtp.Ibtp.Index)
10194
}
10295

103-
func (ex *Exchanger) feedReceipt(receipt *pb.IBTP) {
104-
var pool *Pool
105-
act, loaded := ex.ibtps.Load(receipt.To)
106-
if !loaded {
107-
pool = NewPool()
108-
ex.ibtps.Store(receipt.To, pool)
109-
} else {
110-
pool = act.(*Pool)
111-
}
112-
pool.feed(&model.WrappedIBTP{Ibtp: receipt, IsValid: true})
96+
func (ex *Exchanger) feedReceipt(ibtp *pb.IBTP) {
97+
act, loaded := ex.ibtps.LoadOrStore(ibtp.From, NewPool())
98+
pool := act.(*Pool)
99+
pool.feed(&model.WrappedIBTP{Ibtp: ibtp, IsValid: true})
113100

114101
if !loaded {
115102
go func(pool *Pool) {
@@ -119,26 +106,25 @@ func (ex *Exchanger) feedReceipt(receipt *pb.IBTP) {
119106
}
120107
}()
121108
callbackMeta := ex.exec.QueryCallbackMeta()
109+
idx := callbackMeta[ibtp.To]
122110
for wIbtp := range pool.ch {
123-
ibtp := wIbtp.Ibtp
124-
if ibtp.Index <= callbackMeta[ibtp.To] {
125-
pool.delete(ibtp.Index)
111+
if ibtp.Index <= idx {
112+
pool.delete(wIbtp.Ibtp.Index)
126113
ex.logger.Warn("ignore ibtp with invalid index")
127114
continue
128115
}
129-
if callbackMeta[ibtp.To]+1 == ibtp.Index {
130-
ex.processIBTP(wIbtp)
131-
pool.delete(ibtp.Index)
116+
if idx+1 == ibtp.Index {
117+
ex.processIBTP(wIbtp, pool)
132118
index := ibtp.Index + 1
133119
wIbtp := pool.get(index)
134120
for wIbtp != nil {
135-
ibtp := wIbtp.Ibtp
136121
receipt, _ := ex.exec.ExecuteIBTP(wIbtp)
137-
ex.postHandleIBTP(ibtp.From, receipt)
138-
pool.delete(ibtp.Index)
122+
ex.postHandleIBTP(wIbtp.Ibtp.From, receipt)
123+
pool.delete(wIbtp.Ibtp.Index)
139124
index++
140125
wIbtp = pool.get(index)
141126
}
127+
idx = index - 1
142128
} else {
143129
pool.put(wIbtp)
144130
}

internal/exchanger/exchanger.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ func (ex *Exchanger) sendIBTP(ibtp *pb.IBTP) error {
329329
ex.logger.Panic(err)
330330
}
331331
}
332-
entry.Info("Send ibtp success from monitor")
332+
//entry.Info("Send ibtp success from monitor")
333333
return nil
334334
}
335335

0 commit comments

Comments
 (0)