Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app/featureset/featureset.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ const (
// ChainSplitHalt compares locally fetched attestation's target and source to leader's proposed target and source attestation.
// In case they differ, Charon does not sign the attestation.
ChainSplitHalt = "chain_split_halt"

// PrepareProposer enables scheduling and processing of prepare proposer duties.
PrepareProposer = "prepare_proposer"
)

var (
Expand All @@ -88,6 +91,7 @@ var (
QUIC: statusAlpha,
FetchOnlyCommIdx0: statusAlpha,
ChainSplitHalt: statusAlpha,
PrepareProposer: statusAlpha,
// Add all features and their status here.
}

Expand Down
135 changes: 128 additions & 7 deletions core/consensus/qbft/qbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

k1 "github.com/decred/dcrd/dcrec/secp256k1/v4"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -40,15 +41,15 @@ var supportedCompareDuties = []core.DutyType{core.DutyAttester}

// newDefinition returns a qbft definition (this is constant across all consensus instances).
func newDefinition(nodes int, subs func() []subscriber, roundTimer timer.RoundTimer,
decideCallback func(qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message]), compareAttestations bool,
decideCallback func(qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message], value proto.Message),
isLeader func(duty core.Duty, round, process int64) bool,
compareAttestations bool,
) qbft.Definition[core.Duty, [32]byte, proto.Message] {
quorum := qbft.Definition[core.Duty, [32]byte, proto.Message]{Nodes: nodes}.Quorum()

return qbft.Definition[core.Duty, [32]byte, proto.Message]{
// IsLeader is a deterministic leader election function.
IsLeader: func(duty core.Duty, round, process int64) bool {
return leader(duty, round, nodes) == process
},
IsLeader: isLeader,

// Decide sends consensus output to subscribers.
Decide: func(ctx context.Context, duty core.Duty, _ [32]byte, qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message]) {
Expand All @@ -70,7 +71,7 @@ func newDefinition(nodes int, subs func() []subscriber, roundTimer timer.RoundTi
return
}

decideCallback(qcommit)
decideCallback(qcommit, value)

for _, sub := range subs() {
if err := sub(ctx, duty, value); err != nil {
Expand Down Expand Up @@ -279,6 +280,7 @@ func NewConsensus(p2pNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKe
compareAttestations: compareAttestations,
}
c.mutable.instances = make(map[core.Duty]*instance.IO[Msg])
c.prepareParticipation.data = make(map[uint64][]int64)

return c, nil
}
Expand Down Expand Up @@ -307,6 +309,15 @@ type Consensus struct {

instances map[core.Duty]*instance.IO[Msg]
}

// prepareParticipation stores peer indices that participated in DutyPrepareProposer consensus.
// This is used to adjust leader election for DutyProposer to skip offline/malicious nodes.
// Key is the slot of DutyPrepareProposer (which is slot-1 of the corresponding DutyProposer).
prepareParticipation struct {
sync.RWMutex

data map[uint64][]int64
}
}

// ProtocolID returns the protocol ID.
Expand Down Expand Up @@ -374,6 +385,26 @@ func (c *Consensus) Start(ctx context.Context) {
// waits until it completes, in both cases it returns the resulting error.
// Note this errors if called multiple times for the same duty.
func (c *Consensus) Propose(ctx context.Context, duty core.Duty, data core.UnsignedDataSet) error {
// Inject visible peers for PrepareProposer duty.
if duty.Type == core.DutyPrepareProposer {
var visible []uint64

for i, p := range c.peers {
// Include self and connected peers.
if p.ID == c.p2pNode.ID() || c.p2pNode.Network().Connectedness(p.ID) == network.Connected {
visible = append(visible, uint64(i))
}
}

// Update the data set.
for k, v := range data {
if p, ok := v.(core.PrepareProposerData); ok {
p.VisiblePeers = visible
data[k] = p
}
}
}

// Hash the proposed data, since qbft only supports simple comparable values.
value, err := core.UnsignedDataSetToProto(data)
if err != nil {
Expand Down Expand Up @@ -545,7 +576,7 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err
span.End()
}()

decideCallback := func(qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message]) {
decideCallback := func(qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message], value proto.Message) {
round := qcommit[0].Round()
decided = true

Expand All @@ -568,12 +599,23 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err
span.SetAttributes(attribute.String("leader_name", leaderName))
span.AddEvent("qbft.Decided")

// Store participation for DutyPrepareProposer to be used in DutyProposer leader election.
if duty.Type == core.DutyPrepareProposer {
c.storeParticipation(duty.Slot, value)
}

// qbft.Run() is stopped by cancelling the context, or if an error occurred.
cancel()
}

// isLeader returns true if the given process is the leader for the given duty and round.
// For DutyProposer, it uses participation data from DutyPrepareProposer (slot-1) if available.
isLeader := func(d core.Duty, round, process int64) bool {
return c.leaderWithParticipation(d, round, nodes) == process
}

// Create a new qbft definition for this instance.
def := newDefinition(len(c.peers), c.subscribers, roundTimer, decideCallback, c.compareAttestations)
def := newDefinition(len(c.peers), c.subscribers, roundTimer, decideCallback, isLeader, c.compareAttestations)
origLogRoundChange := def.LogRoundChange
def.LogRoundChange = func(ctx context.Context, instance core.Duty, process, round, newRound int64, uponRule qbft.UponRule, msgs []qbft.Msg[core.Duty, [32]byte, proto.Message]) {
if origLogRoundChange != nil {
Expand Down Expand Up @@ -881,6 +923,85 @@ func fmtStepPeers(step roundStep) string {
return strings.Join(resp, "")
}

// storeParticipation stores peer indices that participated in DutyPrepareProposer consensus.
// It also cleans up entries older than 2 slots to prevent memory growth.
func (c *Consensus) storeParticipation(slot uint64, value proto.Message) {
// Convert proto message back to UnsignedDataSet
unsignedPB, ok := value.(*pbv1.UnsignedDataSet)
if !ok {
return
}

unsignedSet, err := core.UnsignedDataSetFromProto(core.DutyPrepareProposer, unsignedPB)
if err != nil {
return
}

var participants []int64

// Extract visible peers from the dataset
for _, v := range unsignedSet {
if p, ok := v.(core.PrepareProposerData); ok {
for _, peerIdx := range p.VisiblePeers {
participants = append(participants, int64(peerIdx))
}
// All entries in the set should have the same VisiblePeers since they come from the same leader proposal
break
}
}

// Sort for deterministic leader election across all nodes.
slices.Sort(participants)

c.prepareParticipation.Lock()
defer c.prepareParticipation.Unlock()

// Store participation for this slot.
c.prepareParticipation.data[slot] = participants

// Clean up entries older than 2 slots.
for s := range c.prepareParticipation.data {
if slot > 1 && s < slot-1 {
delete(c.prepareParticipation.data, s)
}
}
}

// getParticipants returns the list of peer indices that participated in DutyPrepareProposer
// for the given slot. Returns nil if no participation data is available.
func (c *Consensus) getParticipants(prepareSlot uint64) []int64 {
c.prepareParticipation.RLock()
defer c.prepareParticipation.RUnlock()

return c.prepareParticipation.data[prepareSlot]
}

// leaderWithParticipation returns the leader index for the given duty and round.
// For DutyProposer, it uses participation data from DutyPrepareProposer (slot-1) if available,
// which allows skipping offline/malicious nodes in leader election.
// For all other duties, it falls back to the standard leader election.
func (c *Consensus) leaderWithParticipation(duty core.Duty, round int64, nodes int) int64 {
if duty.Type != core.DutyProposer {
return leader(duty, round, nodes)
}

if duty.Slot == 0 {
return leader(duty, round, nodes)
}

participants := c.getParticipants(duty.Slot - 1)

// If no participation data (e.g., DutyPrepareProposer didn't reach consensus),
// fall back to standard leader election.
if len(participants) == 0 {
return leader(duty, round, nodes)
}

idx := (int64(duty.Slot) + int64(duty.Type) + round) % int64(len(participants))

return participants[idx]
}

// leader return the deterministic leader index.
func leader(duty core.Duty, round int64, nodes int) int64 {
return (int64(duty.Slot) + int64(duty.Type) + round) % int64(nodes)
Expand Down
Loading
Loading