Skip to content

Commit 53dc347

Browse files
committed
Add 4RoundRBC test with dead nodes
Modify receiver handler to use Receive method instead of AddHandler rename method in tests, fix non-working and unify timeouts
1 parent 43c2392 commit 53dc347

File tree

3 files changed

+285
-152
lines changed

3 files changed

+285
-152
lines changed

rbc/fourRounds/four_rounds_rbc.go

Lines changed: 36 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func NewFourRoundRBC[M any](pred func([]M) bool, h hash.Hash, threshold int,
6868
nbNodes: nbNodes,
6969
finalValue: nil,
7070
finished: false,
71-
log: log.New(os.Stdout, fmt.Sprintf("[%d]", i), log.LstdFlags|log.Lshortfile),
71+
log: log.New(os.Stdout, fmt.Sprintf("[%d] ", i), log.LstdFlags|log.Lshortfile),
7272
i: i,
7373
}
7474
}
@@ -103,20 +103,37 @@ func (frRbc *FourRoundRBC[M]) broadcast(ms []M) error {
103103
return err
104104
}
105105

106+
func (frRbc *FourRoundRBC[M]) start(cancelFunc context.CancelFunc) {
107+
go func() {
108+
for {
109+
bs, err := frRbc.iface.Receive()
110+
if err != nil {
111+
frRbc.log.Printf("Error receiving: %v", err)
112+
continue
113+
}
114+
msg := &typedefs.Instruction{}
115+
err = proto.Unmarshal(bs, msg)
116+
if err != nil {
117+
frRbc.log.Printf("Error unmasharalling: %v", err)
118+
continue
119+
}
120+
err, finished := frRbc.handleMessage(msg)
121+
if err != nil {
122+
frRbc.log.Printf("Error handling message: %v", err)
123+
continue
124+
}
125+
if finished {
126+
frRbc.log.Printf("Protocol terminated")
127+
cancelFunc()
128+
return
129+
}
130+
}
131+
}()
132+
}
133+
106134
func (frRbc *FourRoundRBC[M]) RBroadcast(m []M) error {
107135
ctx, cancel := context.WithCancel(context.Background())
108-
frRbc.iface.AddHandler(func(bytes []byte) error {
109-
msg := &typedefs.Instruction{}
110-
err := proto.Unmarshal(bytes, msg)
111-
if err != nil {
112-
return err
113-
}
114-
err, finished := frRbc.handleMessage(msg)
115-
if finished { // TODO check if we should make sure err is nil before reading finished (in theory no if we always return false in case of an error)
116-
cancel()
117-
}
118-
return err
119-
})
136+
frRbc.start(cancel)
120137

121138
// Send the broadcast
122139
err := frRbc.broadcast(m)
@@ -126,35 +143,14 @@ func (frRbc *FourRoundRBC[M]) RBroadcast(m []M) error {
126143
}
127144

128145
<-ctx.Done()
129-
130146
return nil
131147
}
132148

133149
func (frRbc *FourRoundRBC[M]) Listen() error {
134150
ctx, cancel := context.WithCancel(context.Background())
135-
frRbc.iface.AddHandler(func(received []byte) error {
136-
// TODO make this handler method re-usable as it is exactly the same used in the RBroadcast method
137-
if received == nil {
138-
return nil
139-
}
140-
msg := &typedefs.Instruction{}
141-
err := proto.Unmarshal(received, msg)
142-
if err != nil {
143-
return err
144-
}
145-
if err != nil {
146-
return err
147-
}
148-
err, finished := frRbc.handleMessage(msg)
149-
if finished {
150-
cancel()
151-
}
152-
return nil
153-
})
151+
frRbc.start(cancel)
154152

155-
// Now the handler registered above will take over the control flow, and we just need to wait for the context
156153
<-ctx.Done()
157-
158154
return nil
159155
}
160156

@@ -252,7 +248,7 @@ func (frRbc *FourRoundRBC[M]) receiveEcho(msg *typedefs.Message_Echo) error {
252248
hashReady := frRbc.checkReadyThreshold(frRbc.readyCounts.GetOrDefault(string(msg.H), 0))
253249

254250
// Check if we received enough (taking into account if we already received enough ready messages for that hash
255-
if frRbc.checkEchoThreshold(count+1, hashReady) && !frRbc.sentReady {
251+
if frRbc.checkEchoThreshold(count, hashReady) && !frRbc.sentReady {
256252
sendReady = true
257253
}
258254

@@ -284,17 +280,17 @@ func (frRbc *FourRoundRBC[M]) receiveReady(msg *typedefs.Message_Ready) bool {
284280
if !ok {
285281
return 1
286282
}
283+
count += 1
287284

288285
// If enough READY messages have been received and enough ECHO messages, then send a READY message
289286
// if not already sent
290287
echoes, ok := frRbc.echoCount.Get(string(msg.H))
291-
if ok && !frRbc.sentReady && frRbc.checkReadyThreshold(count+1) && frRbc.checkReadyThreshold(echoes) {
288+
if ok && !frRbc.sentReady && frRbc.checkReadyThreshold(count) && frRbc.checkReadyThreshold(echoes) {
292289
sendReady = true
293290
}
294291

295292
// Return the new count to be set
296-
frRbc.log.Printf("Received %d ready message(s)", count+1)
297-
return count + 1
293+
return count
298294
})
299295

300296
if sendReady {
@@ -356,7 +352,7 @@ func (frRbc *FourRoundRBC[M]) receiveReady(msg *typedefs.Message_Ready) bool {
356352

357353
func (frRbc *FourRoundRBC[M]) reconstruct(expHash []byte) ([]M, bool, error) {
358354
for ri := 0; ri < frRbc.r; ri++ {
359-
if len(frRbc.th) < 2*frRbc.threshold+frRbc.r+1 {
355+
if len(frRbc.th) < 2*frRbc.threshold+ri+1 {
360356
// If it is not the case now, it won't be in the next iteration since r increases
361357
return nil, false, nil
362358
}

0 commit comments

Comments
 (0)