Skip to content

Commit d57b222

Browse files
committed
Fix data race(s) in replication code and tests
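This commit fixes two data races:
1. Requests for blocks are sent to nodes asynchronously (from replication timeout tasks) while incoming messages are processed in parallel, with no synchronization between the two. The timeout task now acquires the lock passed in from the Epoch before re-sending the request.
2. The 'collectNotarizationComm' communication mock can be invoked from the test without synchronizing with the communication goroutines. Its removeFinalizationsFromReplicationResponses method now holds the mock's lock while it runs.
Signed-off-by: Yacov Manevich <yacov.manevich@avalabs.org>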
1 parent b01c158 commit d57b222

File tree

3 files changed

+10
-2
lines changed

3 files changed

+10
-2
lines changed

epoch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func (e *Epoch) init() error {
183183
e.maxPendingBlocks = DefaultMaxPendingBlocks
184184
e.eligibleNodeIDs = make(map[string]struct{}, len(e.nodes))
185185
e.futureMessages = make(messagesFromNode, len(e.nodes))
186-
e.replicationState = NewReplicationState(e.Logger, e.Comm, e.ID, e.maxRoundWindow, e.ReplicationEnabled, e.StartTime)
186+
e.replicationState = NewReplicationState(e.Logger, e.Comm, e.ID, e.maxRoundWindow, e.ReplicationEnabled, e.StartTime, &e.lock)
187187
e.timeoutHandler = NewTimeoutHandler(e.Logger, e.StartTime, e.nodes)
188188

189189
for _, node := range e.nodes {

replication.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"math"
99
"slices"
10+
"sync"
1011
"time"
1112

1213
"go.uber.org/zap"
@@ -40,6 +41,7 @@ func newSignedSequenceFromRound(round QuorumRound) (*signedSequence, error) {
4041
}
4142

4243
type ReplicationState struct {
44+
lock *sync.Mutex
4345
logger Logger
4446
enabled bool
4547
maxRoundWindow uint64
@@ -61,8 +63,9 @@ type ReplicationState struct {
6163
timeoutHandler *TimeoutHandler
6264
}
6365

64-
func NewReplicationState(logger Logger, comm Communication, id NodeID, maxRoundWindow uint64, enabled bool, start time.Time) *ReplicationState {
66+
func NewReplicationState(logger Logger, comm Communication, id NodeID, maxRoundWindow uint64, enabled bool, start time.Time, lock *sync.Mutex) *ReplicationState {
6567
return &ReplicationState{
68+
lock: lock,
6669
logger: logger,
6770
enabled: enabled,
6871
comm: comm,
@@ -157,6 +160,8 @@ func (r *ReplicationState) sendRequestToNode(start uint64, end uint64, nodes []N
157160

158161
func (r *ReplicationState) createReplicationTimeoutTask(start, end uint64, nodes []NodeID, index int) *TimeoutTask {
159162
taskFunc := func() {
163+
r.lock.Lock()
164+
defer r.lock.Unlock()
160165
r.sendRequestToNode(start, end, nodes, (index+1)%len(nodes))
161166
}
162167
timeoutTask := &TimeoutTask{

replication_timeout_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,9 @@ func (c *collectNotarizationComm) Broadcast(msg *simplex.Message) {
327327
}
328328

329329
func (c *collectNotarizationComm) removeFinalizationsFromReplicationResponses(msg *simplex.Message, from, to simplex.NodeID) bool {
330+
c.lock.Lock()
331+
defer c.lock.Unlock()
332+
330333
if msg.VerifiedReplicationResponse != nil || msg.ReplicationResponse != nil {
331334
newData := make([]simplex.VerifiedQuorumRound, 0, len(msg.VerifiedReplicationResponse.Data))
332335

0 commit comments

Comments
 (0)