Skip to content

Commit ae857e7

Browse files
holisticodezelig
authored andcommitted
swarm, p2p/protocols: Stream accounting (#18337)
* swarm: completed 1st phase of swap accounting * swarm, p2p/protocols: added stream pricing * swarm/network/stream: gofmt simplify stream.go * swarm: fixed review comments * swarm: used snapshots for swap tests * swarm: custom retrieve for swap (less cascaded requests at any one time) * swarm: addressed PR comments * swarm: log output formatting * swarm: removed parallelism in swap tests * swarm: swap tests simplification * swarm: removed swap_test.go * swarm/network/stream: added prefix space for comments * swarm/network/stream: unit test for prices * swarm/network/stream: don't hardcode price * swarm/network/stream: fixed invalid price check
1 parent 56a3f6c commit ae857e7

File tree

3 files changed

+185
-104
lines changed

3 files changed

+185
-104
lines changed

p2p/protocols/accounting.go

Lines changed: 74 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -22,31 +22,33 @@ import (
2222
"github.com/ethereum/go-ethereum/metrics"
2323
)
2424

25-
//define some metrics
25+
// define some metrics
2626
var (
27-
//All metrics are cumulative
27+
// All metrics are cumulative
2828

29-
//total amount of units credited
29+
// total amount of units credited
3030
mBalanceCredit metrics.Counter
31-
//total amount of units debited
31+
// total amount of units debited
3232
mBalanceDebit metrics.Counter
33-
//total amount of bytes credited
33+
// total amount of bytes credited
3434
mBytesCredit metrics.Counter
35-
//total amount of bytes debited
35+
// total amount of bytes debited
3636
mBytesDebit metrics.Counter
37-
//total amount of credited messages
37+
// total amount of credited messages
3838
mMsgCredit metrics.Counter
39-
//total amount of debited messages
39+
// total amount of debited messages
4040
mMsgDebit metrics.Counter
41-
//how many times local node had to drop remote peers
41+
// how many times local node had to drop remote peers
4242
mPeerDrops metrics.Counter
43-
//how many times local node overdrafted and dropped
43+
// how many times local node overdrafted and dropped
4444
mSelfDrops metrics.Counter
45+
46+
MetricsRegistry metrics.Registry
4547
)
4648

47-
//Prices defines how prices are being passed on to the accounting instance
49+
// Prices defines how prices are being passed on to the accounting instance
4850
type Prices interface {
49-
//Return the Price for a message
51+
// Return the Price for a message
5052
Price(interface{}) *Price
5153
}
5254

@@ -57,20 +59,20 @@ const (
5759
Receiver = Payer(false)
5860
)
5961

60-
//Price represents the costs of a message
62+
// Price represents the costs of a message
6163
type Price struct {
62-
Value uint64 //
63-
PerByte bool //True if the price is per byte or for unit
64+
Value uint64
65+
PerByte bool // True if the price is per byte or for unit
6466
Payer Payer
6567
}
6668

67-
//For gives back the price for a message
68-
//A protocol provides the message price in absolute value
69-
//This method then returns the correct signed amount,
70-
//depending on who pays, which is identified by the `payer` argument:
71-
//`Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument.
72-
//Thus: If Sending and sender pays, amount positive, otherwise negative
73-
//If Receiving, and receiver pays, amount positive, otherwise negative
69+
// For gives back the price for a message
70+
// A protocol provides the message price in absolute value
71+
// This method then returns the correct signed amount,
72+
// depending on who pays, which is identified by the `payer` argument:
73+
// `Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument.
74+
// Thus: If Sending and sender pays, amount positive, otherwise negative
75+
// If Receiving, and receiver pays, amount positive, otherwise negative
7476
func (p *Price) For(payer Payer, size uint32) int64 {
7577
price := p.Value
7678
if p.PerByte {
@@ -82,22 +84,22 @@ func (p *Price) For(payer Payer, size uint32) int64 {
8284
return int64(price)
8385
}
8486

85-
//Balance is the actual accounting instance
86-
//Balance defines the operations needed for accounting
87-
//Implementations internally maintain the balance for every peer
87+
// Balance is the actual accounting instance
88+
// Balance defines the operations needed for accounting
89+
// Implementations internally maintain the balance for every peer
8890
type Balance interface {
89-
//Adds amount to the local balance with remote node `peer`;
90-
//positive amount = credit local node
91-
//negative amount = debit local node
91+
// Adds amount to the local balance with remote node `peer`;
92+
// positive amount = credit local node
93+
// negative amount = debit local node
9294
Add(amount int64, peer *Peer) error
9395
}
9496

95-
//Accounting implements the Hook interface
96-
//It interfaces to the balances through the Balance interface,
97-
//while interfacing with protocols and its prices through the Prices interface
97+
// Accounting implements the Hook interface
98+
// It interfaces to the balances through the Balance interface,
99+
// while interfacing with protocols and its prices through the Prices interface
98100
type Accounting struct {
99-
Balance //interface to accounting logic
100-
Prices //interface to prices logic
101+
Balance // interface to accounting logic
102+
Prices // interface to prices logic
101103
}
102104

103105
func NewAccounting(balance Balance, po Prices) *Accounting {
@@ -108,87 +110,85 @@ func NewAccounting(balance Balance, po Prices) *Accounting {
108110
return ah
109111
}
110112

111-
//SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
112-
//this registry should be independent of any other metrics as it persists at different endpoints.
113-
//It also instantiates the given metrics and starts the persisting go-routine which
114-
//at the passed interval writes the metrics to a LevelDB
113+
// SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
114+
// this registry should be independent of any other metrics as it persists at different endpoints.
115+
// It also instantiates the given metrics and starts the persisting go-routine which
116+
// at the passed interval writes the metrics to a LevelDB
115117
func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics {
116-
//create an empty registry
117-
registry := metrics.NewRegistry()
118-
//instantiate the metrics
119-
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", registry)
120-
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", registry)
121-
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", registry)
122-
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", registry)
123-
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", registry)
124-
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", registry)
125-
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", registry)
126-
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", registry)
127-
//create the DB and start persisting
128-
return NewAccountingMetrics(registry, reportInterval, path)
118+
// create an empty registry
119+
MetricsRegistry = metrics.NewRegistry()
120+
// instantiate the metrics
121+
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", MetricsRegistry)
122+
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", MetricsRegistry)
123+
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", MetricsRegistry)
124+
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", MetricsRegistry)
125+
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", MetricsRegistry)
126+
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", MetricsRegistry)
127+
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", MetricsRegistry)
128+
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", MetricsRegistry)
129+
// create the DB and start persisting
130+
return NewAccountingMetrics(MetricsRegistry, reportInterval, path)
129131
}
130132

131-
//Implement Hook.Send
132133
// Send takes a peer, a size and a msg and
133-
// - calculates the cost for the local node sending a msg of size to peer using the Prices interface
134-
// - credits/debits local node using balance interface
134+
// - calculates the cost for the local node sending a msg of size to peer using the Prices interface
135+
// - credits/debits local node using balance interface
135136
func (ah *Accounting) Send(peer *Peer, size uint32, msg interface{}) error {
136-
//get the price for a message (through the protocol spec)
137+
// get the price for a message (through the protocol spec)
137138
price := ah.Price(msg)
138-
//this message doesn't need accounting
139+
// this message doesn't need accounting
139140
if price == nil {
140141
return nil
141142
}
142-
//evaluate the price for sending messages
143+
// evaluate the price for sending messages
143144
costToLocalNode := price.For(Sender, size)
144-
//do the accounting
145+
// do the accounting
145146
err := ah.Add(costToLocalNode, peer)
146-
//record metrics: just increase counters for user-facing metrics
147+
// record metrics: just increase counters for user-facing metrics
147148
ah.doMetrics(costToLocalNode, size, err)
148149
return err
149150
}
150151

151-
//Implement Hook.Receive
152152
// Receive takes a peer, a size and a msg and
153-
// - calculates the cost for the local node receiving a msg of size from peer using the Prices interface
154-
// - credits/debits local node using balance interface
153+
// - calculates the cost for the local node receiving a msg of size from peer using the Prices interface
154+
// - credits/debits local node using balance interface
155155
func (ah *Accounting) Receive(peer *Peer, size uint32, msg interface{}) error {
156-
//get the price for a message (through the protocol spec)
156+
// get the price for a message (through the protocol spec)
157157
price := ah.Price(msg)
158-
//this message doesn't need accounting
158+
// this message doesn't need accounting
159159
if price == nil {
160160
return nil
161161
}
162-
//evaluate the price for receiving messages
162+
// evaluate the price for receiving messages
163163
costToLocalNode := price.For(Receiver, size)
164-
//do the accounting
164+
// do the accounting
165165
err := ah.Add(costToLocalNode, peer)
166-
//record metrics: just increase counters for user-facing metrics
166+
// record metrics: just increase counters for user-facing metrics
167167
ah.doMetrics(costToLocalNode, size, err)
168168
return err
169169
}
170170

171-
//record some metrics
172-
//this is not an error handling. `err` is returned by both `Send` and `Receive`
173-
//`err` will only be non-nil if a limit has been violated (overdraft), in which case the peer has been dropped.
174-
//if the limit has been violated and `err` is thus not nil:
175-
// * if the price is positive, local node has been credited; thus `err` implicitly signals the REMOTE has been dropped
176-
// * if the price is negative, local node has been debited, thus `err` implicitly signals LOCAL node "overdraft"
171+
// record some metrics
172+
// this is not an error handling. `err` is returned by both `Send` and `Receive`
173+
// `err` will only be non-nil if a limit has been violated (overdraft), in which case the peer has been dropped.
174+
// if the limit has been violated and `err` is thus not nil:
175+
// * if the price is positive, local node has been credited; thus `err` implicitly signals the REMOTE has been dropped
176+
// * if the price is negative, local node has been debited, thus `err` implicitly signals LOCAL node "overdraft"
177177
func (ah *Accounting) doMetrics(price int64, size uint32, err error) {
178178
if price > 0 {
179179
mBalanceCredit.Inc(price)
180180
mBytesCredit.Inc(int64(size))
181181
mMsgCredit.Inc(1)
182182
if err != nil {
183-
//increase the number of times a remote node has been dropped due to "overdraft"
183+
// increase the number of times a remote node has been dropped due to "overdraft"
184184
mPeerDrops.Inc(1)
185185
}
186186
} else {
187187
mBalanceDebit.Inc(price)
188188
mBytesDebit.Inc(int64(size))
189189
mMsgDebit.Inc(1)
190190
if err != nil {
191-
//increase the number of times the local node has done an "overdraft" in respect to other nodes
191+
// increase the number of times the local node has done an "overdraft" in respect to other nodes
192192
mSelfDrops.Inc(1)
193193
}
194194
}

0 commit comments

Comments
 (0)