Skip to content

Commit 69442c3

Browse files
Hasyimi Bahrudiniwasaki-kenta
authored andcommitted
Improve nops broadcasting. (#150)
* ledger: add failing test * ledger: stop nop broadcast if depth > last added tx depth * ledger: wait for faucet in test * ledger: fix panic due to closed channel * ledger: check if nop broadcast is stopped in test * ledger: increase consensus timeout in test * ledger: rename test and add comments * ledger: disable GC when running tests * ledger, snowball: revert test code * ledger: poll round to wait for consensus * ledger: don't print missing parent error * ledger: allow test to run up to 5 minutes * api, cmd/graph, cmd/wavelet, ledger: use options * ledger: remove stray defer statement * ledger: reduce number of nodes and add sleep * ledger: add failing test to check excessive consensus round * ledger: fix excessive nop * ledger: revert error silencing * ledger: improve test
1 parent 0c18e67 commit 69442c3

File tree

9 files changed

+553
-40
lines changed

9 files changed

+553
-40
lines changed

accounts.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@ package wavelet
2121

2222
import (
2323
"context"
24-
"github.com/perlin-network/wavelet/avl"
25-
"github.com/perlin-network/wavelet/store"
26-
"github.com/pkg/errors"
2724
"sync"
2825
"sync/atomic"
2926
"time"
3027
"unsafe"
28+
29+
"github.com/perlin-network/wavelet/avl"
30+
"github.com/perlin-network/wavelet/store"
31+
"github.com/pkg/errors"
3132
)
3233

3334
type Accounts struct {

api/mod_test.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,6 @@ import (
2626
"encoding/hex"
2727
"encoding/json"
2828
"fmt"
29-
"github.com/buaazp/fasthttprouter"
30-
"github.com/perlin-network/noise/skademlia"
31-
"github.com/perlin-network/wavelet"
32-
"github.com/perlin-network/wavelet/store"
33-
"github.com/perlin-network/wavelet/sys"
34-
"github.com/pkg/errors"
35-
"github.com/stretchr/testify/assert"
36-
"github.com/valyala/fasthttp"
37-
"github.com/valyala/fastjson"
3829
"io/ioutil"
3930
"net"
4031
"net/http"
@@ -44,6 +35,16 @@ import (
4435
"testing"
4536
"testing/quick"
4637
"time"
38+
39+
"github.com/buaazp/fasthttprouter"
40+
"github.com/perlin-network/noise/skademlia"
41+
"github.com/perlin-network/wavelet"
42+
"github.com/perlin-network/wavelet/store"
43+
"github.com/perlin-network/wavelet/sys"
44+
"github.com/pkg/errors"
45+
"github.com/stretchr/testify/assert"
46+
"github.com/valyala/fasthttp"
47+
"github.com/valyala/fastjson"
4748
)
4849

4950
func TestListTransaction(t *testing.T) {
@@ -744,7 +745,7 @@ func createLedger(t *testing.T) *wavelet.Ledger {
744745
keys, err := skademlia.NewKeys(1, 1)
745746
assert.NoError(t, err)
746747

747-
ledger := wavelet.NewLedger(store.NewInmem(), skademlia.NewClient(":0", keys), nil)
748+
ledger := wavelet.NewLedger(store.NewInmem(), skademlia.NewClient(":0", keys))
748749
return ledger
749750
}
750751

api/mod_ws_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func TestPollLog(t *testing.T) {
4444
keys, err := skademlia.NewKeys(1, 1)
4545
assert.NoError(t, err)
4646

47-
ledger := wavelet.NewLedger(store.NewInmem(), skademlia.NewClient(":0", keys), nil)
47+
ledger := wavelet.NewLedger(store.NewInmem(), skademlia.NewClient(":0", keys))
4848

4949
go gateway.StartHTTP(8080, nil, ledger, keys)
5050
defer gateway.Shutdown()

cmd/graph/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func main() {
113113

114114
client.SetCredentials(noise.NewCredentials(addr, handshake.NewECDH(), cipher.NewAEAD(), client.Protocol()))
115115

116-
ledger := wavelet.NewLedger(store.NewInmem(), client, nil)
116+
ledger := wavelet.NewLedger(store.NewInmem(), client)
117117

118118
go func() {
119119
server := client.Listen()

cmd/wavelet/main.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@ import (
2323
"encoding/hex"
2424
"errors"
2525
"fmt"
26+
"io/ioutil"
27+
"net"
28+
"net/http"
29+
"os"
30+
"sort"
31+
"strconv"
32+
"time"
33+
2634
"github.com/perlin-network/noise"
2735
"github.com/perlin-network/noise/cipher"
2836
"github.com/perlin-network/noise/edwards25519"
@@ -38,13 +46,6 @@ import (
3846
"google.golang.org/grpc"
3947
"gopkg.in/urfave/cli.v1"
4048
"gopkg.in/urfave/cli.v1/altsrc"
41-
"io/ioutil"
42-
"net"
43-
"net/http"
44-
"os"
45-
"sort"
46-
"strconv"
47-
"time"
4849
)
4950

5051
import _ "net/http/pprof"
@@ -304,7 +305,7 @@ func start(cfg *Config) {
304305
logger.Fatal().Err(err).Msgf("Failed to create/open database located at %q.", cfg.Database)
305306
}
306307

307-
ledger := wavelet.NewLedger(kv, client, cfg.Genesis)
308+
ledger := wavelet.NewLedger(kv, client, wavelet.WithGenesis(cfg.Genesis))
308309

309310
go func() {
310311
server := client.Listen()

ledger.go

Lines changed: 61 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ import (
2525
"encoding/binary"
2626
"encoding/hex"
2727
"fmt"
28+
"math/rand"
29+
"strings"
30+
"sync"
31+
"time"
32+
2833
"github.com/perlin-network/noise"
2934
"github.com/perlin-network/noise/skademlia"
3035
"github.com/perlin-network/wavelet/avl"
@@ -36,10 +41,6 @@ import (
3641
"google.golang.org/grpc"
3742
"google.golang.org/grpc/connectivity"
3843
"google.golang.org/grpc/peer"
39-
"math/rand"
40-
"strings"
41-
"sync"
42-
"time"
4344
)
4445

4546
type Ledger struct {
@@ -57,9 +58,10 @@ type Ledger struct {
5758

5859
consensus sync.WaitGroup
5960

60-
broadcastNops bool
61-
broadcastNopsDelay time.Time
62-
broadcastNopsLock sync.Mutex
61+
broadcastNops bool
62+
broadcastNopsMaxDepth uint64
63+
broadcastNopsDelay time.Time
64+
broadcastNopsLock sync.Mutex
6365

6466
sync chan struct{}
6567
syncVotes chan vote
@@ -70,21 +72,49 @@ type Ledger struct {
7072
sendQuota chan struct{}
7173
}
7274

73-
func NewLedger(kv store.KV, client *skademlia.Client, genesis *string) *Ledger {
75+
type config struct {
76+
GCDisabled bool
77+
Genesis *string
78+
}
79+
80+
type Option func(cfg *config)
81+
82+
// WithoutGC disables GC. Used for testing purposes.
83+
func WithoutGC() Option {
84+
return func(cfg *config) {
85+
cfg.GCDisabled = true
86+
}
87+
}
88+
89+
func WithGenesis(genesis *string) Option {
90+
return func(cfg *config) {
91+
cfg.Genesis = genesis
92+
}
93+
}
94+
95+
func NewLedger(kv store.KV, client *skademlia.Client, opts ...Option) *Ledger {
96+
var cfg config
97+
for _, opt := range opts {
98+
opt(&cfg)
99+
}
100+
74101
logger := log.Node()
75102

76103
metrics := NewMetrics(context.TODO())
77104
indexer := NewIndexer()
78105

79106
accounts := NewAccounts(kv)
80-
go accounts.GC(context.Background())
107+
108+
if !cfg.GCDisabled {
109+
go accounts.GC(context.Background())
110+
}
81111

82112
rounds, err := NewRounds(kv, sys.PruningLimit)
83113

84114
var round *Round
85115

86116
if rounds != nil && err != nil {
87-
genesis := performInception(accounts.tree, genesis)
117+
genesis := performInception(accounts.tree, cfg.Genesis)
88118
if err := accounts.Commit(nil); err != nil {
89119
logger.Fatal().Err(err).Msg("BUG: accounts.Commit")
90120
}
@@ -162,12 +192,13 @@ func (l *Ledger) AddTransaction(tx Transaction) error {
162192
l.gossiper.Push(tx)
163193

164194
l.broadcastNopsLock.Lock()
165-
if tx.Tag != sys.TagNop {
195+
if tx.Tag != sys.TagNop && tx.Sender == l.client.Keys().PublicKey() {
196+
l.broadcastNops = true
166197
l.broadcastNopsDelay = time.Now()
167-
}
168198

169-
if tx.Sender == l.client.Keys().PublicKey() && l.finalizer.Preferred() == nil {
170-
l.broadcastNops = true
199+
if tx.Depth > l.broadcastNopsMaxDepth {
200+
l.broadcastNopsMaxDepth = tx.Depth
201+
}
171202
}
172203
l.broadcastNopsLock.Unlock()
173204
}
@@ -271,6 +302,16 @@ func (l *Ledger) Snapshot() *avl.Tree {
271302
return l.accounts.Snapshot()
272303
}
273304

305+
// BroadcastingNop returns true if the node is
306+
// supposed to broadcast nop transaction.
307+
func (l *Ledger) BroadcastingNop() bool {
308+
l.broadcastNopsLock.Lock()
309+
broadcastNops := l.broadcastNops
310+
l.broadcastNopsLock.Unlock()
311+
312+
return broadcastNops
313+
}
314+
274315
// BroadcastNop has the node send a nop transaction should they have sufficient
275316
// balance available. They are broadcasted if no other transaction that is not a nop transaction
276317
// is not broadcasted by the node after 500 milliseconds. These conditions only apply so long as
@@ -387,7 +428,7 @@ func (l *Ledger) PullMissingTransactions() {
387428
for _, buf := range batch.Transactions {
388429
tx, err := UnmarshalTransaction(bytes.NewReader(buf))
389430
if err != nil {
390-
fmt.Printf("error unmarshaling downloaded tx [%v]: %+v", err, tx)
431+
fmt.Printf("error unmarshaling downloaded tx [%v]: %+v\n", err, tx)
391432
continue
392433
}
393434

@@ -474,8 +515,12 @@ FINALIZE_ROUNDS:
474515
continue FINALIZE_ROUNDS
475516
}
476517

518+
// Only stop broadcasting nops if the most recently added transaction
519+
// has been applied
477520
l.broadcastNopsLock.Lock()
478-
l.broadcastNops = false
521+
if l.broadcastNops && l.broadcastNopsMaxDepth <= l.finalizer.Preferred().End.Depth {
522+
l.broadcastNops = false
523+
}
479524
l.broadcastNopsLock.Unlock()
480525

481526
workerChan := make(chan *grpc.ClientConn, 16)

ledger_test.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package wavelet
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
)
11+
12+
// TestLedger_BroadcastNop checks that:
13+
//
14+
// * The ledger will keep broadcasting nop tx as long
15+
// as there are unapplied tx (latestTxDepth <= rootDepth).
16+
//
17+
// * The ledger will stop broadcasting nop once there
18+
// are no more unapplied tx.
19+
func TestLedger_BroadcastNop(t *testing.T) {
20+
testnet := NewTestNetwork(t)
21+
defer testnet.Cleanup()
22+
23+
for i := 0; i < 3; i++ {
24+
testnet.AddNode(t, 0)
25+
}
26+
27+
alice := testnet.AddNode(t, 1000000)
28+
bob := testnet.AddNode(t, 0)
29+
30+
// Wait for alice to receive her PERL from the faucet
31+
for <-alice.WaitForConsensus() {
32+
if alice.Balance() > 0 {
33+
break
34+
}
35+
}
36+
37+
// Add lots of transactions
38+
var txsLock sync.Mutex
39+
txs := make([]Transaction, 0, 10000)
40+
41+
go func() {
42+
for i := 0; i < cap(txs); i++ {
43+
tx, err := alice.Pay(bob, 1)
44+
assert.NoError(t, err)
45+
46+
txsLock.Lock()
47+
txs = append(txs, tx)
48+
txsLock.Unlock()
49+
50+
// Somehow this prevents AddTransaction from
51+
// returning ErrMissingParents
52+
time.Sleep(time.Nanosecond * 1)
53+
}
54+
}()
55+
56+
prevRound := alice.ledger.Rounds().Latest().Index
57+
timeout := time.NewTimer(time.Minute * 5)
58+
for {
59+
select {
60+
case <-timeout.C:
61+
t.Fatal("timed out before all transactions are applied")
62+
63+
case <-alice.WaitForConsensus():
64+
var appliedCount int
65+
var txsCount int
66+
67+
txsLock.Lock()
68+
for _, tx := range txs {
69+
if alice.Applied(tx) {
70+
appliedCount++
71+
}
72+
txsCount++
73+
}
74+
txsLock.Unlock()
75+
76+
currRound := alice.ledger.Rounds().Latest().Index
77+
78+
fmt.Printf("%d/%d tx applied, round=%d, root depth=%d\n",
79+
appliedCount, txsCount,
80+
currRound,
81+
alice.ledger.Graph().RootDepth())
82+
83+
if currRound-prevRound > 1 {
84+
t.Fatal("more than 1 round finalized")
85+
}
86+
87+
prevRound = currRound
88+
89+
if appliedCount < cap(txs) {
90+
assert.True(t, alice.ledger.BroadcastingNop(),
91+
"node should not stop broadcasting nop while there are unapplied tx")
92+
}
93+
94+
// The test is successful if all tx are applied,
95+
// and nop broadcasting is stopped once all tx are applied
96+
if appliedCount == cap(txs) && !alice.ledger.BroadcastingNop() {
97+
return
98+
}
99+
}
100+
}
101+
}
102+
103+
func TestLedger_AddTransaction(t *testing.T) {
104+
testnet := NewTestNetwork(t)
105+
defer testnet.Cleanup()
106+
107+
alice := testnet.AddNode(t, 0) // alice
108+
testnet.AddNode(t, 0) // bob
109+
110+
start := alice.ledger.Rounds().Latest().Index
111+
112+
// Add just 1 transaction
113+
_, err := testnet.faucet.PlaceStake(100)
114+
assert.NoError(t, err)
115+
116+
// Try to wait for 2 rounds of consensus.
117+
// The second call should result in timeout, because
118+
// only 1 round should be finalized.
119+
<-alice.WaitForConsensus()
120+
<-alice.WaitForConsensus()
121+
122+
current := alice.ledger.Rounds().Latest().Index
123+
if current-start > 1 {
124+
t.Fatal("more than 1 round finalized")
125+
}
126+
}

0 commit comments

Comments
 (0)