@@ -24,7 +24,7 @@ import (
2424 "github.com/ethereum/go-ethereum/common/mclock"
2525)
2626
27- const fcTimeConst = 1000000
27+ const fcTimeConst = time . Millisecond
2828
2929type ServerParams struct {
3030 BufLimit , MinRecharge uint64
@@ -33,7 +33,7 @@ type ServerParams struct {
3333type ClientNode struct {
3434 params * ServerParams
3535 bufValue uint64
36- lastTime int64
36+ lastTime mclock. AbsTime
3737 lock sync.Mutex
3838 cm * ClientManager
3939 cmNode * cmNode
@@ -44,7 +44,7 @@ func NewClientNode(cm *ClientManager, params *ServerParams) *ClientNode {
4444 cm : cm ,
4545 params : params ,
4646 bufValue : params .BufLimit ,
47- lastTime : getTime (),
47+ lastTime : mclock . Now (),
4848 }
4949 node .cmNode = cm .addNode (node )
5050 return node
@@ -54,12 +54,12 @@ func (peer *ClientNode) Remove(cm *ClientManager) {
5454 cm .removeNode (peer .cmNode )
5555}
5656
57- func (peer * ClientNode ) recalcBV (time int64 ) {
57+ func (peer * ClientNode ) recalcBV (time mclock. AbsTime ) {
5858 dt := uint64 (time - peer .lastTime )
5959 if time < peer .lastTime {
6060 dt = 0
6161 }
62- peer .bufValue += peer .params .MinRecharge * dt / fcTimeConst
62+ peer .bufValue += peer .params .MinRecharge * dt / uint64 ( fcTimeConst )
6363 if peer .bufValue > peer .params .BufLimit {
6464 peer .bufValue = peer .params .BufLimit
6565 }
@@ -70,7 +70,7 @@ func (peer *ClientNode) AcceptRequest() (uint64, bool) {
7070 peer .lock .Lock ()
7171 defer peer .lock .Unlock ()
7272
73- time := getTime ()
73+ time := mclock . Now ()
7474 peer .recalcBV (time )
7575 return peer .bufValue , peer .cm .accept (peer .cmNode , time )
7676}
@@ -79,7 +79,7 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) {
7979 peer .lock .Lock ()
8080 defer peer .lock .Unlock ()
8181
82- time := getTime ()
82+ time := mclock . Now ()
8383 peer .recalcBV (time )
8484 peer .bufValue -= cost
8585 peer .recalcBV (time )
@@ -94,66 +94,127 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) {
9494}
9595
9696type ServerNode struct {
97- bufEstimate uint64
98- lastTime int64
99- params * ServerParams
100- sumCost uint64 // sum of req costs sent to this server
101- pending map [uint64 ]uint64 // value = sumCost after sending the given req
102- lock sync.RWMutex
97+ bufEstimate uint64
98+ lastTime mclock.AbsTime
99+ params * ServerParams
100+ sumCost uint64 // sum of req costs sent to this server
101+ pending map [uint64 ]uint64 // value = sumCost after sending the given req
102+ assignedRequest uint64 // when != 0, only the request with the given ID can be sent to this peer
103+ assignToken chan struct {} // send to this channel before assigning, read from it after deassigning
104+ lock sync.RWMutex
103105}
104106
105107func NewServerNode (params * ServerParams ) * ServerNode {
106108 return & ServerNode {
107109 bufEstimate : params .BufLimit ,
108- lastTime : getTime (),
110+ lastTime : mclock . Now (),
109111 params : params ,
110112 pending : make (map [uint64 ]uint64 ),
113+ assignToken : make (chan struct {}, 1 ),
111114 }
112115}
113116
114- func getTime () int64 {
115- return int64 (mclock .Now ())
116- }
117-
118- func (peer * ServerNode ) recalcBLE (time int64 ) {
117+ func (peer * ServerNode ) recalcBLE (time mclock.AbsTime ) {
119118 dt := uint64 (time - peer .lastTime )
120119 if time < peer .lastTime {
121120 dt = 0
122121 }
123- peer .bufEstimate += peer .params .MinRecharge * dt / fcTimeConst
122+ peer .bufEstimate += peer .params .MinRecharge * dt / uint64 ( fcTimeConst )
124123 if peer .bufEstimate > peer .params .BufLimit {
125124 peer .bufEstimate = peer .params .BufLimit
126125 }
127126 peer .lastTime = time
128127}
129128
130- func (peer * ServerNode ) canSend (maxCost uint64 ) uint64 {
129+ // safetyMargin is added to the flow control waiting time when estimated buffer value is low
130+ const safetyMargin = time .Millisecond * 200
131+
132+ func (peer * ServerNode ) canSend (maxCost uint64 ) time.Duration {
133+ maxCost += uint64 (safetyMargin ) * peer .params .MinRecharge / uint64 (fcTimeConst )
134+ if maxCost > peer .params .BufLimit {
135+ maxCost = peer .params .BufLimit
136+ }
131137 if peer .bufEstimate >= maxCost {
132138 return 0
133139 }
134- return ( maxCost - peer .bufEstimate ) * fcTimeConst / peer .params .MinRecharge
140+ return time . Duration (( maxCost - peer .bufEstimate ) * uint64 ( fcTimeConst ) / peer .params .MinRecharge )
135141}
136142
137- func (peer * ServerNode ) CanSend (maxCost uint64 ) uint64 {
143+ // CanSend returns the minimum waiting time required before sending a request
144+ // with the given maximum estimated cost
145+ func (peer * ServerNode ) CanSend (maxCost uint64 ) time.Duration {
138146 peer .lock .RLock ()
139147 defer peer .lock .RUnlock ()
140148
141149 return peer .canSend (maxCost )
142150}
143151
152+ // AssignRequest tries to assign the server node to the given request, guaranteeing
153+ // that once it returns true, no request will be sent to the node before this one
154+ func (peer * ServerNode ) AssignRequest (reqID uint64 ) bool {
155+ select {
156+ case peer .assignToken <- struct {}{}:
157+ default :
158+ return false
159+ }
160+ peer .lock .Lock ()
161+ peer .assignedRequest = reqID
162+ peer .lock .Unlock ()
163+ return true
164+ }
165+
166+ // MustAssignRequest waits until the node can be assigned to the given request.
167+ // It is always guaranteed that assignments are released in a short amount of time.
168+ func (peer * ServerNode ) MustAssignRequest (reqID uint64 ) {
169+ peer .assignToken <- struct {}{}
170+ peer .lock .Lock ()
171+ peer .assignedRequest = reqID
172+ peer .lock .Unlock ()
173+ }
174+
175+ // DeassignRequest releases a request assignment in case the planned request
176+ // is not being sent.
177+ func (peer * ServerNode ) DeassignRequest (reqID uint64 ) {
178+ peer .lock .Lock ()
179+ if peer .assignedRequest == reqID {
180+ peer .assignedRequest = 0
181+ <- peer .assignToken
182+ }
183+ peer .lock .Unlock ()
184+ }
185+
186+ // IsAssigned returns true if the server node has already been assigned to a request
187+ // (note that this function returning false does not guarantee that you can assign a request
188+ // immediately afterwards, its only purpose is to help peer selection)
189+ func (peer * ServerNode ) IsAssigned () bool {
190+ peer .lock .RLock ()
191+ locked := peer .assignedRequest != 0
192+ peer .lock .RUnlock ()
193+ return locked
194+ }
195+
144196// blocks until request can be sent
145197func (peer * ServerNode ) SendRequest (reqID , maxCost uint64 ) {
146198 peer .lock .Lock ()
147199 defer peer .lock .Unlock ()
148200
149- peer .recalcBLE (getTime ())
150- for peer .bufEstimate < maxCost {
151- wait := time .Duration (peer .canSend (maxCost ))
201+ if peer .assignedRequest != reqID {
202+ peer .lock .Unlock ()
203+ peer .MustAssignRequest (reqID )
204+ peer .lock .Lock ()
205+ }
206+
207+ peer .recalcBLE (mclock .Now ())
208+ wait := peer .canSend (maxCost )
209+ for wait > 0 {
152210 peer .lock .Unlock ()
153211 time .Sleep (wait )
154212 peer .lock .Lock ()
155- peer .recalcBLE (getTime ())
213+ peer .recalcBLE (mclock .Now ())
214+ wait = peer .canSend (maxCost )
156215 }
216+ peer .assignedRequest = 0
217+ <- peer .assignToken
157218 peer .bufEstimate -= maxCost
158219 peer .sumCost += maxCost
159220 if reqID >= 0 {
@@ -162,14 +223,18 @@ func (peer *ServerNode) SendRequest(reqID, maxCost uint64) {
162223}
163224
164225func (peer * ServerNode ) GotReply (reqID , bv uint64 ) {
226+
165227 peer .lock .Lock ()
166228 defer peer .lock .Unlock ()
167229
230+ if bv > peer .params .BufLimit {
231+ bv = peer .params .BufLimit
232+ }
168233 sc , ok := peer .pending [reqID ]
169234 if ! ok {
170235 return
171236 }
172237 delete (peer .pending , reqID )
173238 peer .bufEstimate = bv - (peer .sumCost - sc )
174- peer .lastTime = getTime ()
239+ peer .lastTime = mclock . Now ()
175240}
0 commit comments