Skip to content

Commit c066d42

Browse files
seemenkinaclaude
andcommitted
feat(api): add ConsensusServiceAPI trait and refactor types
- Add ConsensusServiceAPI trait defining the public contract - Add get_proposal and get_proposal_payload methods - Refactor Proposal payload from String to Vec<u8> - Add get_proposal_config method to ConsensusService - Enhance ConsensusConfig with new builder methods - Add PartialEq/Eq to ConsensusEvent for testability - Add timestamp to consensus events Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 85c3d1e commit c066d42

15 files changed

+269
-93
lines changed

src/api.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
use alloy_signer::Signer;
2+
3+
use crate::{
4+
error::ConsensusError,
5+
events::ConsensusEventBus,
6+
protos::consensus::v1::{Proposal, Vote},
7+
scope::ConsensusScope,
8+
session::ConsensusConfig,
9+
storage::ConsensusStorage,
10+
types::CreateProposalRequest,
11+
};
12+
13+
pub trait ConsensusServiceAPI<Scope, S, E>
14+
where
15+
Scope: ConsensusScope,
16+
S: ConsensusStorage<Scope>,
17+
E: ConsensusEventBus<Scope>,
18+
{
19+
fn create_proposal(
20+
&self,
21+
scope: &Scope,
22+
request: CreateProposalRequest,
23+
) -> impl Future<Output = Result<Proposal, ConsensusError>> + Send;
24+
fn create_proposal_with_config(
25+
&self,
26+
scope: &Scope,
27+
request: CreateProposalRequest,
28+
config: Option<ConsensusConfig>,
29+
) -> impl Future<Output = Result<Proposal, ConsensusError>> + Send;
30+
31+
fn cast_vote<SN: Signer + Sync + Send>(
32+
&self,
33+
scope: &Scope,
34+
proposal_id: u32,
35+
choice: bool,
36+
signer: SN,
37+
) -> impl Future<Output = Result<Vote, ConsensusError>> + Send;
38+
fn cast_vote_and_get_proposal<SN: Signer + Sync + Send>(
39+
&self,
40+
scope: &Scope,
41+
proposal_id: u32,
42+
choice: bool,
43+
signer: SN,
44+
) -> impl Future<Output = Result<Proposal, ConsensusError>> + Send;
45+
46+
fn process_incoming_proposal(
47+
&self,
48+
scope: &Scope,
49+
proposal: Proposal,
50+
) -> impl Future<Output = Result<(), ConsensusError>> + Send;
51+
fn process_incoming_vote(
52+
&self,
53+
scope: &Scope,
54+
vote: Vote,
55+
) -> impl Future<Output = Result<(), ConsensusError>> + Send;
56+
57+
fn get_proposal(
58+
&self,
59+
scope: &Scope,
60+
proposal_id: u32,
61+
) -> impl Future<Output = Result<Proposal, ConsensusError>> + Send;
62+
fn get_proposal_payload(
63+
&self,
64+
scope: &Scope,
65+
proposal_id: u32,
66+
) -> impl Future<Output = Result<Vec<u8>, ConsensusError>> + Send;
67+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub mod protos {
2323
}
2424
}
2525

26+
pub mod api;
2627
pub mod error;
2728
pub mod events;
2829
pub mod scope;

src/protos/messages/v1/consensus.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ package consensus.v1;
44
// Proposal represents a consensus proposal that needs voting
55
message Proposal {
66
string name = 10; // Proposal name
7-
string payload = 11; // Payload with the proposal data
7+
bytes payload = 11; // Payload with the proposal data
88
uint32 proposal_id = 12; // Unique identifier of the proposal
99
bytes proposal_owner = 13; // Public key of the creator
1010
repeated Vote votes = 14; // Vote list in the proposal

src/service.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
session::{ConsensusConfig, ConsensusSession, ConsensusState},
1111
storage::{ConsensusStorage, InMemoryConsensusStorage},
1212
types::{ConsensusEvent, SessionTransition},
13-
utils::{calculate_consensus_result, has_sufficient_votes},
13+
utils::{calculate_consensus_result, current_timestamp, has_sufficient_votes},
1414
};
1515
/// The main service that handles proposals, votes, and consensus.
1616
///
@@ -147,6 +147,16 @@ where
147147
Ok(Some(result))
148148
}
149149

150+
/// Get the resolved per-proposal consensus configuration for an existing session.
151+
pub async fn get_proposal_config(
152+
&self,
153+
scope: &Scope,
154+
proposal_id: u32,
155+
) -> Result<ConsensusConfig, ConsensusError> {
156+
let session = self.get_session(scope, proposal_id).await?;
157+
Ok(session.config)
158+
}
159+
150160
/// Get all proposals that have reached consensus, along with their results.
151161
///
152162
/// Returns a map from proposal ID to result (`true` for YES, `false` for NO).
@@ -270,6 +280,9 @@ where
270280
proposal: Option<&Proposal>,
271281
) -> Result<ConsensusConfig, ConsensusError> {
272282
// 1. If explicit config override exists, use it as base
283+
// NOTE: if a per-proposal override is provided, we should not stomp its timeout
284+
// from the proposal's expiration fields (the caller explicitly chose it).
285+
let has_explicit_override = proposal_override.is_some();
273286
let base_config = if let Some(override_config) = proposal_override {
274287
override_config
275288
} else if let Some(scope_config) = self.storage.get_scope_config(scope).await? {
@@ -280,8 +293,11 @@ where
280293

281294
// 2. Apply proposal field overrides if proposal is provided
282295
if let Some(prop) = proposal {
283-
// Calculate timeout from expiration_timestamp (absolute timestamp) - timestamp (creation time)
284-
let timeout_seconds = if prop.expiration_timestamp > prop.timestamp {
296+
// Calculate timeout from expiration_timestamp (absolute timestamp) - timestamp (creation time),
297+
// unless an explicit override was supplied.
298+
let timeout_seconds = if has_explicit_override {
299+
base_config.consensus_timeout()
300+
} else if prop.expiration_timestamp > prop.timestamp {
285301
Duration::from_secs(prop.expiration_timestamp - prop.timestamp)
286302
} else {
287303
base_config.consensus_timeout()
@@ -342,12 +358,19 @@ where
342358
ConsensusEvent::ConsensusReached {
343359
proposal_id,
344360
result: consensus_result,
361+
timestamp: current_timestamp()?,
345362
},
346363
);
347364
Ok(consensus_result)
348365
}
349366
None => {
350-
self.emit_event(scope, ConsensusEvent::ConsensusFailed { proposal_id });
367+
self.emit_event(
368+
scope,
369+
ConsensusEvent::ConsensusFailed {
370+
proposal_id,
371+
timestamp: current_timestamp()?,
372+
},
373+
);
351374
Err(ConsensusError::InsufficientVotesAtTimeout)
352375
}
353376
}
@@ -423,6 +446,7 @@ where
423446
ConsensusEvent::ConsensusReached {
424447
proposal_id,
425448
result,
449+
timestamp: current_timestamp().unwrap_or(0),
426450
},
427451
);
428452
}

src/service_consensus.rs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use alloy_signer::Signer;
22

33
use crate::{
4+
api::ConsensusServiceAPI,
45
error::ConsensusError,
56
events::ConsensusEventBus,
67
protos::consensus::v1::{Proposal, Vote},
@@ -12,7 +13,7 @@ use crate::{
1213
utils::{build_vote, validate_proposal_timestamp, validate_vote},
1314
};
1415

15-
impl<Scope, S, E> ConsensusService<Scope, S, E>
16+
impl<Scope, S, E> ConsensusServiceAPI<Scope, S, E> for ConsensusService<Scope, S, E>
1617
where
1718
Scope: ConsensusScope,
1819
S: ConsensusStorage<Scope>,
@@ -56,7 +57,7 @@ where
5657
/// Ok(())
5758
/// }
5859
/// ```
59-
pub async fn create_proposal(
60+
async fn create_proposal(
6061
&self,
6162
scope: &Scope,
6263
request: CreateProposalRequest,
@@ -109,7 +110,7 @@ where
109110
/// Ok(())
110111
/// }
111112
/// ```
112-
pub async fn create_proposal_with_config(
113+
async fn create_proposal_with_config(
113114
&self,
114115
scope: &Scope,
115116
request: CreateProposalRequest,
@@ -132,7 +133,7 @@ where
132133
/// Vote is cryptographically signed and linked to previous votes in the hashgraph.
133134
/// Returns the signed vote, which you can then send to other peers in the network.
134135
/// Each voter can only vote once per proposal.
135-
pub async fn cast_vote<SN: Signer + Sync>(
136+
async fn cast_vote<SN: Signer + Sync + Send>(
136137
&self,
137138
scope: &Scope,
138139
proposal_id: u32,
@@ -166,7 +167,7 @@ where
166167
/// This is a convenience method that combines `cast_vote` and fetching the proposal.
167168
/// Useful for proposal creator as they can immediately see the proposal with their vote
168169
/// and share it with other peers.
169-
pub async fn cast_vote_and_get_proposal<SN: Signer + Sync>(
170+
async fn cast_vote_and_get_proposal<SN: Signer + Sync + Send>(
170171
&self,
171172
scope: &Scope,
172173
proposal_id: u32,
@@ -185,7 +186,7 @@ where
185186
/// If it necessary the consensus configuration is resolved from the proposal.
186187
/// If the proposal already has enough votes, consensus is reached
187188
/// immediately and an event is emitted.
188-
pub async fn process_incoming_proposal(
189+
async fn process_incoming_proposal(
189190
&self,
190191
scope: &Scope,
191192
proposal: Proposal,
@@ -208,11 +209,7 @@ where
208209
/// The vote is validated (signature, timestamp, vote chain) and added to the proposal.
209210
/// If this vote brings the total to the consensus threshold, consensus is reached and
210211
/// an event is emitted.
211-
pub async fn process_incoming_vote(
212-
&self,
213-
scope: &Scope,
214-
vote: Vote,
215-
) -> Result<(), ConsensusError> {
212+
async fn process_incoming_vote(&self, scope: &Scope, vote: Vote) -> Result<(), ConsensusError> {
216213
let session = self.get_session(scope, vote.proposal_id).await?;
217214
validate_vote(
218215
&vote,
@@ -227,4 +224,22 @@ where
227224
self.handle_transition(scope, proposal_id, transition);
228225
Ok(())
229226
}
227+
228+
async fn get_proposal(
229+
&self,
230+
scope: &Scope,
231+
proposal_id: u32,
232+
) -> Result<Proposal, ConsensusError> {
233+
let session = self.get_session(scope, proposal_id).await?;
234+
Ok(session.proposal)
235+
}
236+
237+
async fn get_proposal_payload(
238+
&self,
239+
scope: &Scope,
240+
proposal_id: u32,
241+
) -> Result<Vec<u8>, ConsensusError> {
242+
let session = self.get_session(scope, proposal_id).await?;
243+
Ok(session.proposal.payload)
244+
}
230245
}

src/session.rs

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,24 @@ impl ConsensusConfig {
6666
ConsensusConfig::from(NetworkType::Gossipsub)
6767
}
6868

69-
pub fn set_up_rounds(&mut self, max_rounds: u32) -> Result<(), ConsensusError> {
70-
if max_rounds == 0 {
71-
return Err(ConsensusError::InvalidMaxRounds);
72-
}
73-
self.max_rounds = max_rounds;
74-
Ok(())
69+
/// Set consensus timeout (validated) and return the updated config.
70+
pub fn with_timeout(mut self, consensus_timeout: Duration) -> Result<Self, ConsensusError> {
71+
crate::utils::validate_timeout(consensus_timeout)?;
72+
self.consensus_timeout = consensus_timeout;
73+
Ok(self)
74+
}
75+
76+
/// Set consensus threshold (validated) and return the updated config.
77+
pub fn with_threshold(mut self, consensus_threshold: f64) -> Result<Self, ConsensusError> {
78+
crate::utils::validate_threshold(consensus_threshold)?;
79+
self.consensus_threshold = consensus_threshold;
80+
Ok(self)
81+
}
82+
83+
/// Set liveness criteria and return the updated config.
84+
pub fn with_liveness_criteria(mut self, liveness_criteria: bool) -> Self {
85+
self.liveness_criteria = liveness_criteria;
86+
self
7587
}
7688

7789
/// Create a new ConsensusConfig with the given values.
@@ -381,8 +393,7 @@ mod tests {
381393
.unwrap();
382394

383395
let proposal = request.into_proposal().unwrap();
384-
let mut config = ConsensusConfig::gossipsub();
385-
config.set_up_rounds(2).unwrap();
396+
let config = ConsensusConfig::gossipsub();
386397
let mut session = ConsensusSession::new(proposal, config);
387398

388399
// Round 1 -> Round 2 (first vote)
@@ -409,12 +420,14 @@ mod tests {
409420

410421
#[tokio::test]
411422
async fn enforce_max_rounds_p2p() {
412-
// P2P: max_rounds = 2 means maximum 2 votes
413-
// Round 1 = 0 votes, Round 2 = 1 vote, Round 3 = 2 votes
414-
// So max_rounds = 2 allows up to round 3 (2 votes)
423+
// P2P defaults: max_rounds = 0 triggers dynamic calculation based on expected voters.
424+
// For threshold=2/3 and expected_voters=5, max_round_limit = ceil(2n/3) = 4 votes.
425+
// Round 1 = 0 votes, Round 2 = 1 vote, ... Round 5 = 4 votes.
415426
let signer1 = PrivateKeySigner::random();
416427
let signer2 = PrivateKeySigner::random();
417428
let signer3 = PrivateKeySigner::random();
429+
let signer4 = PrivateKeySigner::random();
430+
let signer5 = PrivateKeySigner::random();
418431

419432
let request = CreateProposalRequest::new(
420433
"Test".into(),
@@ -427,8 +440,7 @@ mod tests {
427440
.unwrap();
428441

429442
let proposal = request.into_proposal().unwrap();
430-
let mut config = ConsensusConfig::p2p();
431-
config.set_up_rounds(2).unwrap();
443+
let config = ConsensusConfig::p2p();
432444
let mut session = ConsensusSession::new(proposal, config);
433445

434446
// Round 1 -> Round 2 (first vote, 1 vote total)
@@ -443,9 +455,21 @@ mod tests {
443455
assert_eq!(session.proposal.round, 3);
444456
assert_eq!(session.votes.len(), 2);
445457

446-
// Third vote would be round 4 (3 votes total), which exceeds max_rounds = 2
458+
// Round 3 -> Round 4 (third vote, 3 votes total) - should succeed
447459
let vote3 = build_vote(&session.proposal, true, signer3).await.unwrap();
448-
let err = session.add_vote(vote3).unwrap_err();
460+
session.add_vote(vote3).unwrap();
461+
assert_eq!(session.proposal.round, 4);
462+
assert_eq!(session.votes.len(), 3);
463+
464+
// Round 4 -> Round 5 (fourth vote, 4 votes total) - should succeed (dynamic limit = 4)
465+
let vote4 = build_vote(&session.proposal, true, signer4).await.unwrap();
466+
session.add_vote(vote4).unwrap();
467+
assert_eq!(session.proposal.round, 5);
468+
assert_eq!(session.votes.len(), 4);
469+
470+
// Fifth vote would exceed dynamic max_round_limit (=4 votes)
471+
let vote5 = build_vote(&session.proposal, true, signer5).await.unwrap();
472+
let err = session.add_vote(vote5).unwrap_err();
449473
assert!(matches!(err, ConsensusError::MaxRoundsExceeded));
450474
}
451475
}

src/types.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,16 @@ use crate::{
66
utils::{current_timestamp, generate_id, validate_expected_voters_count, validate_timeout},
77
};
88

9-
#[derive(Debug, Clone)]
9+
#[derive(Debug, Clone, PartialEq, Eq)]
1010
pub enum ConsensusEvent {
1111
/// Consensus was reached! The proposal has a final result (yes or no).
12-
ConsensusReached { proposal_id: u32, result: bool },
12+
ConsensusReached {
13+
proposal_id: u32,
14+
result: bool,
15+
timestamp: u64,
16+
},
1317
/// Consensus failed - not enough votes were collected before the timeout.
14-
ConsensusFailed { proposal_id: u32 },
18+
ConsensusFailed { proposal_id: u32, timestamp: u64 },
1519
}
1620

1721
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -27,7 +31,7 @@ pub struct CreateProposalRequest {
2731
/// A short name for the proposal (e.g., "Upgrade to v2").
2832
pub name: String,
2933
/// Additional details about what's being voted on.
30-
pub payload: String,
34+
pub payload: Vec<u8>,
3135
/// The address (public key bytes) of whoever created this proposal.
3236
pub proposal_owner: Vec<u8>,
3337
/// How many people are expected to vote (used to calculate consensus threshold).
@@ -42,7 +46,7 @@ impl CreateProposalRequest {
4246
/// Create a new proposal request with validation.
4347
pub fn new(
4448
name: String,
45-
payload: String,
49+
payload: Vec<u8>,
4650
proposal_owner: Vec<u8>,
4751
expected_voters_count: u32,
4852
expiration_timestamp: u64,

0 commit comments

Comments
 (0)