Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use alloy_signer::Signer;

use crate::{
error::ConsensusError,
events::ConsensusEventBus,
protos::consensus::v1::{Proposal, Vote},
scope::ConsensusScope,
session::ConsensusConfig,
storage::ConsensusStorage,
types::CreateProposalRequest,
};

pub trait ConsensusServiceAPI<Scope, S, E>
where
Scope: ConsensusScope,
S: ConsensusStorage<Scope>,
E: ConsensusEventBus<Scope>,
{
fn create_proposal(
&self,
scope: &Scope,
request: CreateProposalRequest,
) -> impl Future<Output = Result<Proposal, ConsensusError>> + Send;
fn create_proposal_with_config(
&self,
scope: &Scope,
request: CreateProposalRequest,
config: Option<ConsensusConfig>,
) -> impl Future<Output = Result<Proposal, ConsensusError>> + Send;

fn cast_vote<SN: Signer + Sync + Send>(
&self,
scope: &Scope,
proposal_id: u32,
choice: bool,
signer: SN,
) -> impl Future<Output = Result<Vote, ConsensusError>> + Send;
fn cast_vote_and_get_proposal<SN: Signer + Sync + Send>(
&self,
scope: &Scope,
proposal_id: u32,
choice: bool,
signer: SN,
) -> impl Future<Output = Result<Proposal, ConsensusError>> + Send;

fn process_incoming_proposal(
&self,
scope: &Scope,
proposal: Proposal,
) -> impl Future<Output = Result<(), ConsensusError>> + Send;
fn process_incoming_vote(
&self,
scope: &Scope,
vote: Vote,
) -> impl Future<Output = Result<(), ConsensusError>> + Send;

fn get_proposal(
&self,
scope: &Scope,
proposal_id: u32,
) -> impl Future<Output = Result<Proposal, ConsensusError>> + Send;
fn get_proposal_payload(
&self,
scope: &Scope,
proposal_id: u32,
) -> impl Future<Output = Result<Vec<u8>, ConsensusError>> + Send;
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod protos {
}
}

pub mod api;
pub mod error;
pub mod events;
pub mod scope;
Expand Down
2 changes: 1 addition & 1 deletion src/protos/messages/v1/consensus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package consensus.v1;
// Proposal represents a consensus proposal that needs voting
message Proposal {
string name = 10; // Proposal name
string payload = 11; // Payload with the proposal data
bytes payload = 11; // Payload with the proposal data
uint32 proposal_id = 12; // Unique identifier of the proposal
bytes proposal_owner = 13; // Public key of the creator
repeated Vote votes = 14; // Vote list in the proposal
Expand Down
32 changes: 28 additions & 4 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
session::{ConsensusConfig, ConsensusSession, ConsensusState},
storage::{ConsensusStorage, InMemoryConsensusStorage},
types::{ConsensusEvent, SessionTransition},
utils::{calculate_consensus_result, has_sufficient_votes},
utils::{calculate_consensus_result, current_timestamp, has_sufficient_votes},
};
/// The main service that handles proposals, votes, and consensus.
///
Expand Down Expand Up @@ -147,6 +147,16 @@ where
Ok(Some(result))
}

/// Get the resolved per-proposal consensus configuration for an existing session.
pub async fn get_proposal_config(
&self,
scope: &Scope,
proposal_id: u32,
) -> Result<ConsensusConfig, ConsensusError> {
let session = self.get_session(scope, proposal_id).await?;
Ok(session.config)
}

/// Get all proposals that have reached consensus, along with their results.
///
/// Returns a map from proposal ID to result (`true` for YES, `false` for NO).
Expand Down Expand Up @@ -270,6 +280,9 @@ where
proposal: Option<&Proposal>,
) -> Result<ConsensusConfig, ConsensusError> {
// 1. If explicit config override exists, use it as base
// NOTE: if a per-proposal override is provided, we should not stomp its timeout
// from the proposal's expiration fields (the caller explicitly chose it).
let has_explicit_override = proposal_override.is_some();
let base_config = if let Some(override_config) = proposal_override {
override_config
} else if let Some(scope_config) = self.storage.get_scope_config(scope).await? {
Expand All @@ -280,8 +293,11 @@ where

// 2. Apply proposal field overrides if proposal is provided
if let Some(prop) = proposal {
// Calculate timeout from expiration_timestamp (absolute timestamp) - timestamp (creation time)
let timeout_seconds = if prop.expiration_timestamp > prop.timestamp {
// Calculate timeout from expiration_timestamp (absolute timestamp) - timestamp (creation time),
// unless an explicit override was supplied.
let timeout_seconds = if has_explicit_override {
base_config.consensus_timeout()
} else if prop.expiration_timestamp > prop.timestamp {
Duration::from_secs(prop.expiration_timestamp - prop.timestamp)
} else {
base_config.consensus_timeout()
Expand Down Expand Up @@ -342,12 +358,19 @@ where
ConsensusEvent::ConsensusReached {
proposal_id,
result: consensus_result,
timestamp: current_timestamp()?,
},
);
Ok(consensus_result)
}
None => {
self.emit_event(scope, ConsensusEvent::ConsensusFailed { proposal_id });
self.emit_event(
scope,
ConsensusEvent::ConsensusFailed {
proposal_id,
timestamp: current_timestamp()?,
},
);
Err(ConsensusError::InsufficientVotesAtTimeout)
}
}
Expand Down Expand Up @@ -423,6 +446,7 @@ where
ConsensusEvent::ConsensusReached {
proposal_id,
result,
timestamp: current_timestamp().unwrap_or(0),
},
);
}
Expand Down
37 changes: 26 additions & 11 deletions src/service_consensus.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use alloy_signer::Signer;

use crate::{
api::ConsensusServiceAPI,
error::ConsensusError,
events::ConsensusEventBus,
protos::consensus::v1::{Proposal, Vote},
Expand All @@ -12,7 +13,7 @@ use crate::{
utils::{build_vote, validate_proposal_timestamp, validate_vote},
};

impl<Scope, S, E> ConsensusService<Scope, S, E>
impl<Scope, S, E> ConsensusServiceAPI<Scope, S, E> for ConsensusService<Scope, S, E>
where
Scope: ConsensusScope,
S: ConsensusStorage<Scope>,
Expand Down Expand Up @@ -56,7 +57,7 @@ where
/// Ok(())
/// }
/// ```
pub async fn create_proposal(
async fn create_proposal(
&self,
scope: &Scope,
request: CreateProposalRequest,
Expand Down Expand Up @@ -109,7 +110,7 @@ where
/// Ok(())
/// }
/// ```
pub async fn create_proposal_with_config(
async fn create_proposal_with_config(
&self,
scope: &Scope,
request: CreateProposalRequest,
Expand All @@ -132,7 +133,7 @@ where
/// Vote is cryptographically signed and linked to previous votes in the hashgraph.
/// Returns the signed vote, which you can then send to other peers in the network.
/// Each voter can only vote once per proposal.
pub async fn cast_vote<SN: Signer + Sync>(
async fn cast_vote<SN: Signer + Sync + Send>(
&self,
scope: &Scope,
proposal_id: u32,
Expand Down Expand Up @@ -166,7 +167,7 @@ where
/// This is a convenience method that combines `cast_vote` and fetching the proposal.
/// Useful for proposal creator as they can immediately see the proposal with their vote
/// and share it with other peers.
pub async fn cast_vote_and_get_proposal<SN: Signer + Sync>(
async fn cast_vote_and_get_proposal<SN: Signer + Sync + Send>(
&self,
scope: &Scope,
proposal_id: u32,
Expand All @@ -185,7 +186,7 @@ where
/// If it necessary the consensus configuration is resolved from the proposal.
/// If the proposal already has enough votes, consensus is reached
/// immediately and an event is emitted.
pub async fn process_incoming_proposal(
async fn process_incoming_proposal(
&self,
scope: &Scope,
proposal: Proposal,
Expand All @@ -208,11 +209,7 @@ where
/// The vote is validated (signature, timestamp, vote chain) and added to the proposal.
/// If this vote brings the total to the consensus threshold, consensus is reached and
/// an event is emitted.
pub async fn process_incoming_vote(
&self,
scope: &Scope,
vote: Vote,
) -> Result<(), ConsensusError> {
async fn process_incoming_vote(&self, scope: &Scope, vote: Vote) -> Result<(), ConsensusError> {
let session = self.get_session(scope, vote.proposal_id).await?;
validate_vote(
&vote,
Expand All @@ -227,4 +224,22 @@ where
self.handle_transition(scope, proposal_id, transition);
Ok(())
}

async fn get_proposal(
&self,
scope: &Scope,
proposal_id: u32,
) -> Result<Proposal, ConsensusError> {
let session = self.get_session(scope, proposal_id).await?;
Ok(session.proposal)
}

async fn get_proposal_payload(
&self,
scope: &Scope,
proposal_id: u32,
) -> Result<Vec<u8>, ConsensusError> {
let session = self.get_session(scope, proposal_id).await?;
Ok(session.proposal.payload)
}
}
54 changes: 39 additions & 15 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,24 @@ impl ConsensusConfig {
ConsensusConfig::from(NetworkType::Gossipsub)
}

pub fn set_up_rounds(&mut self, max_rounds: u32) -> Result<(), ConsensusError> {
if max_rounds == 0 {
return Err(ConsensusError::InvalidMaxRounds);
}
self.max_rounds = max_rounds;
Ok(())
/// Set consensus timeout (validated) and return the updated config.
pub fn with_timeout(mut self, consensus_timeout: Duration) -> Result<Self, ConsensusError> {
crate::utils::validate_timeout(consensus_timeout)?;
self.consensus_timeout = consensus_timeout;
Ok(self)
}

/// Set consensus threshold (validated) and return the updated config.
pub fn with_threshold(mut self, consensus_threshold: f64) -> Result<Self, ConsensusError> {
crate::utils::validate_threshold(consensus_threshold)?;
self.consensus_threshold = consensus_threshold;
Ok(self)
}

/// Set liveness criteria and return the updated config.
pub fn with_liveness_criteria(mut self, liveness_criteria: bool) -> Self {
self.liveness_criteria = liveness_criteria;
self
}

/// Create a new ConsensusConfig with the given values.
Expand Down Expand Up @@ -381,8 +393,7 @@ mod tests {
.unwrap();

let proposal = request.into_proposal().unwrap();
let mut config = ConsensusConfig::gossipsub();
config.set_up_rounds(2).unwrap();
let config = ConsensusConfig::gossipsub();
let mut session = ConsensusSession::new(proposal, config);

// Round 1 -> Round 2 (first vote)
Expand All @@ -409,12 +420,14 @@ mod tests {

#[tokio::test]
async fn enforce_max_rounds_p2p() {
// P2P: max_rounds = 2 means maximum 2 votes
// Round 1 = 0 votes, Round 2 = 1 vote, Round 3 = 2 votes
// So max_rounds = 2 allows up to round 3 (2 votes)
// P2P defaults: max_rounds = 0 triggers dynamic calculation based on expected voters.
// For threshold=2/3 and expected_voters=5, max_round_limit = ceil(2n/3) = 4 votes.
// Round 1 = 0 votes, Round 2 = 1 vote, ... Round 5 = 4 votes.
let signer1 = PrivateKeySigner::random();
let signer2 = PrivateKeySigner::random();
let signer3 = PrivateKeySigner::random();
let signer4 = PrivateKeySigner::random();
let signer5 = PrivateKeySigner::random();

let request = CreateProposalRequest::new(
"Test".into(),
Expand All @@ -427,8 +440,7 @@ mod tests {
.unwrap();

let proposal = request.into_proposal().unwrap();
let mut config = ConsensusConfig::p2p();
config.set_up_rounds(2).unwrap();
let config = ConsensusConfig::p2p();
let mut session = ConsensusSession::new(proposal, config);

// Round 1 -> Round 2 (first vote, 1 vote total)
Expand All @@ -443,9 +455,21 @@ mod tests {
assert_eq!(session.proposal.round, 3);
assert_eq!(session.votes.len(), 2);

// Third vote would be round 4 (3 votes total), which exceeds max_rounds = 2
// Round 3 -> Round 4 (third vote, 3 votes total) - should succeed
let vote3 = build_vote(&session.proposal, true, signer3).await.unwrap();
let err = session.add_vote(vote3).unwrap_err();
session.add_vote(vote3).unwrap();
assert_eq!(session.proposal.round, 4);
assert_eq!(session.votes.len(), 3);

// Round 4 -> Round 5 (fourth vote, 4 votes total) - should succeed (dynamic limit = 4)
let vote4 = build_vote(&session.proposal, true, signer4).await.unwrap();
session.add_vote(vote4).unwrap();
assert_eq!(session.proposal.round, 5);
assert_eq!(session.votes.len(), 4);

// Fifth vote would exceed dynamic max_round_limit (=4 votes)
let vote5 = build_vote(&session.proposal, true, signer5).await.unwrap();
let err = session.add_vote(vote5).unwrap_err();
assert!(matches!(err, ConsensusError::MaxRoundsExceeded));
}
}
14 changes: 9 additions & 5 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ use crate::{
utils::{current_timestamp, generate_id, validate_expected_voters_count, validate_timeout},
};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConsensusEvent {
/// Consensus was reached! The proposal has a final result (yes or no).
ConsensusReached { proposal_id: u32, result: bool },
ConsensusReached {
proposal_id: u32,
result: bool,
timestamp: u64,
},
/// Consensus failed - not enough votes were collected before the timeout.
ConsensusFailed { proposal_id: u32 },
ConsensusFailed { proposal_id: u32, timestamp: u64 },
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand All @@ -27,7 +31,7 @@ pub struct CreateProposalRequest {
/// A short name for the proposal (e.g., "Upgrade to v2").
pub name: String,
/// Additional details about what's being voted on.
pub payload: String,
pub payload: Vec<u8>,
/// The address (public key bytes) of whoever created this proposal.
pub proposal_owner: Vec<u8>,
/// How many people are expected to vote (used to calculate consensus threshold).
Expand All @@ -42,7 +46,7 @@ impl CreateProposalRequest {
/// Create a new proposal request with validation.
pub fn new(
name: String,
payload: String,
payload: Vec<u8>,
proposal_owner: Vec<u8>,
expected_voters_count: u32,
expiration_timestamp: u64,
Expand Down
Loading