Skip to content

Commit 2d790a7

Browse files
committed
fix(direct):range err
1 parent eeab695 commit 2d790a7

File tree

2 files changed

+33
-22
lines changed

2 files changed

+33
-22
lines changed

internal/adapt/direct_adapter/adapter.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package direct_adapter
22

33
import (
4+
"context"
45
"fmt"
56
"runtime"
67
"sync"
@@ -33,6 +34,8 @@ type DirectAdapter struct {
3334
ibtps sync.Map
3435
appchainID string
3536
remotePierID string
37+
ctx context.Context
38+
cancel context.CancelFunc
3639
gopool *pool
3740
}
3841

@@ -55,7 +58,7 @@ func (d *DirectAdapter) GetServiceIDList() ([]string, error) {
5558
func New(peerMgr peermgr.PeerManager, appchainAdapt adapt.Adapt, logger logrus.FieldLogger) (*DirectAdapter, error) {
5659

5760
appchainID := appchainAdapt.ID()
58-
61+
ctx, cancel := context.WithCancel(context.Background())
5962
da := &DirectAdapter{
6063
logger: logger,
6164
peerMgr: peerMgr,
@@ -64,6 +67,8 @@ func New(peerMgr peermgr.PeerManager, appchainAdapt adapt.Adapt, logger logrus.F
6467
ibtpC: make(chan *pb.IBTP, maxChSize),
6568
appchainID: appchainID,
6669
gopool: NewGoPool(runtime.GOMAXPROCS(runtime.NumCPU())),
70+
ctx: ctx,
71+
cancel: cancel,
6772
}
6873

6974
return da, nil

internal/adapt/direct_adapter/handler.go

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -82,31 +82,37 @@ func (d *DirectAdapter) handleSendIBTPMessage(stream network.Stream, msg *pb.Mes
8282
}()
8383
d.ibtpC <- ibtp
8484
index := ibtp.Index
85+
ticker := time.NewTicker(time.Second)
86+
defer ticker.Stop()
8587
for {
86-
pool.lock.Lock()
87-
if item := pool.ibtps.Min(); item != nil {
88-
if item.(*MyTree).index < index+1 {
89-
pool.ibtps.DeleteMin()
90-
}
91-
92-
if item.(*MyTree).index == index+1 {
93-
d.ibtpC <- item.(*MyTree).ibtp
94-
pool.ibtps.DeleteMin()
95-
index++
96-
pool.time = time.Now()
97-
}
98-
99-
// By default, the index will be equalized after 5 seconds
100-
if time.Now().Sub(pool.time).Seconds() > 5.0 {
101-
d.ibtpC <- item.(*MyTree).ibtp
102-
pool.ibtps.DeleteMin()
103-
index = item.(*MyTree).index
104-
pool.time = time.Now()
88+
select {
89+
case <-ticker.C:
90+
pool.lock.Lock()
91+
if item := pool.ibtps.Min(); item != nil {
92+
if item.(*MyTree).index < index+1 {
93+
pool.ibtps.DeleteMin()
94+
}
95+
96+
if item.(*MyTree).index == index+1 {
97+
d.ibtpC <- item.(*MyTree).ibtp
98+
pool.ibtps.DeleteMin()
99+
index++
100+
pool.time = time.Now()
101+
}
102+
103+
// By default, the index will be equalized after 5 seconds
104+
if time.Now().Sub(pool.time).Seconds() > 5.0 {
105+
d.ibtpC <- item.(*MyTree).ibtp
106+
pool.ibtps.DeleteMin()
107+
index = item.(*MyTree).index
108+
pool.time = time.Now()
109+
}
105110
}
111+
pool.lock.Unlock()
112+
case <-d.ctx.Done():
113+
return
106114
}
107-
pool.lock.Unlock()
108115
}
109-
110116
}(pool, ibtp)
111117
} else {
112118
pool.lock.Lock()

0 commit comments

Comments
 (0)