Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions rs/artifact_pool/src/dkg_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,20 @@ pub struct DkgPoolImpl {
const POOL_DKG: &str = "dkg";

impl DkgPoolImpl {
/// Instantiates a new DKG pool from the time source.
pub fn new(metrics_registry: MetricsRegistry, log: ReplicaLogger) -> Self {
/// Instantiates a new DKG pool with the given start height.
pub fn new(
metrics_registry: MetricsRegistry,
log: ReplicaLogger,
current_start_height: Height,
) -> Self {
Self {
invalidated_artifacts: metrics_registry.int_counter(
"dkg_invalidated_artifacts",
"The number of invalidated DKG artifacts",
),
validated: PoolSection::new(metrics_registry.clone(), POOL_DKG, POOL_TYPE_VALIDATED),
unvalidated: PoolSection::new(metrics_registry, POOL_DKG, POOL_TYPE_UNVALIDATED),
current_start_height: Height::from(1),
current_start_height,
log,
}
}
Expand Down Expand Up @@ -221,7 +225,7 @@ mod test {

#[test]
fn test_dkg_pool_insert_and_remove() {
let mut pool = DkgPoolImpl::new(MetricsRegistry::new(), no_op_logger());
let mut pool = DkgPoolImpl::new(MetricsRegistry::new(), no_op_logger(), Height::from(0));
let message = make_message(Height::from(30), node_test_id(1));
let id = DkgMessageId::from(&message);

Expand All @@ -243,7 +247,11 @@ mod test {
// create 2 DKGs for the same subnet
let current_dkg_id_start_height = Height::from(30);
let last_dkg_id_start_height = Height::from(10);
let mut pool = DkgPoolImpl::new(MetricsRegistry::new(), no_op_logger());
let mut pool = DkgPoolImpl::new(
MetricsRegistry::new(),
no_op_logger(),
current_dkg_id_start_height,
);
// add two validated messages, one for every DKG instance
let result = pool.apply(
[
Expand Down Expand Up @@ -310,7 +318,7 @@ mod test {

#[test]
fn test_dkg_pool_filter_by_age() {
let mut pool = DkgPoolImpl::new(MetricsRegistry::new(), no_op_logger());
let mut pool = DkgPoolImpl::new(MetricsRegistry::new(), no_op_logger(), Height::from(0));

// 200 sec old
let msg = make_message(Height::from(1), node_test_id(1));
Expand Down
72 changes: 43 additions & 29 deletions rs/consensus/dkg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ mod tests {
use ic_crypto_test_utils_ni_dkg::dummy_dealing;
use ic_interfaces::{
consensus_pool::ConsensusPool,
p2p::consensus::{MutablePool, UnvalidatedArtifact},
p2p::consensus::{BouncerFactory, BouncerValue, MutablePool, UnvalidatedArtifact},
};
use ic_interfaces_registry::RegistryClient;
use ic_management_canister_types_private::{MasterPublicKeyId, VetKdCurve, VetKdKeyId};
Expand All @@ -420,6 +420,7 @@ mod tests {
use ic_types::{
RegistryVersion, ReplicaVersion,
consensus::{Block, BlockPayload},
crypto::CryptoHash,
crypto::threshold_sig::ni_dkg::{
NiDkgId, NiDkgMasterPublicKeyId, NiDkgTargetId, NiDkgTargetSubnet,
},
Expand All @@ -428,6 +429,32 @@ mod tests {
use std::{collections::BTreeSet, convert::TryFrom};
use utils::{tags_iter, vetkd_key_ids_for_subnet};

#[test]
fn test_dkg_bouncer() {
with_test_replica_logger(|logger| {
let mut dkg_pool = DkgPoolImpl::new(MetricsRegistry::new(), logger, Height::from(500));
let bouncer_factory = DkgBouncer::new(&MetricsRegistry::new());
let bouncer = bouncer_factory.new_bouncer(&dkg_pool);

let height_500_id = DkgMessageId {
hash: CryptoHash(vec![0]).into(),
height: Height::from(500),
};
let height_1000_id = DkgMessageId {
hash: CryptoHash(vec![1]).into(),
height: Height::from(1000),
};
assert_eq!(bouncer(&height_500_id), BouncerValue::Wants);
assert_eq!(bouncer(&height_1000_id), BouncerValue::MaybeWantsLater);

dkg_pool.apply(vec![ChangeAction::Purge(Height::from(1000))]);
let bouncer = bouncer_factory.new_bouncer(&dkg_pool);

assert_eq!(bouncer(&height_1000_id), BouncerValue::Wants);
assert_eq!(bouncer(&height_500_id), BouncerValue::Unwanted);
});
}

#[test]
// In this test we test the creation of dealing payloads.
fn test_create_dealings_payload() {
Expand Down Expand Up @@ -534,7 +561,8 @@ mod tests {
MetricsRegistry::new(),
logger.clone(),
);
let dkg_pool_2 = DkgPoolImpl::new(MetricsRegistry::new(), logger);
let start_height = dkg_pool.read().unwrap().get_current_start_height();
let dkg_pool_2 = DkgPoolImpl::new(MetricsRegistry::new(), logger, start_height);
sync_dkg_key_manager(&dkg_key_manager_2, &pool);
let change_set = dkg_2.on_state_change(&dkg_pool_2);
assert_eq!(change_set.len(), 3);
Expand Down Expand Up @@ -593,7 +621,8 @@ mod tests {
let Dependencies {
mut pool, crypto, ..
} = dependencies(pool_config.clone(), 2);
let mut dkg_pool = DkgPoolImpl::new(MetricsRegistry::new(), logger.clone());
let mut dkg_pool =
DkgPoolImpl::new(MetricsRegistry::new(), logger.clone(), Height::from(0));
// Let's check that replica 3, who's not a dealer, does not produce dealings.
let dkg_key_manager =
new_dkg_key_manager(crypto.clone(), logger.clone(), &PoolReader::new(&pool));
Expand Down Expand Up @@ -716,7 +745,8 @@ mod tests {
// We did not advance the consensus pool yet. The configs for remote transcripts
// are not added to a summary block yet. That's why we see two dealings for
// local thresholds.
let mut dkg_pool = DkgPoolImpl::new(MetricsRegistry::new(), logger);
let mut dkg_pool =
DkgPoolImpl::new(MetricsRegistry::new(), logger, Height::from(0));
sync_dkg_key_manager(&dkg_key_manager, &pool);
let change_set = dkg.on_state_change(&dkg_pool);
match &change_set.as_slice() {
Expand Down Expand Up @@ -870,8 +900,10 @@ mod tests {
let consensus_pool_2 = dependencies(pool_config_2, 2).pool;

with_test_replica_logger(|logger| {
let dkg_pool_1 = DkgPoolImpl::new(MetricsRegistry::new(), logger.clone());
let dkg_pool_2 = DkgPoolImpl::new(MetricsRegistry::new(), logger.clone());
let dkg_pool_1 =
DkgPoolImpl::new(MetricsRegistry::new(), logger.clone(), Height::from(0));
let dkg_pool_2 =
DkgPoolImpl::new(MetricsRegistry::new(), logger.clone(), Height::from(0));

// We instantiate the DKG component for node Id = 1 and Id = 2.
let dkg_key_manager_1 = new_dkg_key_manager(
Expand Down Expand Up @@ -1397,18 +1429,11 @@ mod tests {
MetricsRegistry::new(),
logger.clone(),
);
let mut dkg_pool_1 = DkgPoolImpl::new(MetricsRegistry::new(), logger.clone());
let mut dkg_pool_2 = DkgPoolImpl::new(MetricsRegistry::new(), logger);

// First we expect a new purge.
let change_set = dkg_1.on_state_change(&dkg_pool_1);
match &change_set.as_slice() {
&[ChangeAction::Purge(purge_height)]
if *purge_height == Height::from(2 * (dkg_interval_length + 1)) => {}
val => panic!("Unexpected change set: {:?}", val),
};
dkg_pool_1.apply(change_set);
sync_dkg_key_manager(&dgk_key_manager_1, &pool_1);
let start_height = pool_1.get_cache().summary_block().height;
let dkg_pool_1 =
DkgPoolImpl::new(MetricsRegistry::new(), logger.clone(), start_height);
let mut dkg_pool_2 =
DkgPoolImpl::new(MetricsRegistry::new(), logger, start_height);

// The last summary contains two local and two remote configs.
// dkg.on_state_change should create 4 dealings for those
Expand Down Expand Up @@ -1454,17 +1479,6 @@ mod tests {

assert_eq!(dkg_pool_2.get_unvalidated().count(), 4);

// First we expect a new purge from dkg_2 as well.
let change_set = dkg_2.on_state_change(&dkg_pool_2);
match &change_set.as_slice() {
&[ChangeAction::Purge(purge_height)]
if *purge_height == Height::from(2 * (dkg_interval_length + 1)) => {}
val => panic!("Unexpected change set: {:?}", val),
};
dkg_pool_2.apply(change_set);

assert_eq!(dkg_pool_2.get_unvalidated().count(), 4);

// The pool contains two local and two remote dealings.
// dkg.on_state_change should move these 4 dealings
// into the validated pool.
Expand Down
3 changes: 2 additions & 1 deletion rs/consensus/dkg/src/payload_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,8 @@ mod tests {
MetricsRegistry::new(),
no_op_logger(),
);
let mut dkg_pool = DkgPoolImpl::new(MetricsRegistry::new(), no_op_logger());
let mut dkg_pool =
DkgPoolImpl::new(MetricsRegistry::new(), no_op_logger(), dkg_summary.height);
// Update start height
let start_height = Height::new(10);
dkg_pool.apply(vec![ChangeAction::Purge(start_height)]);
Expand Down
1 change: 1 addition & 0 deletions rs/consensus/mocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ pub fn dependencies_with_subnet_records_with_raw_state_manager(
let dkg_pool = Arc::new(RwLock::new(DkgPoolImpl::new(
ic_metrics::MetricsRegistry::new(),
log.clone(),
Height::from(0),
)));
let idkg_pool = Arc::new(RwLock::new(IDkgPoolImpl::new(
replica_config.node_id,
Expand Down
1 change: 1 addition & 0 deletions rs/consensus/src/consensus/block_maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,7 @@ mod tests {
let dkg_pool = Arc::new(RwLock::new(ic_artifact_pool::dkg_pool::DkgPoolImpl::new(
MetricsRegistry::new(),
no_op_logger(),
Height::from(0),
)));

let idkg_pool = Arc::new(RwLock::new(ic_artifact_pool::idkg_pool::IDkgPoolImpl::new(
Expand Down
5 changes: 3 additions & 2 deletions rs/consensus/tests/framework/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use ic_types::{
NodeId, SubnetId,
artifact::IdentifiableArtifact,
consensus::{
CatchUpPackage, ConsensusMessage, certification::CertificationMessage,
CatchUpPackage, ConsensusMessage, HasHeight, certification::CertificationMessage,
dkg::Message as DkgMessage, idkg::IDkgMessage,
},
replica_config::ReplicaConfig,
Expand Down Expand Up @@ -201,6 +201,8 @@ impl ConsensusDependencies {
// let state_manager_arc = Rc::new(state_manager);
let metrics_registry = MetricsRegistry::new();

let dkg_pool =
dkg_pool::DkgPoolImpl::new(metrics_registry.clone(), no_op_logger(), cup.height());
let consensus_pool = Arc::new(RwLock::new(ConsensusPoolImpl::new(
replica_config.node_id,
replica_config.subnet_id,
Expand All @@ -210,7 +212,6 @@ impl ConsensusDependencies {
no_op_logger(),
time_source,
)));
let dkg_pool = dkg_pool::DkgPoolImpl::new(metrics_registry.clone(), no_op_logger());
let idkg_pool = idkg_pool::IDkgPoolImpl::new(
replica_config.node_id,
pool_config,
Expand Down
9 changes: 5 additions & 4 deletions rs/consensus/tests/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ fn consensus_produces_expected_batches() {
let fake_crypto = Arc::new(fake_crypto);
let metrics_registry = MetricsRegistry::new();
let time_source = FastForwardTimeSource::new();
let dkg_pool = Arc::new(RwLock::new(dkg_pool::DkgPoolImpl::new(
metrics_registry.clone(),
no_op_logger(),
)));
let idkg_pool = Arc::new(RwLock::new(idkg_pool::IDkgPoolImpl::new(
replica_config.node_id,
pool_config.clone(),
Expand Down Expand Up @@ -138,6 +134,11 @@ fn consensus_produces_expected_batches() {
cup_contents.version,
)
.expect("Failed to get DKG summary from CUP contents");
let dkg_pool = Arc::new(RwLock::new(dkg_pool::DkgPoolImpl::new(
metrics_registry.clone(),
no_op_logger(),
summary.height,
)));
let consensus_pool = Arc::new(RwLock::new(consensus_pool::ConsensusPoolImpl::new(
node_id,
subnet_id,
Expand Down
6 changes: 5 additions & 1 deletion rs/replay/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ impl ReplayValidator {
subnet_id,
));
let time_source = Arc::new(SysTimeSource::new());
let dkg_pool = RwLock::new(DkgPoolImpl::new(metrics_registry.clone(), log.clone()));
let dkg_pool = RwLock::new(DkgPoolImpl::new(
metrics_registry.clone(),
log.clone(),
Height::from(0),
));
let node_id = NodeId::from(PrincipalId::new_node_test_id(1));
let replica_cfg = ReplicaConfig::new(node_id, subnet_id);
let thread_pool = ThreadPoolBuilder::new()
Expand Down
1 change: 1 addition & 0 deletions rs/replica/setup_ic_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ impl ArtifactPools {
let dkg_pool = Arc::new(RwLock::new(DkgPoolImpl::new(
metrics_registry.clone(),
log.clone(),
catch_up_package.height(),
)));
let https_outcalls_pool = Arc::new(RwLock::new(CanisterHttpPoolImpl::new(
metrics_registry.clone(),
Expand Down
23 changes: 12 additions & 11 deletions rs/test_utilities/artifact_pool/src/consensus_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,17 @@ impl TestConsensusPool {
state_manager: Arc<dyn StateManager<State = ReplicatedState>>,
dkg_pool: Option<Arc<RwLock<DkgPoolImpl>>>,
) -> Self {
let cup_contents = registry_client
.get_cup_contents(subnet_id, registry_client.get_latest_version())
.expect("Failed to retreive the DKG transcripts from registry");
let summary = get_dkg_summary_from_cup_contents(
cup_contents.value.expect("Missing CUP contents"),
subnet_id,
&*registry_client,
cup_contents.version,
)
.expect("Failed to get DKG summary from CUP contents");

let dkg_payload_builder = Box::new(dkg_payload_builder_fn(
subnet_id,
registry_client.clone(),
Expand All @@ -189,22 +200,12 @@ impl TestConsensusPool {
ic_artifact_pool::dkg_pool::DkgPoolImpl::new(
ic_metrics::MetricsRegistry::new(),
no_op_logger(),
summary.height,
),
))
}),
));

let cup_contents = registry_client
.get_cup_contents(subnet_id, registry_client.get_latest_version())
.expect("Failed to retreive the DKG transcripts from registry");
let summary = get_dkg_summary_from_cup_contents(
cup_contents.value.expect("Missing CUP contents"),
subnet_id,
&*registry_client,
cup_contents.version,
)
.expect("Failed to get DKG summary from CUP contents");

let pool = ConsensusPoolImpl::new(
node_id,
subnet_id,
Expand Down
Loading