Skip to content

Commit fe24d12

Browse files
swap: refactor lastReceivedCheque, lastSentCheque, balances to peer (ethersphere#1725)
* swap: move lastReceivedCheque, lastSentCheque and balances to peer * swap: move sendCheque and createCheque to peer * swap: remove accounting and store lock in favour of a per-peer lock
1 parent 43f2b87 commit fe24d12

File tree

5 files changed

+352
-363
lines changed

5 files changed

+352
-363
lines changed

swap/peer.go

Lines changed: 129 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@
1717
package swap
1818

1919
import (
20+
"context"
2021
"errors"
22+
"fmt"
23+
"strconv"
24+
"sync"
2125

2226
"github.com/ethereum/go-ethereum/common"
23-
27+
"github.com/ethersphere/swarm/log"
2428
"github.com/ethersphere/swarm/p2p/protocols"
2529
)
2630

@@ -30,18 +34,140 @@ var ErrDontOwe = errors.New("no negative balance")
3034
// Peer is a devp2p peer for the Swap protocol
3135
type Peer struct {
3236
*protocols.Peer
37+
lock sync.RWMutex
3338
swap *Swap
3439
beneficiary common.Address
3540
contractAddress common.Address
3641
lastReceivedCheque *Cheque
42+
lastSentCheque *Cheque
43+
balance int64
3744
}
3845

3946
// NewPeer creates a new swap Peer instance
40-
func NewPeer(p *protocols.Peer, s *Swap, beneficiary common.Address, contractAddress common.Address) *Peer {
41-
return &Peer{
47+
func NewPeer(p *protocols.Peer, s *Swap, beneficiary common.Address, contractAddress common.Address) (peer *Peer, err error) {
48+
peer = &Peer{
4249
Peer: p,
4350
swap: s,
4451
beneficiary: beneficiary,
4552
contractAddress: contractAddress,
4653
}
54+
55+
if peer.lastReceivedCheque, err = s.loadLastReceivedCheque(p.ID()); err != nil {
56+
return nil, err
57+
}
58+
59+
if peer.lastSentCheque, err = s.loadLastSentCheque(p.ID()); err != nil {
60+
return nil, err
61+
}
62+
63+
if peer.balance, err = s.loadBalance(p.ID()); err != nil {
64+
return nil, err
65+
}
66+
return peer, nil
67+
}
68+
69+
func (p *Peer) getLastReceivedCheque() *Cheque {
70+
return p.lastReceivedCheque
71+
}
72+
73+
func (p *Peer) getLastSentCheque() *Cheque {
74+
return p.lastSentCheque
75+
}
76+
77+
func (p *Peer) setLastReceivedCheque(cheque *Cheque) error {
78+
p.lastReceivedCheque = cheque
79+
return p.swap.saveLastReceivedCheque(p.ID(), cheque)
80+
}
81+
82+
func (p *Peer) setLastSentCheque(cheque *Cheque) error {
83+
p.lastSentCheque = cheque
84+
return p.swap.saveLastSentCheque(p.ID(), cheque)
85+
}
86+
87+
func (p *Peer) getLastCumulativePayout() uint64 {
88+
lastCheque := p.getLastReceivedCheque()
89+
if lastCheque != nil {
90+
return lastCheque.CumulativePayout
91+
}
92+
return 0
93+
}
94+
95+
func (p *Peer) setBalance(balance int64) error {
96+
p.balance = balance
97+
return p.swap.saveBalance(p.ID(), balance)
98+
}
99+
100+
func (p *Peer) getBalance() int64 {
101+
return p.balance
102+
}
103+
104+
// To be called with mutex already held
105+
func (p *Peer) updateBalance(amount int64) error {
106+
//adjust the balance
107+
//if amount is negative, it will decrease, otherwise increase
108+
newBalance := p.getBalance() + amount
109+
if err := p.setBalance(newBalance); err != nil {
110+
return err
111+
}
112+
log.Debug("balance for peer after accounting", "peer", p.ID().String(), "balance", strconv.FormatInt(newBalance, 10))
113+
return nil
114+
}
115+
116+
// createCheque creates a new cheque whose beneficiary will be the peer and
117+
// whose amount is based on the last cheque and current balance for this peer
118+
// The cheque will be signed and point to the issuer's contract
119+
// To be called with mutex already held
120+
// Caller must be careful that the same resources aren't concurrently read and written by multiple routines
121+
func (p *Peer) createCheque() (*Cheque, error) {
122+
var cheque *Cheque
123+
var err error
124+
125+
if p.getBalance() >= 0 {
126+
return nil, fmt.Errorf("expected negative balance, found: %d", p.getBalance())
127+
}
128+
// the balance should be negative here, we take the absolute value:
129+
honey := uint64(-p.getBalance())
130+
131+
amount, err := p.swap.oracle.GetPrice(honey)
132+
if err != nil {
133+
return nil, fmt.Errorf("error getting price from oracle: %v", err)
134+
}
135+
136+
total := p.getLastCumulativePayout()
137+
138+
cheque = &Cheque{
139+
ChequeParams: ChequeParams{
140+
CumulativePayout: total + amount,
141+
Contract: p.swap.owner.Contract,
142+
Beneficiary: p.beneficiary,
143+
},
144+
Honey: honey,
145+
}
146+
cheque.Signature, err = cheque.Sign(p.swap.owner.privateKey)
147+
148+
return cheque, err
149+
}
150+
151+
// sendCheque sends a cheque to peer
152+
// To be called with mutex already held
153+
// Caller must be careful that the same resources aren't concurrently read and written by multiple routines
154+
func (p *Peer) sendCheque() error {
155+
cheque, err := p.createCheque()
156+
if err != nil {
157+
return fmt.Errorf("error while creating cheque: %v", err)
158+
}
159+
160+
if err := p.setLastSentCheque(cheque); err != nil {
161+
return fmt.Errorf("error while storing the last cheque: %v", err)
162+
}
163+
164+
if err := p.updateBalance(int64(cheque.Honey)); err != nil {
165+
return err
166+
}
167+
168+
log.Info("sending cheque", "honey", cheque.Honey, "cumulativePayout", cheque.ChequeParams.CumulativePayout, "beneficiary", cheque.Beneficiary, "contract", cheque.Contract)
169+
170+
return p.Send(context.Background(), &EmitChequeMsg{
171+
Cheque: cheque,
172+
})
47173
}

swap/protocol.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,10 @@ func (s *Swap) run(p *p2p.Peer, rw p2p.MsgReadWriter) error {
121121
return err
122122
}
123123

124-
swapPeer := NewPeer(protoPeer, s, beneficiary, response.ContractAddress)
125-
s.addPeer(swapPeer)
124+
swapPeer, err := s.addPeer(protoPeer, beneficiary, response.ContractAddress)
125+
if err != nil {
126+
return err
127+
}
126128
defer s.removePeer(swapPeer)
127129

128130
return swapPeer.Run(s.handleMsg(swapPeer))
@@ -134,17 +136,22 @@ func (s *Swap) removePeer(p *Peer) {
134136
delete(s.peers, p.ID())
135137
}
136138

137-
func (s *Swap) addPeer(p *Peer) {
139+
func (s *Swap) addPeer(protoPeer *protocols.Peer, beneficiary common.Address, contractAddress common.Address) (*Peer, error) {
138140
s.peersLock.Lock()
139141
defer s.peersLock.Unlock()
142+
p, err := NewPeer(protoPeer, s, beneficiary, contractAddress)
143+
if err != nil {
144+
return nil, err
145+
}
140146
s.peers[p.ID()] = p
147+
return p, nil
141148
}
142149

143-
func (s *Swap) getPeer(id enode.ID) (*Peer, bool) {
150+
func (s *Swap) getPeer(id enode.ID) *Peer {
144151
s.peersLock.RLock()
145152
defer s.peersLock.RUnlock()
146-
peer, ok := s.peers[id]
147-
return peer, ok
153+
peer := s.peers[id]
154+
return peer
148155
}
149156

150157
type swapAPI interface {

swap/protocol_test.go

Lines changed: 46 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func TestHandshake(t *testing.T) {
6363
creditor := protocolTester.Nodes[1]
6464

6565
// set balance artifially
66-
swap.balances[creditor.ID()] = -42
66+
swap.saveBalance(creditor.ID(), -42)
6767

6868
// create the expected cheque to be received
6969
cheque := newTestCheque()
@@ -135,14 +135,17 @@ func TestEmitCheque(t *testing.T) {
135135
// create the debitor peer
136136
dPtpPeer := p2p.NewPeer(enode.ID{}, "debitor", []p2p.Cap{})
137137
dProtoPeer := protocols.NewPeer(dPtpPeer, nil, Spec)
138-
debitor := NewPeer(dProtoPeer, creditorSwap, debitorSwap.owner.address, debitorSwap.owner.Contract)
138+
debitor, err := creditorSwap.addPeer(dProtoPeer, debitorSwap.owner.address, debitorSwap.owner.Contract)
139+
if err != nil {
140+
t.Fatal(err)
141+
}
139142

140143
// set balance artificially
141-
creditorSwap.balances[debitor.ID()] = 42
142-
log.Debug("balance", "balance", creditorSwap.balances[debitor.ID()])
144+
debitor.setBalance(42)
145+
log.Debug("balance", "balance", debitor.getBalance())
143146
// a safe check: at this point no cheques should be in the swap
144-
if len(creditorSwap.cheques) != 0 {
145-
t.Fatalf("Expected no cheques at creditor, but there are %d:", len(creditorSwap.cheques))
147+
if debitor.getLastReceivedCheque() != nil {
148+
t.Fatalf("Expected no cheques at creditor, but there is %v:", debitor.getLastReceivedCheque())
146149
}
147150

148151
log.Debug("create a cheque")
@@ -179,15 +182,15 @@ func TestEmitCheque(t *testing.T) {
179182
case <-time.After(4 * time.Second):
180183
t.Fatalf("Timeout waiting for cash transaction to complete")
181184
}
182-
log.Debug("balance", "balance", creditorSwap.balances[debitor.ID()])
185+
log.Debug("balance", "balance", debitor.getBalance())
183186
// check that the balance has been reset
184-
if creditorSwap.balances[debitor.ID()] != 0 {
185-
t.Fatalf("Expected debitor balance to have been reset to %d, but it is %d", 0, creditorSwap.balances[debitor.ID()])
187+
if debitor.getBalance() != 0 {
188+
t.Fatalf("Expected debitor balance to have been reset to %d, but it is %d", 0, debitor.getBalance())
186189
}
187-
recvCheque := creditorSwap.loadLastReceivedCheque(debitor)
190+
recvCheque := debitor.getLastReceivedCheque()
188191
log.Debug("expected cheque", "cheque", recvCheque)
189192
if recvCheque != cheque {
190-
t.Fatalf("Expected exactly one cheque at creditor, but there are %d:", len(creditorSwap.cheques))
193+
t.Fatalf("Expected cheque at creditor, but it was %v:", recvCheque)
191194
}
192195
}
193196

@@ -205,48 +208,48 @@ func TestTriggerPaymentThreshold(t *testing.T) {
205208

206209
// create a dummy pper
207210
cPeer := newDummyPeerWithSpec(Spec)
208-
creditor := NewPeer(cPeer.Peer, debitorSwap, common.Address{}, common.Address{})
209-
// set the creditor as peer into the debitor's swap
210-
debitorSwap.peers[creditor.ID()] = creditor
211+
creditor, err := debitorSwap.addPeer(cPeer.Peer, common.Address{}, common.Address{})
212+
if err != nil {
213+
t.Fatal(err)
214+
}
211215

212216
// set the balance to manually be at PaymentThreshold
213217
overDraft := 42
214-
debitorSwap.balances[creditor.ID()] = 0 - DefaultPaymentThreshold
218+
creditor.setBalance(-DefaultPaymentThreshold)
215219

216220
// we expect a cheque at the end of the test, but not yet
217-
lenCheques := len(debitorSwap.cheques)
218-
if lenCheques != 0 {
219-
t.Fatalf("Expected no cheques yet, but there are %d", lenCheques)
221+
if creditor.getLastSentCheque() != nil {
222+
t.Fatalf("Expected no cheques yet, but there is %v:", creditor.getLastSentCheque())
220223
}
221224
// do some accounting, no error expected, just a WARN
222-
err := debitorSwap.Add(int64(-overDraft), creditor.Peer)
225+
err = debitorSwap.Add(int64(-overDraft), creditor.Peer)
223226
if err != nil {
224227
t.Fatal(err)
225228
}
226229

227230
// we should now have a cheque
228-
lenCheques = len(debitorSwap.cheques)
229-
if lenCheques != 1 {
230-
t.Fatalf("Expected one cheque, but there are %d", lenCheques)
231+
if creditor.getLastSentCheque() == nil {
232+
t.Fatal("Expected one cheque, but there is none")
231233
}
232-
cheque := debitorSwap.cheques[creditor.ID()]
234+
235+
cheque := creditor.getLastSentCheque()
233236
expectedAmount := uint64(overDraft) + DefaultPaymentThreshold
234237
if cheque.CumulativePayout != expectedAmount {
235238
t.Fatalf("Expected cheque cumulative payout to be %d, but is %d", expectedAmount, cheque.CumulativePayout)
236239
}
237240

238241
// because no other accounting took place in the meantime the balance should be exactly 0
239-
if debitorSwap.balances[creditor.ID()] != 0 {
240-
t.Fatalf("Expected debitorSwap balance to be 0, but is %d", debitorSwap.balances[creditor.ID()])
242+
if creditor.getBalance() != 0 {
243+
t.Fatalf("Expected debitorSwap balance to be 0, but is %d", creditor.getBalance())
241244
}
242245

243246
// do some accounting again to trigger a second cheque
244247
if err = debitorSwap.Add(int64(-DefaultPaymentThreshold), creditor.Peer); err != nil {
245248
t.Fatal(err)
246249
}
247250

248-
if debitorSwap.balances[creditor.ID()] != 0 {
249-
t.Fatalf("Expected debitorSwap balance to be 0, but is %d", debitorSwap.balances[creditor.ID()])
251+
if creditor.getBalance() != 0 {
252+
t.Fatalf("Expected debitorSwap balance to be 0, but is %d", creditor.getBalance())
250253
}
251254
}
252255

@@ -260,34 +263,33 @@ func TestTriggerDisconnectThreshold(t *testing.T) {
260263

261264
// create a dummy pper
262265
cPeer := newDummyPeerWithSpec(Spec)
263-
debitor := NewPeer(cPeer.Peer, creditorSwap, common.Address{}, common.Address{})
264-
// set the debitor as peer into the creditor's swap
265-
creditorSwap.peers[debitor.ID()] = debitor
266+
debitor, err := creditorSwap.addPeer(cPeer.Peer, common.Address{}, common.Address{})
267+
if err != nil {
268+
t.Fatal(err)
269+
}
266270

267271
// set the balance to manually be at DisconnectThreshold
268272
overDraft := 42
269273
expectedBalance := int64(DefaultDisconnectThreshold)
270274
// we don't expect any change after the test
271-
creditorSwap.balances[debitor.ID()] = expectedBalance
275+
debitor.setBalance(expectedBalance)
272276
// we also don't expect any cheques yet
273-
lenCheques := len(creditorSwap.cheques)
274-
if lenCheques != 0 {
275-
t.Fatalf("Expected no cheques yet, but there are %d", lenCheques)
277+
if debitor.getLastSentCheque() != nil {
278+
t.Fatalf("Expected no cheques yet, but there is %v", debitor.getLastSentCheque())
276279
}
277280
// now do some accounting
278-
err := creditorSwap.Add(int64(overDraft), debitor.Peer)
281+
err = creditorSwap.Add(int64(overDraft), debitor.Peer)
279282
// it should fail due to overdraft
280283
if err == nil {
281284
t.Fatal("Expected an error due to overdraft, but did not get any")
282285
}
283286
// no balance change expected
284-
if creditorSwap.balances[debitor.ID()] != expectedBalance {
285-
t.Fatalf("Expected balance to be %d, but is %d", expectedBalance, creditorSwap.balances[debitor.ID()])
287+
if debitor.getBalance() != expectedBalance {
288+
t.Fatalf("Expected balance to be %d, but is %d", expectedBalance, debitor.getBalance())
286289
}
287290
// still no cheques expected
288-
lenCheques = len(creditorSwap.cheques)
289-
if lenCheques != 0 {
290-
t.Fatalf("Expected still no cheque, but there are %d", lenCheques)
291+
if debitor.getLastSentCheque() != nil {
292+
t.Fatalf("Expected still no cheques yet, but there is %v", debitor.getLastSentCheque())
291293
}
292294

293295
// let's do the whole thing again (actually a bit silly, it's somehow simulating the peer would have been dropped)
@@ -296,13 +298,12 @@ func TestTriggerDisconnectThreshold(t *testing.T) {
296298
t.Fatal("Expected an error due to overdraft, but did not get any")
297299
}
298300

299-
if creditorSwap.balances[debitor.ID()] != expectedBalance {
300-
t.Fatalf("Expected balance to be %d, but is %d", expectedBalance, creditorSwap.balances[debitor.ID()])
301+
if debitor.getBalance() != expectedBalance {
302+
t.Fatalf("Expected balance to be %d, but is %d", expectedBalance, debitor.getBalance())
301303
}
302304

303-
lenCheques = len(creditorSwap.cheques)
304-
if lenCheques != 0 {
305-
t.Fatalf("Expected still no cheque, but there are %d", lenCheques)
305+
if debitor.getLastSentCheque() != nil {
306+
t.Fatalf("Expected no cheques yet, but there is %v", debitor.getLastSentCheque())
306307
}
307308
}
308309

0 commit comments

Comments
 (0)