Skip to content

Commit 3e7a3bf

Browse files
authored
Rebroadcast immediately once a round threshold is met (#1003)
* Rebroadcast immediately once a round threshold is met The original GPBFT protocol dictates that the rebroadcast only gets triggered after the phase timeout has expired. This is reasonable for a low number of rounds, where the vast majority of instances finalize within a single round. However, there is an edge case where, when an instance enters more than a few rounds, the exponential increase of the phase timeout results in extended periods of total radio silence. As a result, the state propagation is hindered greatly, which in turn increases the chances of the instance taking even longer to finalize. The changes here introduce a threshold at which rebroadcast is scheduled without waiting for phase timeout to expire first (defaulting to 3). In this design, the rebroadcast and successive rebroadcasts work exactly the same way as dictated in GPBFT design, except the trigger is adjusted to consider rounds larger than the threshold as "insufficient progress". The implementation reverts back to the vanilla GPBFT phase timeout as the phases progress and resets rebroadcast parameters accordingly. A test is added to assert that when an instance skips to future rounds, rebroadcast is triggered without wait, and continues to do so after phase change. * Refine logging
1 parent 7b90493 commit 3e7a3bf

File tree

5 files changed

+142
-25
lines changed

5 files changed

+142
-25
lines changed

emulator/driver.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"errors"
77
"testing"
8+
"time"
89

910
"github.com/filecoin-project/go-f3/gpbft"
1011
"github.com/stretchr/testify/require"
@@ -105,3 +106,12 @@ func (d *Driver) deliverMessage(msg *gpbft.GMessage) error {
105106
return d.subject.ReceiveMessage(ctx, validated)
106107
}
107108
}
109+
110+
// AdvanceTimeBy advances the current time of the driver by the given amount.
111+
// This allows the driver to simulate the passage of time in the emulated
112+
// gpbft.Participant. This is useful for testing timeouts and other time-based
113+
// behavior in the gpbft.Participant at a high number of rounds, which
114+
// exponentially increases the phase timeout.
115+
func (d *Driver) AdvanceTimeBy(t time.Duration) {
116+
d.host.now = d.host.now.Add(t)
117+
}

emulator/host.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ func (h *driverHost) maybeReceiveAlarm() bool {
3939
if h.pendingAlarm == nil {
4040
return false
4141
}
42-
h.now = *h.pendingAlarm
42+
if h.now.Before(*h.pendingAlarm) {
43+
h.now = *h.pendingAlarm
44+
}
4345
h.pendingAlarm = nil
4446
return true
4547
}

gpbft/gpbft.go

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ func (i *instance) tryQuality() error {
550550
// Wait either for a strong quorum that agree on our proposal, or for the timeout
551551
// to expire.
552552
foundQuorum := i.quality.HasStrongQuorumFor(i.proposal.Key())
553-
timeoutExpired := atOrAfter(i.participant.host.Time(), i.phaseTimeout)
553+
timeoutExpired := i.phaseTimeoutElapsed()
554554

555555
if foundQuorum || timeoutExpired {
556556
// If strong quorum of input is found the proposal will remain unchanged.
@@ -606,8 +606,11 @@ func (i *instance) tryConverge() error {
606606
return fmt.Errorf("unexpected phase %s, expected %s", i.current.Phase, CONVERGE_PHASE)
607607
}
608608
// The CONVERGE phase timeout doesn't wait to hear from >⅔ of power.
609-
timeoutExpired := atOrAfter(i.participant.host.Time(), i.phaseTimeout)
609+
timeoutExpired := i.phaseTimeoutElapsed()
610610
if !timeoutExpired {
611+
if i.shouldRebroadcast() {
612+
i.tryRebroadcast()
613+
}
611614
return nil
612615
}
613616
commitRoundState := i.getRound(i.current.Round - 1).committed
@@ -666,9 +669,8 @@ func (i *instance) tryPrepare() error {
666669
prepared := i.getRound(i.current.Round).prepared
667670
proposalKey := i.proposal.Key()
668671
foundQuorum := prepared.HasStrongQuorumFor(proposalKey)
669-
timedOut := atOrAfter(i.participant.host.Time(), i.phaseTimeout)
670672
quorumNotPossible := !prepared.CouldReachStrongQuorumFor(proposalKey, false)
671-
phaseComplete := timedOut && prepared.ReceivedFromStrongQuorum()
673+
phaseComplete := i.phaseTimeoutElapsed() && prepared.ReceivedFromStrongQuorum()
672674

673675
if foundQuorum {
674676
i.value = i.proposal
@@ -678,7 +680,7 @@ func (i *instance) tryPrepare() error {
678680

679681
if foundQuorum || quorumNotPossible || phaseComplete {
680682
i.beginCommit()
681-
} else if timedOut {
683+
} else if i.shouldRebroadcast() {
682684
i.tryRebroadcast()
683685
}
684686
return nil
@@ -714,8 +716,7 @@ func (i *instance) tryCommit(round uint64) error {
714716
// no check on the current phase.
715717
committed := i.getRound(round).committed
716718
quorumValue, foundStrongQuorum := committed.FindStrongQuorumValue()
717-
timedOut := atOrAfter(i.participant.host.Time(), i.phaseTimeout)
718-
phaseComplete := timedOut && committed.ReceivedFromStrongQuorum()
719+
phaseComplete := i.phaseTimeoutElapsed() && committed.ReceivedFromStrongQuorum()
719720

720721
switch {
721722
case foundStrongQuorum && !quorumValue.IsZero():
@@ -751,7 +752,7 @@ func (i *instance) tryCommit(round uint64) error {
751752
}
752753
}
753754
i.beginNextRound()
754-
case timedOut:
755+
case i.shouldRebroadcast():
755756
// The phase has timed out, or the round is beyond the immediate rebroadcast threshold. Attempt to re-broadcast messages.
756757
i.tryRebroadcast()
757758
}
@@ -940,16 +941,39 @@ func (i *instance) tryRebroadcast() {
940941
// instance phase and schedule the first alarm:
941942
// * If in DECIDE phase, use current time as offset. Because, DECIDE phase does
942943
// not have any phase timeout and may be too far in the past.
944+
// * If the current round is beyond the immediate rebroadcast threshold, use
945+
// the current time as offset to avoid extended periods of radio silence
946+
// when phase timeout grows exponentially large.
943947
// * Otherwise, use the phase timeout.
944948
var rebroadcastTimeoutOffset time.Time
945-
if i.current.Phase == DECIDE_PHASE {
949+
if i.current.Phase == DECIDE_PHASE || i.current.Round > i.participant.rebroadcastImmediatelyAfterRound {
946950
rebroadcastTimeoutOffset = i.participant.host.Time()
947951
} else {
948952
rebroadcastTimeoutOffset = i.phaseTimeout
949953
}
950954
i.rebroadcastTimeout = rebroadcastTimeoutOffset.Add(i.participant.rebroadcastAfter(0))
951-
i.participant.host.SetAlarm(i.rebroadcastTimeout)
952-
i.log("scheduled initial rebroadcast at %v", i.rebroadcastTimeout)
955+
if i.phaseTimeoutElapsed() {
956+
// The phase timeout has already elapsed; therefore, there's no risk of
957+
// overriding any existing alarm. Simply set the alarm for rebroadcast.
958+
i.participant.host.SetAlarm(i.rebroadcastTimeout)
959+
i.log("scheduled initial rebroadcast at %v", i.rebroadcastTimeout)
960+
} else if i.rebroadcastTimeout.Before(i.phaseTimeout) {
961+
// The rebroadcast timeout is set before the phase timeout; therefore, it should
962+
// trigger before the phase timeout. Override the alarm with rebroadcast timeout
963+
// and check for phase timeout in the next cycle of rebroadcast.
964+
i.participant.host.SetAlarm(i.rebroadcastTimeout)
965+
i.log("scheduled initial rebroadcast at %v before phase timeout at %v", i.rebroadcastTimeout, i.phaseTimeout)
966+
} else {
967+
// The phase timeout is set before the rebroadcast timeout. Therefore, there must
968+
// have been an alarm set already for the phase. Do nothing, because the GPBFT
969+
// process loop will trigger the phase alarm, which in turn tries the current
970+
// phase and eventually will try rebroadcast.
971+
//
972+
// Therefore, reset the rebroadcast parameters to re-attempt setting the initial
973+
// rebroadcast timeout once the phase expires.
974+
i.log("Resetting rebroadcast as rebroadcast timeout at %v is after phase timeout at %v and the current phase has not timed out yet.", i.rebroadcastTimeout, i.phaseTimeout)
975+
i.resetRebroadcastParams()
976+
}
953977
case i.rebroadcastTimeoutElapsed():
954978
// Rebroadcast now that the corresponding timeout has elapsed, and schedule the
955979
// successive rebroadcast.
@@ -958,13 +982,27 @@ func (i *instance) tryRebroadcast() {
958982

959983
// Use current host time as the offset for the next alarm to assure that rate of
960984
// broadcasted messages grows relative to the actual time at which an alarm is
961-
// triggered , not the absolute alarm time. This would avoid a "runaway
985+
// triggered, not the absolute alarm time. This would avoid a "runaway
962986
// rebroadcast" scenario where rebroadcast timeout consistently remains behind
963987
// current time due to the discrepancy between set alarm time and the actual time
964988
// at which the alarm is triggered.
965989
i.rebroadcastTimeout = i.participant.host.Time().Add(i.participant.rebroadcastAfter(i.rebroadcastAttempts))
966-
i.participant.host.SetAlarm(i.rebroadcastTimeout)
967-
i.log("scheduled next rebroadcast at %v", i.rebroadcastTimeout)
990+
if i.phaseTimeoutElapsed() {
991+
// The phase timeout has already elapsed; therefore, there's no risk of
992+
// overriding any existing alarm. Simply set the alarm for rebroadcast.
993+
i.participant.host.SetAlarm(i.rebroadcastTimeout)
994+
i.log("scheduled next rebroadcast at %v", i.rebroadcastTimeout)
995+
} else if i.rebroadcastTimeout.Before(i.phaseTimeout) {
996+
// The rebroadcast timeout is set before the phase timeout; therefore, it should
997+
// trigger before the phase timeout. Override the alarm with rebroadcast timeout
998+
// and check for phase timeout in the next cycle of rebroadcast.
999+
i.participant.host.SetAlarm(i.rebroadcastTimeout)
1000+
i.log("scheduled next rebroadcast at %v before phase timeout at %v", i.rebroadcastTimeout, i.phaseTimeout)
1001+
} else {
1002+
// The rebroadcast timeout is set after the phase timeout. Set the alarm for phase timeout instead.
1003+
i.log("Reverted to phase timeout at %v as it is before the next rebroadcast timeout at %v", i.phaseTimeout, i.rebroadcastTimeout)
1004+
i.participant.host.SetAlarm(i.phaseTimeout)
1005+
}
9681006
default:
9691007
// Rebroadcast timeout is set but has not elapsed yet; nothing to do.
9701008
}
@@ -980,6 +1018,14 @@ func (i *instance) rebroadcastTimeoutElapsed() bool {
9801018
return atOrAfter(now, i.rebroadcastTimeout)
9811019
}
9821020

1021+
func (i *instance) shouldRebroadcast() bool {
1022+
return i.phaseTimeoutElapsed() || i.current.Round > i.participant.rebroadcastImmediatelyAfterRound
1023+
}
1024+
1025+
func (i *instance) phaseTimeoutElapsed() bool {
1026+
return atOrAfter(i.participant.host.Time(), i.phaseTimeout)
1027+
}
1028+
9831029
func (i *instance) rebroadcast() {
9841030
// Rebroadcast quality and all messages from the current and previous rounds, unless the
9851031
// instance has progressed to DECIDE phase. In which case, only DECIDE message is

gpbft/gpbft_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/json"
77
"io"
88
"testing"
9+
"time"
910

1011
"github.com/filecoin-project/go-f3/emulator"
1112
"github.com/filecoin-project/go-f3/gpbft"
@@ -522,6 +523,51 @@ func TestGPBFT_WithEvenPowerDistribution(t *testing.T) {
522523
})
523524
driver.RequireDecision(instance.ID(), futureRoundProposal)
524525
})
526+
527+
t.Run("Rebroadcasts independent of phase timeout after 3 rounds", func(t *testing.T) {
528+
instance, driver := newInstanceAndDriver(t)
529+
driver.RequireStartInstance(instance.ID())
530+
driver.RequireQuality()
531+
justification := instance.NewJustification(12, gpbft.PREPARE_PHASE, instance.Proposal(), 0, 1)
532+
driver.RequireDeliverMessage(&gpbft.GMessage{
533+
Sender: 0,
534+
Vote: instance.NewConverge(13, instance.Proposal()),
535+
Ticket: emulator.ValidTicket,
536+
Justification: justification,
537+
})
538+
driver.RequireDeliverMessage(&gpbft.GMessage{
539+
Sender: 0,
540+
Vote: instance.NewPrepare(13, instance.Proposal()),
541+
Ticket: emulator.ValidTicket,
542+
Justification: justification,
543+
})
544+
545+
requireRebroadcast := func() {
546+
driver.RequireQuality()
547+
driver.RequireConverge(13, instance.Proposal(), justification)
548+
driver.RequireNoBroadcast()
549+
}
550+
551+
driver.RequireConverge(13, instance.Proposal(), justification) // Should schedule rebroadcast.
552+
driver.RequireNoBroadcast() // Assert no broadcast until timeout expires.
553+
driver.RequireDeliverAlarm() // Expire the timeout.
554+
requireRebroadcast() // Assert rebroadcast, which should schedule the next rebroadcast.
555+
driver.RequireDeliverAlarm() // Trigger alarm
556+
requireRebroadcast() // Expect rebroadcast again, because phase timeout should be enough in the future since we are in round 13.
557+
driver.AdvanceTimeBy(24 * time.Hour) // Now, advance the clock beyond the rebroadcast timeout.
558+
driver.RequireDeliverAlarm() // Trigger alarm, which should trigger the phase timeout instead of rebroadcast timeout.
559+
driver.RequirePrepareAtRound(13, instance.Proposal(), justification) // Expect progress to PREPARE phase; that is no rebroadcast.
560+
driver.RequireNoBroadcast() // Expect no further broadcast because PREPARE phase is not timed out yet.
561+
562+
// Now, because we are beyond round 3, we should expect rebroadcast even though the phase timeout
563+
// hasn't expired yet. This is because the rebroadcast is set to trigger immediately beyond round 3.
564+
// Therefore, assert that rebroadcast is triggered, and this time it includes a PREPARE message.
565+
driver.RequireDeliverAlarm()
566+
driver.RequireQuality()
567+
driver.RequirePrepareAtRound(13, instance.Proposal(), justification)
568+
driver.RequireConverge(13, instance.Proposal(), justification)
569+
driver.RequireNoBroadcast() // Nothing else should be broadcast until the next alarm.
570+
})
525571
}
526572

527573
func TestGPBFT_WithExactOneThirdToTwoThirdPowerDistribution(t *testing.T) {

gpbft/options.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ type options struct {
2525

2626
qualityDeltaMulti float64
2727

28-
committeeLookback uint64
29-
maxLookaheadRounds uint64
30-
rebroadcastAfter func(int) time.Duration
28+
committeeLookback uint64
29+
maxLookaheadRounds uint64
30+
rebroadcastAfter func(int) time.Duration
31+
rebroadcastImmediatelyAfterRound uint64
3132

3233
maxCachedInstances int
3334
maxCachedMessagesPerInstance int
@@ -38,13 +39,14 @@ type options struct {
3839

3940
func newOptions(o ...Option) (*options, error) {
4041
opts := &options{
41-
delta: defaultDelta,
42-
deltaBackOffExponent: defaultDeltaBackOffExponent,
43-
qualityDeltaMulti: 1.0,
44-
committeeLookback: defaultCommitteeLookback,
45-
rebroadcastAfter: defaultRebroadcastAfter,
46-
maxCachedInstances: defaultMaxCachedInstances,
47-
maxCachedMessagesPerInstance: defaultMaxCachedMessagesPerInstance,
42+
delta: defaultDelta,
43+
deltaBackOffExponent: defaultDeltaBackOffExponent,
44+
qualityDeltaMulti: 1.0,
45+
committeeLookback: defaultCommitteeLookback,
46+
rebroadcastAfter: defaultRebroadcastAfter,
47+
rebroadcastImmediatelyAfterRound: 3,
48+
maxCachedInstances: defaultMaxCachedInstances,
49+
maxCachedMessagesPerInstance: defaultMaxCachedMessagesPerInstance,
4850
}
4951
for _, apply := range o {
5052
if err := apply(opts); err != nil {
@@ -143,6 +145,17 @@ func WithCommitteeLookback(lookback uint64) Option {
143145
}
144146
}
145147

148+
// WithRebroadcastImmediatelyAfterRound sets the round after which rebroadcast is
149+
// scheduled without waiting for phase timeout to expire first.
150+
//
151+
// Defaults to 3 if unset.
152+
func WithRebroadcastImmediatelyAfterRound(round uint64) Option {
153+
return func(o *options) error {
154+
o.rebroadcastImmediatelyAfterRound = round
155+
return nil
156+
}
157+
}
158+
146159
var defaultRebroadcastAfter = exponentialBackoffer(1.3, 0.1, 3*time.Second, 30*time.Second)
147160

148161
// WithRebroadcastBackoff sets the duration after the gPBFT timeout has elapsed, at

0 commit comments

Comments
 (0)