Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft"
version = "0.45.3"
version = "0.45.4"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "data-structures", "cryptography", "database"]
Expand Down
221 changes: 221 additions & 0 deletions consensus/src/consensus/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
use crate::{
alerts::{Alert, ForkingNotification},
collection::Salt,
consensus::LOG_TARGET,
dag::{Dag, DagResult, DagStatus, DagUnit, Request as ReconstructionRequest},
dissemination::{Addressed, DisseminationMessage, Responder, TaskManager, TaskManagerStatus},
extension::Ordering,
units::{UncheckedSignedUnit, Unit, UnitStore, UnitStoreStatus, Validator},
Data, DelayConfig, Hasher, MultiKeychain, NodeIndex, UnitFinalizationHandler,
};
use log::{debug, trace};
use std::{
cmp::max,
fmt::{Display, Formatter, Result as FmtResult},
time::Duration,
};

/// The main logic of the consensus, minus all the asynchronous components.
pub struct Consensus<UFH, MK>
where
    UFH: UnitFinalizationHandler,
    MK: MultiKeychain,
{
    // Store of reconstructed DAG units; also used to answer requests.
    store: UnitStore<DagUnit<UFH::Hasher, UFH::Data, MK>>,
    // Validates incoming units and reconstructs their parent structure.
    dag: Dag<UFH::Hasher, UFH::Data, MK>,
    // Answers requests from other committee members using the store contents.
    responder: Responder<UFH::Hasher, UFH::Data, MK>,
    // Orders units and feeds the results to the finalization handler.
    ordering: Ordering<MK, UFH>,
    // Tracks pending dissemination work (requests and unit broadcasts) and
    // when it should next be triggered.
    task_manager: TaskManager<UFH::Hasher>,
}

/// The status of the consensus, for logging purposes.
pub struct Status<H: Hasher> {
    // Status of the dissemination task manager.
    task_manager_status: TaskManagerStatus<H>,
    // Status of the DAG reconstruction/validation component.
    dag_status: DagStatus,
    // Status of the reconstructed-unit store.
    store_status: UnitStoreStatus,
}

impl<H: Hasher> Status<H> {
    /// One-line health summary: "healthy" when the store is at most 2 rounds
    /// behind the highest round known to either the store or the DAG,
    /// otherwise how many rounds behind it is.
    fn short_report(&self) -> String {
        let top_known = max(self.dag_status.top_round(), self.store_status.top_round());
        let rounds_behind = top_known - self.store_status.top_round();
        if rounds_behind <= 2 {
            "healthy".to_string()
        } else {
            format!("behind by {rounds_behind} rounds")
        }
    }
}

impl<H: Hasher> Display for Status<H> {
    // Renders the short report followed by the per-component statuses,
    // separated by semicolons — identical output to writing each part in turn.
    fn fmt(&self, f: &mut Formatter) -> FmtResult {
        write!(
            f,
            "{};reconstructed DAG: {};additional information: {};task manager: {}",
            self.short_report(),
            self.store_status,
            self.dag_status,
            self.task_manager_status,
        )
    }
}

type AddressedDisseminationMessage<H, D, MK> = Addressed<DisseminationMessage<H, D, MK>>;

/// The result of some operation within the consensus: it requires that either
/// other components get informed about it, or that messages be sent to the
/// network.
pub struct ConsensusResult<H: Hasher, D: Data, MK: MultiKeychain> {
    /// Units that should be sent for backup saving.
    pub units: Vec<DagUnit<H, D, MK>>,
    /// Alerts that should be sent to the alerting component.
    pub alerts: Vec<Alert<H, D, MK::Signature>>,
    /// Messages that should be sent to other committee members.
    pub messages: Vec<AddressedDisseminationMessage<H, D, MK::Signature>>,
}

impl<H: Hasher, D: Data, MK: MultiKeychain> ConsensusResult<H, D, MK> {
    /// An empty result: nothing to back up, alert about, or send.
    fn noop() -> Self {
        Self {
            units: vec![],
            alerts: vec![],
            messages: vec![],
        }
    }
}

impl<UFH, MK> Consensus<UFH, MK>
where
    UFH: UnitFinalizationHandler,
    MK: MultiKeychain,
{
    /// Create a new Consensus.
    ///
    /// The keychain provides our own node index and the committee size, and is
    /// handed to the responder; `validator` is used by the DAG to check
    /// incoming units; `finalization_handler` receives the ordering output;
    /// `delay_config` parameterizes the task manager's scheduling.
    pub fn new(
        keychain: MK,
        validator: Validator<MK>,
        finalization_handler: UFH,
        delay_config: DelayConfig,
    ) -> Self {
        let n_members = keychain.node_count();
        let index = keychain.index();
        Consensus {
            store: UnitStore::new(n_members),
            dag: Dag::new(validator),
            responder: Responder::new(keychain),
            ordering: Ordering::new(finalization_handler),
            task_manager: TaskManager::new(index, n_members, delay_config),
        }
    }

    // Convert a `DagResult` into a `ConsensusResult`: units and alerts pass
    // through unchanged, while the reconstruction requests are registered with
    // the task manager, after which any tasks that are already due get
    // triggered and their messages returned.
    fn handle_dag_result(
        &mut self,
        result: DagResult<UFH::Hasher, UFH::Data, MK>,
    ) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
        let DagResult {
            units,
            alerts,
            requests,
        } = result;
        for request in requests {
            self.task_manager.add_request(request);
        }
        let messages = self.trigger_tasks();
        ConsensusResult {
            units,
            alerts,
            messages,
        }
    }

    /// Process a unit received (usually) from the network.
    ///
    /// The unit is passed to the DAG for validation/reconstruction; whatever
    /// the DAG produces (units, alerts, requests) is turned into a result for
    /// the caller to act upon.
    pub fn process_incoming_unit(
        &mut self,
        unit: UncheckedSignedUnit<UFH::Hasher, UFH::Data, MK::Signature>,
    ) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
        let result = self.dag.add_unit(unit, &self.store);
        self.handle_dag_result(result)
    }

    /// Process a request received from the network.
    ///
    /// Returns the response addressed to the requesting node, or `None` when
    /// the responder cannot answer — in which case the reason is only logged
    /// at debug level and the request is dropped.
    pub fn process_request(
        &mut self,
        request: ReconstructionRequest<UFH::Hasher>,
        node_id: NodeIndex,
    ) -> Option<AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>> {
        match self.responder.handle_request(request, &self.store) {
            Ok(response) => Some(Addressed::addressed_to(response.into(), node_id)),
            Err(err) => {
                debug!(target: LOG_TARGET, "Not answering request from node {:?}: {}.", node_id, err);
                None
            }
        }
    }

    /// Process a parents response.
    ///
    /// If the unit the parents belong to is already in the store the response
    /// is stale, so it is ignored and an empty result is returned.
    pub fn process_parents(
        &mut self,
        u_hash: <UFH::Hasher as Hasher>::Hash,
        parents: Vec<UncheckedSignedUnit<UFH::Hasher, UFH::Data, MK::Signature>>,
    ) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
        if self.store.unit(&u_hash).is_some() {
            trace!(target: LOG_TARGET, "We got parents response but already imported the unit.");
            return ConsensusResult::noop();
        }
        let result = self.dag.add_parents(u_hash, parents, &self.store);
        self.handle_dag_result(result)
    }

    /// Process a newest unit request.
    ///
    /// Unlike `process_request` this cannot fail, so a response addressed to
    /// the requester is always produced. The salt comes from the requester and
    /// is echoed back through the responder.
    pub fn process_newest_unit_request(
        &mut self,
        salt: Salt,
        node_id: NodeIndex,
    ) -> AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature> {
        Addressed::addressed_to(
            self.responder
                .handle_newest_unit_request(node_id, salt, &self.store)
                .into(),
            node_id,
        )
    }

    /// Process a forking notification.
    pub fn process_forking_notification(
        &mut self,
        notification: ForkingNotification<UFH::Hasher, UFH::Data, MK::Signature>,
    ) -> ConsensusResult<UFH::Hasher, UFH::Data, MK> {
        let result = self
            .dag
            .process_forking_notification(notification, &self.store);
        self.handle_dag_result(result)
    }

    /// What to do once a unit has been securely backed up on disk.
    ///
    /// Only at this point does the unit enter the store, get marked as done in
    /// the DAG, and get handed to the ordering — i.e. nothing downstream acts
    /// on a unit before it is durably persisted. Returns a message to
    /// disseminate, if the task manager produces one for this unit.
    // NOTE(review): the two clones appear necessary since both the store and
    // the ordering take ownership of the unit; the statement order looks
    // deliberate (persist-then-announce), so it is preserved verbatim.
    pub fn on_unit_backup_saved(
        &mut self,
        unit: DagUnit<UFH::Hasher, UFH::Data, MK>,
    ) -> Option<AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>> {
        let unit_hash = unit.hash();
        self.store.insert(unit.clone());
        self.dag.finished_processing(&unit_hash);
        self.ordering.add_unit(unit.clone());
        self.task_manager.add_unit(&unit)
    }

    /// When should `trigger_tasks` be called next.
    pub fn next_tick(&self) -> Duration {
        self.task_manager.next_tick()
    }

    /// Trigger all the ready tasks and get all the messages that should be
    /// sent now.
    pub fn trigger_tasks(
        &mut self,
    ) -> Vec<AddressedDisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>> {
        self.task_manager
            .trigger_tasks(&self.store, self.dag.processing_units())
    }

    /// The status of the consensus handler, for logging purposes.
    pub fn status(&self) -> Status<UFH::Hasher> {
        Status {
            dag_status: self.dag.status(),
            store_status: self.store.status(),
            task_manager_status: self.task_manager.status(),
        }
    }
}
21 changes: 13 additions & 8 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use crate::{
alerts::{Handler as AlertHandler, Service as AlertService, IO as AlertIO},
backup::{BackupLoader, BackupSaver},
collection::initial_unit_collection,
consensus::service::{Config as ServiceConfig, Service},
consensus::{
handler::Consensus,
service::{Service, IO as ConsensusIO},
},
creation, handle_task_termination,
interface::LocalIO,
network::{Hub as NetworkHub, NetworkData},
Expand All @@ -16,6 +19,7 @@ use futures::{
};
use log::{debug, error, info};

mod handler;
mod service;

const LOG_TARGET: &str = "AlephBFT-consensus";
Expand Down Expand Up @@ -185,13 +189,16 @@ pub async fn run_session<
pin_mut!(starting_round_handle);
debug!(target: LOG_TARGET, "Initial unit collection spawned.");

//TODO: just put the Service here
debug!(target: LOG_TARGET, "Spawning consensus service.");
let consensus = Consensus::new(
keychain.clone(),
validator.clone(),
finalization_handler,
config.delay_config().clone(),
);
let service_handle = spawn_handle
.spawn_essential("consensus/service", {
let service_config = ServiceConfig {
delay_config: config.delay_config().clone(),
finalization_handler,
let consensus_io = ConsensusIO {
backup_units_for_saver,
backup_units_from_saver,
alerts_for_alerter,
Expand All @@ -203,9 +210,7 @@ pub async fn run_session<
new_units_from_creator,
};
let service_terminator = terminator.add_offspring_connection("service");
let validator = validator.clone();
let keychain = keychain.clone();
let service = Service::new(service_config, keychain, validator);
let service = Service::new(consensus, consensus_io);

async move { service.run(loaded_units, service_terminator).await }
})
Expand Down
Loading
Loading