Skip to content

Commit e710a92

Browse files
authored
Merge branch 'main' into connection
2 parents 278c74e + dedd30c commit e710a92

File tree

3 files changed

+54
-46
lines changed

3 files changed

+54
-46
lines changed

epoch.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1246,6 +1246,9 @@ func (e *Epoch) createBlockVerificationTask(block Block, from NodeID, vote Vote)
12461246

12471247
record := BlockRecord(md, block.Bytes())
12481248
e.WAL.Append(record)
1249+
e.Logger.Debug("Persisted block to WAL",
1250+
zap.Uint64("round", md.Round),
1251+
zap.Stringer("digest", md.Digest))
12491252

12501253
if !e.storeProposal(block) {
12511254
e.Logger.Warn("Unable to store proposed block for the round", zap.Stringer("NodeID", from), zap.Uint64("round", md.Round))
@@ -1556,6 +1559,9 @@ func (e *Epoch) triggerProposalWaitTimeExpired(round uint64) {
15561559
e.Logger.Error("Failed appending empty vote", zap.Error(err))
15571560
return
15581561
}
1562+
e.Logger.Debug("Persisted empty vote to WAL",
1563+
zap.Uint64("round", round),
1564+
zap.Int("size", len(emptyVoteRecord)))
15591565

15601566
emptyVotes := e.getOrCreateEmptyVoteSetForRound(round)
15611567
emptyVotes.timedOut = true

epoch_failover_test.go

Lines changed: 27 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,17 @@ import (
1616
)
1717

1818
func TestEpochLeaderFailover(t *testing.T) {
19-
timeoutDetected := make(chan struct{})
20-
2119
l := testutil.MakeLogger(t, 1)
22-
l.Intercept(func(entry zapcore.Entry) error {
23-
if entry.Message == `Timed out on block agreement` {
24-
close(timeoutDetected)
25-
}
26-
return nil
27-
})
2820

2921
bb := &testBlockBuilder{out: make(chan *testBlock, 1), blockShouldBeBuilt: make(chan struct{}, 1)}
3022
storage := newInMemStorage()
3123

3224
nodes := []NodeID{{1}, {2}, {3}, {4}}
3325
quorum := Quorum(len(nodes))
3426

35-
start := time.Now()
36-
3727
wal := newTestWAL(t)
3828

29+
start := time.Now()
3930
conf := EpochConfig{
4031
MaxProposalWait: DefaultMaxProposalWaitTime,
4132
StartTime: start,
@@ -61,20 +52,20 @@ func TestEpochLeaderFailover(t *testing.T) {
6152
// Then, don't do anything and wait for our node
6253
// to start complaining about a block not being notarized
6354

64-
for _, round := range []uint64{0, 1, 2} {
55+
for round := uint64(0); round < 3; round++ {
6556
notarizeAndFinalizeRound(t, nodes, round, round, e, bb, quorum, storage, false)
6657
}
6758

6859
bb.blockShouldBeBuilt <- struct{}{}
6960

70-
waitForEvent(t, start, e, timeoutDetected)
61+
waitForBlockProposerTimeout(t, e, start)
7162

7263
lastBlock, _, ok := storage.Retrieve(storage.Height() - 1)
7364
require.True(t, ok)
7465

7566
prev := lastBlock.BlockHeader().Digest
7667

77-
md := ProtocolMetadata{
68+
emptyBlockMd := ProtocolMetadata{
7869
Round: 3,
7970
Seq: 2,
8071
Prev: prev,
@@ -83,8 +74,8 @@ func TestEpochLeaderFailover(t *testing.T) {
8374
nextBlockSeqToCommit := uint64(3)
8475
nextRoundToCommit := uint64(4)
8576

86-
emptyVoteFrom1 := createEmptyVote(md, nodes[1])
87-
emptyVoteFrom2 := createEmptyVote(md, nodes[2])
77+
emptyVoteFrom1 := createEmptyVote(emptyBlockMd, nodes[1])
78+
emptyVoteFrom2 := createEmptyVote(emptyBlockMd, nodes[2])
8879

8980
e.HandleMessage(&Message{
9081
EmptyVoteMessage: emptyVoteFrom1,
@@ -93,25 +84,25 @@ func TestEpochLeaderFailover(t *testing.T) {
9384
EmptyVoteMessage: emptyVoteFrom2,
9485
}, nodes[2])
9586

96-
// Ensure our node proposes block with sequence 3 for round 4
97-
notarizeAndFinalizeRound(t, nodes, nextRoundToCommit, nextBlockSeqToCommit, e, bb, quorum, storage, false)
98-
99-
// WAL must contain an empty vote and an empty block.
87+
wal.lock.Lock()
10088
walContent, err := wal.ReadAll()
10189
require.NoError(t, err)
90+
wal.lock.Unlock()
10291

103-
// WAL should be: [..., <empty vote>, <empty block>, <notarization for 4>, <block3>]
104-
rawEmptyVote, rawEmptyNotarization := walContent[len(walContent)-4], walContent[len(walContent)-3]
105-
92+
rawEmptyVote, rawEmptyNotarization := walContent[len(walContent)-2], walContent[len(walContent)-1]
10693
emptyVote, err := ParseEmptyVoteRecord(rawEmptyVote)
10794
require.NoError(t, err)
108-
require.Equal(t, createEmptyVote(md, nodes[0]).Vote, emptyVote)
95+
require.Equal(t, createEmptyVote(emptyBlockMd, nodes[0]).Vote, emptyVote)
10996

11097
emptyNotarization, err := EmptyNotarizationFromRecord(rawEmptyNotarization, &testQCDeserializer{t: t})
11198
require.NoError(t, err)
11299
require.Equal(t, emptyVoteFrom1.Vote, emptyNotarization.Vote)
113100
require.Equal(t, uint64(3), emptyNotarization.Vote.Round)
114101
require.Equal(t, uint64(2), emptyNotarization.Vote.Seq)
102+
require.Equal(t, uint64(3), storage.Height())
103+
104+
// Ensure our node proposes block with sequence 3 for round 4
105+
notarizeAndFinalizeRound(t, nodes, nextRoundToCommit, nextBlockSeqToCommit, e, bb, quorum, storage, false)
115106
require.Equal(t, uint64(4), storage.Height())
116107
}
117108

@@ -141,10 +132,9 @@ func TestEpochLeaderFailoverAfterProposal(t *testing.T) {
141132
nodes := []NodeID{{1}, {2}, {3}, {4}}
142133
quorum := Quorum(len(nodes))
143134

144-
start := time.Now()
145-
146135
wal := newTestWAL(t)
147136

137+
start := time.Now()
148138
conf := EpochConfig{
149139
MaxProposalWait: DefaultMaxProposalWaitTime,
150140
StartTime: start,
@@ -194,7 +184,7 @@ func TestEpochLeaderFailoverAfterProposal(t *testing.T) {
194184

195185
bb.blockShouldBeBuilt <- struct{}{}
196186

197-
waitForEvent(t, start, e, timeoutDetected)
187+
waitForBlockProposerTimeout(t, e, start)
198188

199189
for i := 1; i < quorum; i++ {
200190
// Skip the vote of the block proposer
@@ -204,7 +194,7 @@ func TestEpochLeaderFailoverAfterProposal(t *testing.T) {
204194
injectTestVote(t, e, block, nodes[i])
205195
}
206196

207-
waitForEvent(t, start, e, alreadyTimedOut)
197+
waitForBlockProposerTimeout(t, e, start)
208198

209199
lastBlock, _, ok := storage.Retrieve(storage.Height() - 1)
210200
require.True(t, ok)
@@ -254,27 +244,17 @@ func TestEpochLeaderFailoverAfterProposal(t *testing.T) {
254244
}
255245

256246
func TestEpochLeaderFailoverTwice(t *testing.T) {
257-
timeoutDetected := make(chan struct{})
258-
259247
l := testutil.MakeLogger(t, 1)
260-
l.Intercept(func(entry zapcore.Entry) error {
261-
if entry.Message == `Timed out on block agreement` {
262-
close(timeoutDetected)
263-
timeoutDetected = make(chan struct{})
264-
}
265-
return nil
266-
})
267248

268249
bb := &testBlockBuilder{out: make(chan *testBlock, 1), blockShouldBeBuilt: make(chan struct{}, 1)}
269250
storage := newInMemStorage()
270251

271252
nodes := []NodeID{{1}, {2}, {3}, {4}}
272253
quorum := Quorum(len(nodes))
273254

274-
start := time.Now()
275-
276255
wal := newTestWAL(t)
277256

257+
start := time.Now()
278258
conf := EpochConfig{
279259
MaxProposalWait: DefaultMaxProposalWaitTime,
280260
StartTime: start,
@@ -302,7 +282,7 @@ func TestEpochLeaderFailoverTwice(t *testing.T) {
302282

303283
bb.blockShouldBeBuilt <- struct{}{}
304284

305-
waitForEvent(t, start, e, timeoutDetected)
285+
waitForBlockProposerTimeout(t, e, start)
306286

307287
lastBlock, _, ok := storage.Retrieve(storage.Height() - 1)
308288
require.True(t, ok)
@@ -331,7 +311,7 @@ func TestEpochLeaderFailoverTwice(t *testing.T) {
331311

332312
bb.blockShouldBeBuilt <- struct{}{}
333313

334-
waitForEvent(t, start, e, timeoutDetected)
314+
waitForBlockProposerTimeout(t, e, start)
335315

336316
md = ProtocolMetadata{
337317
Round: 3,
@@ -387,17 +367,18 @@ func createEmptyVote(md ProtocolMetadata, signer NodeID) *EmptyVote {
387367
return emptyVoteFrom2
388368
}
389369

390-
func waitForEvent(t *testing.T, start time.Time, e *Epoch, events chan struct{}) {
391-
now := start
370+
func waitForBlockProposerTimeout(t *testing.T, e *Epoch, startTime time.Time) {
371+
startRound := e.Metadata().Round
392372
timeout := time.NewTimer(time.Minute)
393373
defer timeout.Stop()
394374

395375
for {
396-
now = now.Add(e.EpochConfig.MaxProposalWait / 5)
397-
e.AdvanceTime(now)
398-
select {
399-
case <-events:
376+
if e.WAL.(*testWAL).containsEmptyVote(startRound) {
400377
return
378+
}
379+
startTime = startTime.Add(e.EpochConfig.MaxProposalWait / 5)
380+
e.AdvanceTime(startTime)
381+
select {
401382
case <-time.After(time.Millisecond * 10):
402383
continue
403384
case <-timeout.C:

epoch_multinode_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,27 @@ func (tw *testWAL) assertNotarization(round uint64) {
205205

206206
}
207207

208+
func (tw *testWAL) containsEmptyVote(round uint64) bool {
209+
tw.lock.Lock()
210+
defer tw.lock.Unlock()
211+
212+
rawRecords, err := tw.WriteAheadLog.ReadAll()
213+
require.NoError(tw.t, err)
214+
215+
for _, rawRecord := range rawRecords {
216+
if binary.BigEndian.Uint16(rawRecord[:2]) == record.EmptyVoteRecordType {
217+
vote, err := ParseEmptyVoteRecord(rawRecord)
218+
require.NoError(tw.t, err)
219+
220+
if vote.Round == round {
221+
return true
222+
}
223+
}
224+
}
225+
226+
return false
227+
}
228+
208229
type testComm struct {
209230
from NodeID
210231
net *inMemNetwork

0 commit comments

Comments
 (0)