Skip to content

Commit 81d38ce

Browse files
committed
digest request
1 parent a69b84e commit 81d38ce

File tree

7 files changed

+163257
-33
lines changed

7 files changed

+163257
-33
lines changed

epoch.go

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ func (e *Epoch) HandleMessage(msg *Message, from NodeID) error {
167167
return e.handleReplicationResponse(msg.ReplicationResponse, from)
168168
case msg.ReplicationRequest != nil && e.ReplicationEnabled:
169169
return e.handleReplicationRequest(msg.ReplicationRequest, from)
170+
case msg.BlockDigestRequest != nil:
171+
return e.handleBlockDigestRequest(msg.BlockDigestRequest, from)
170172
default:
171173
e.Logger.Debug("Invalid message type", zap.Stringer("from", from))
172174
return nil
@@ -623,7 +625,7 @@ func (e *Epoch) handleFinalizeVoteMessage(message *FinalizeVote, from NodeID) er
623625
vote := message.Finalization
624626

625627
e.Logger.Verbo("Received finalize vote",
626-
zap.Stringer("from", from), zap.Uint64("round", vote.Round))
628+
zap.Stringer("from", from), zap.Uint64("round", vote.Round), zap.Uint64("nextSeqToCommit", e.nextSeqToCommit()))
627629

628630
// Only process point-to-point finalizations.
629631
// This is needed to prevent a malicious node from sending us a finalization of a different node for a future round.
@@ -1113,7 +1115,7 @@ func (e *Epoch) rebroadcastPastFinalizeVotes() error {
11131115
}
11141116
finalizeVoteMessage = msg
11151117
}
1116-
e.Logger.Debug("Rebroadcasting finalization", zap.Uint64("round", r), zap.Uint64("seq", finalizeVoteMessage.FinalizeVote.Finalization.Seq))
1118+
e.Logger.Debug("Rebroadcasting finalize vote", zap.Uint64("round", r), zap.Uint64("seq", finalizeVoteMessage.FinalizeVote.Finalization.Seq))
11171119
e.Comm.Broadcast(finalizeVoteMessage)
11181120
}
11191121

@@ -2886,6 +2888,35 @@ func (e *Epoch) haveNotFinalizedNotarizedRound() (uint64, bool) {
28862888
return minRoundNum, found
28872889
}
28882890

2891+
func (e *Epoch) handleBlockDigestRequest(req *BlockDigestRequest, from NodeID) error {
2892+
e.Logger.Debug("Received block digest request", zap.Stringer("from", from), zap.Uint64("seq", req.Seq))
2893+
block, notarizationOrFinalization, ok := e.locateBlock(req.Seq, req.Digest[:])
2894+
2895+
if !ok {
2896+
e.Logger.Debug("Block not found for digest request", zap.Uint64("seq", req.Seq), zap.Stringer("digest", req.Digest))
2897+
return nil
2898+
}
2899+
2900+
if notarizationOrFinalization == nil {
2901+
e.Logger.Debug("No notarization or finalization found for block digest request", zap.Uint64("seq", req.Seq), zap.Stringer("digest", req.Digest))
2902+
return nil
2903+
}
2904+
2905+
qr := VerifiedQuorumRound{
2906+
VerifiedBlock: block,
2907+
Notarization: notarizationOrFinalization.Notarization,
2908+
Finalization: notarizationOrFinalization.Finalization,
2909+
}
2910+
2911+
response := &VerifiedReplicationResponse{
2912+
Data: []VerifiedQuorumRound{qr},
2913+
}
2914+
2915+
msg := &Message{VerifiedReplicationResponse: response}
2916+
e.Comm.Send(msg, from)
2917+
return nil
2918+
}
2919+
28892920
func (e *Epoch) handleReplicationResponse(resp *ReplicationResponse, from NodeID) error {
28902921
if !e.ReplicationEnabled {
28912922
return nil

msg.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type Message struct {
2222
ReplicationResponse *ReplicationResponse
2323
VerifiedReplicationResponse *VerifiedReplicationResponse
2424
ReplicationRequest *ReplicationRequest
25+
BlockDigestRequest *BlockDigestRequest
2526
}
2627

2728
type EmptyVoteMetadata struct {
@@ -359,3 +360,8 @@ type VerifiedFinalizedBlock struct {
359360
type verifiableMessage interface {
360361
Verify() error
361362
}
363+
364+
type BlockDigestRequest struct {
365+
Seq uint64
366+
Digest Digest
367+
}

out.txt

Lines changed: 163125 additions & 0 deletions
Large diffs are not rendered by default.

replication_state.go

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,18 @@ import (
1212
"go.uber.org/zap"
1313
)
1414

15+
type missingBlock struct {
16+
digest Digest
17+
seq uint64
18+
}
19+
20+
func missingBlockFromBlock(block Block) missingBlock {
21+
return missingBlock{
22+
digest: block.BlockHeader().Digest,
23+
seq: block.BlockHeader().Seq,
24+
}
25+
}
26+
1527
type finalizedQuorumRound struct {
1628
block Block
1729
finalization *Finalization
@@ -33,7 +45,7 @@ type ReplicationState struct {
3345
// it means a prior notarization for that block exists but is missing.
3446
// Since we may not know which round that dependency belongs to,
3547
// digestTimeouts ensures we re-request the missing digest until it arrives.
36-
digestTimeouts *TimeoutHandler[Digest]
48+
digestTimeouts *TimeoutHandler[missingBlock]
3749

3850
// emptyRoundTimeouts handles timeouts for fetching missing empty round notarizations.
3951
// When replication encounters a notarized block that depends on an empty round we haven't received,
@@ -42,6 +54,9 @@ type ReplicationState struct {
4254

4355
roundRequestor *requestor
4456
finalizationRequestor *requestor
57+
58+
sender sender
59+
epochLock *sync.Mutex
4560
}
4661

4762
func NewReplicationState(logger Logger, comm Communication, myNodeID NodeID, maxRoundWindow uint64, enabled bool, start time.Time, lock *sync.Mutex) *ReplicationState {
@@ -64,6 +79,10 @@ func NewReplicationState(logger Logger, comm Communication, myNodeID NodeID, max
6479
// round replication
6580
rounds: make(map[uint64]*QuorumRound),
6681
roundRequestor: newRequestor(logger, start, lock, maxRoundWindow, comm, false),
82+
83+
// sender for missing dependencies
84+
sender: comm,
85+
epochLock: lock,
6786
}
6887

6988
r.digestTimeouts = NewTimeoutHandler(logger, "digest", start, DefaultReplicationRequestTimeout, r.requestDigests)
@@ -110,15 +129,15 @@ func (r *ReplicationState) storeSequence(block Block, finalization *Finalization
110129
}
111130

112131
r.finalizationRequestor.removeTask(finalization.Finalization.Seq)
113-
r.digestTimeouts.RemoveTask(block.BlockHeader().Digest)
132+
r.digestTimeouts.RemoveTask(missingBlockFromBlock(block))
114133
}
115134

116135
// storeRound adds or updates a quorum round in the replication state.
117136
// If the round already exists, it merges any missing notarizations or empty notarizations
118137
// from the provided quorum round. Otherwise, it stores the new round as is.
119138
func (r *ReplicationState) storeRound(qr *QuorumRound) {
120139
if qr.Block != nil {
121-
r.digestTimeouts.RemoveTask(qr.Block.BlockHeader().Digest)
140+
r.digestTimeouts.RemoveTask(missingBlockFromBlock(qr.Block))
122141
}
123142

124143
existing, exists := r.rounds[qr.GetRound()]
@@ -222,7 +241,7 @@ func (r *ReplicationState) ResendFinalizationRequest(seq uint64, signers []NodeI
222241
// TODO: in a future PR, these requests will be sent as specific digest requests.
223242
func (r *ReplicationState) CreateDependencyTasks(parent *Digest, parentSeq uint64, emptyRounds []uint64) {
224243
if parent != nil {
225-
r.digestTimeouts.AddTask(*parent)
244+
r.digestTimeouts.AddTask(missingBlock{digest: *parent, seq: parentSeq})
226245
}
227246

228247
if len(emptyRounds) > 0 {
@@ -296,9 +315,41 @@ func (r *ReplicationState) GetBlockWithSeq(seq uint64) Block {
296315
return nil
297316
}
298317

299-
func (r *ReplicationState) requestDigests(digests []Digest) {
300-
// TODO: In a future PR, I will add a message that requests a specific digest.
301-
r.logger.Debug("Not implemented yet", zap.Stringers("Digests", digests))
318+
func (r *ReplicationState) requestDigests(missingBlocks []missingBlock) {
319+
// grab the lock since this is called in the timeout handler goroutine
320+
r.epochLock.Lock()
321+
defer r.epochLock.Unlock()
322+
323+
signedQuorum := r.roundRequestor.getHighestObserved()
324+
if signedQuorum == nil {
325+
signedQuorum = r.finalizationRequestor.getHighestObserved()
326+
}
327+
if signedQuorum == nil {
328+
r.logger.Warn("Replication State cannot request missing block digests, no known nodes to request from")
329+
return
330+
}
331+
332+
randInt, err := rand.Int(rand.Reader, big.NewInt(int64(len(signedQuorum.signers))))
333+
if err != nil {
334+
r.logger.Info("Replication State failed to generate random starting index", zap.Error(err))
335+
return
336+
}
337+
startingIndex := int(randInt.Int64())
338+
339+
for i, mb := range missingBlocks {
340+
// grab the node to send it to
341+
index := (i + startingIndex) % len(signedQuorum.signers)
342+
node := signedQuorum.signers[index]
343+
344+
r.logger.Debug("Replication State requesting missing block digest", zap.Uint64("seq", mb.seq), zap.Stringer("digest", &mb.digest))
345+
blockRequest := &BlockDigestRequest{
346+
Seq: mb.seq,
347+
Digest: mb.digest,
348+
}
349+
r.sender.Send(&Message{
350+
BlockDigestRequest: blockRequest,
351+
}, node)
352+
}
302353
}
303354

304355
func (r *ReplicationState) requestEmptyRounds(emptyRounds []uint64) {

replication_test.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1328,7 +1328,6 @@ func allowFinalizeVotes(msg *simplex.Message, from, to simplex.NodeID) bool {
13281328
// TestReplicationChain tests that a node can handle both empty notarizations and notarizations for the same round.
13291329
func TestReplicationChain(t *testing.T) {
13301330
// Digest message requests are needed for this test
1331-
t.Skip()
13321331
nodes := []simplex.NodeID{{1}, {2}, {3}, {4}}
13331332
net := NewInMemNetwork(t, nodes)
13341333

@@ -1348,11 +1347,11 @@ func TestReplicationChain(t *testing.T) {
13481347
fullNode2.Silence()
13491348
// node 3 will not receive finalize votes & finalizations
13501349
blockFinalize3 := NewSimplexNode(t, nodes[2], net, newNodeConfig(nodes[2]))
1351-
blockFinalize3.Silence()
1350+
// blockFinalize3.Silence()
13521351
// lagging node is disconnected initially. It initially receives only empty notarizations
13531352
// but then later receives notarizations and must send finalize votes for them
13541353
laggingNode := NewSimplexNode(t, nodes[3], net, newNodeConfig(nodes[3]))
1355-
1354+
laggingNode.Silence()
13561355
net.StartInstances()
13571356
net.Disconnect(laggingNode.E.ID)
13581357

@@ -1408,7 +1407,32 @@ func TestReplicationChain(t *testing.T) {
14081407
net.SetAllNodesMessageFilter(allowFinalizeVotes)
14091408

14101409
for _, n := range net.Instances {
1411-
n.TickUntilRoundAdvanced(numNotarizations+1, simplex.DefaultReplicationRequestTimeout)
1412-
require.Equal(t, numNotarizations+1-missedNotarizations, n.Storage.NumBlocks())
1410+
// the message filter blocks finalizations for blockFinalize3
1411+
if n.E.ID.Equals(blockFinalize3.E.ID) {
1412+
continue
1413+
}
1414+
1415+
for {
1416+
numBlocks := n.Storage.NumBlocks()
1417+
if numBlocks >= numNotarizations-missedNotarizations {
1418+
break
1419+
}
1420+
1421+
net.AdvanceTime(simplex.DefaultReplicationRequestTimeout)
1422+
}
1423+
}
1424+
1425+
// just for fun check the blockFinalize3 also replicated properly
1426+
net.SetAllNodesMessageFilter(AllowAllMessages)
1427+
for {
1428+
numBlocks := blockFinalize3.Storage.NumBlocks()
1429+
fmt.Println("blockFinalize3 num blocks:", numBlocks)
1430+
if numBlocks == numNotarizations-missedNotarizations {
1431+
break
1432+
}
1433+
1434+
net.AdvanceTime(simplex.DefaultReplicationRequestTimeout)
1435+
time.Sleep(100 * time.Millisecond)
1436+
// break
14131437
}
14141438
}

testutil/network.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,3 +162,9 @@ func (n *InMemNetwork) AdvanceWithoutLeader(round uint64, laggingNodeId simplex.
162162
require.Equal(n.t, record.EmptyNotarizationRecordType, recordType)
163163
}
164164
}
165+
166+
func (n *InMemNetwork) AdvanceTime(duration time.Duration) {
167+
for _, instance := range n.Instances {
168+
instance.AdvanceTime(duration)
169+
}
170+
}

testutil/node.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -135,22 +135,3 @@ func (t *TestNode) TimeoutOnRound(round uint64) {
135135
time.Sleep(50 * time.Millisecond)
136136
}
137137
}
138-
139-
func (t *TestNode) TickUntilRoundAdvanced(round uint64, tick time.Duration) {
140-
timeout := time.NewTimer(time.Minute)
141-
defer timeout.Stop()
142-
143-
for {
144-
if t.E.Metadata().Round >= round {
145-
return
146-
}
147-
148-
select {
149-
case <-time.After(time.Millisecond * 10):
150-
t.AdvanceTime(tick)
151-
continue
152-
case <-timeout.C:
153-
require.Fail(t.t, "timed out waiting to enter round", "current round %d, waiting for round %d", t.E.Metadata().Round, round)
154-
}
155-
}
156-
}

0 commit comments

Comments
 (0)