Skip to content

Commit 54d447c

Browse files
committed
feat(consensus):Utilize priority function in test framework CON-275
1 parent 1aa7f0b commit 54d447c

File tree

5 files changed

+141
-33
lines changed

5 files changed

+141
-33
lines changed

rs/consensus/tests/framework/driver.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use ic_artifact_pool::{
33
certification_pool::CertificationPoolImpl, consensus_pool::ConsensusPoolImpl, dkg_pool,
44
};
55
use ic_config::artifact_pool::ArtifactPoolConfig;
6-
use ic_consensus::consensus::ConsensusImpl;
6+
use ic_consensus::consensus::{ConsensusGossipImpl, ConsensusImpl};
77
use ic_interfaces::{
88
artifact_pool::{ChangeSetProducer, MutablePool},
99
certification,
@@ -27,6 +27,7 @@ impl<'a> ConsensusDriver<'a> {
2727
node_id: NodeId,
2828
pool_config: ArtifactPoolConfig,
2929
consensus: ConsensusImpl,
30+
consensus_gossip: ConsensusGossipImpl,
3031
dkg: ic_consensus::dkg::DkgImpl,
3132
certifier: Box<
3233
dyn ChangeSetProducer<CertificationPoolImpl, ChangeSet = certification::ChangeSet> + 'a,
@@ -42,15 +43,19 @@ impl<'a> ConsensusDriver<'a> {
4243
logger.clone(),
4344
metrics_registry,
4445
)));
46+
let consensus_priority =
47+
PriorityFnState::new(&consensus_gossip, &*consensus_pool.read().unwrap());
4548
ConsensusDriver {
4649
consensus,
50+
consensus_gossip,
4751
dkg,
4852
certifier,
4953
logger,
5054
consensus_pool,
5155
certification_pool,
5256
ingress_pool,
5357
dkg_pool,
58+
consensus_priority,
5459
}
5560
}
5661

rs/consensus/tests/framework/execution.rs

Lines changed: 62 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@ use ic_interfaces::{
55
};
66
use ic_logger::{trace, ReplicaLogger};
77
use ic_test_utilities::types::ids::node_test_id;
8-
use ic_types::time::Time;
8+
use ic_types::{artifact::Priority, time::Time};
99
use rand::seq::SliceRandom;
1010
use std::time::Duration;
1111

1212
fn execute_instance<'a, 'b>(
1313
instance: &'b ConsensusInstance<'a>,
1414
time_source: &dyn TimeSource,
15+
use_priority_fn: bool,
1516
logger: &ReplicaLogger,
1617
) -> Option<Time> {
1718
let mut in_queue = instance.in_queue.borrow_mut();
@@ -33,6 +34,24 @@ fn execute_instance<'a, 'b>(
3334
match inp {
3435
Input::Message(x) => match x.message {
3536
InputMessage::Consensus(msg) => {
37+
if use_priority_fn {
38+
match instance
39+
.driver
40+
.consensus_priority
41+
.borrow()
42+
.get_priority(&msg)
43+
{
44+
Priority::Drop => return Some(timestamp),
45+
Priority::Stash | Priority::Later => {
46+
instance
47+
.buffered
48+
.borrow_mut()
49+
.push(InputMessage::Consensus(msg));
50+
return Some(timestamp);
51+
}
52+
Priority::Fetch | Priority::FetchNow => (),
53+
};
54+
}
3655
let mut pool = instance.driver.consensus_pool.write().unwrap();
3756
pool.insert(UnvalidatedArtifact {
3857
message: msg,
@@ -58,9 +77,25 @@ fn execute_instance<'a, 'b>(
5877
}
5978
},
6079
// Repeat the polling
61-
Input::TimerExpired(x) => in_queue.push(Input::TimerExpired(
62-
x + Duration::from_millis(POLLING_INTERVAL),
63-
)),
80+
Input::TimerExpired(x) => {
81+
if use_priority_fn {
82+
let mut priority = instance.driver.consensus_priority.borrow_mut();
83+
if priority.last_updated + PRIORITY_FN_REFRESH_INTERVAL < timestamp {
84+
priority.refresh(
85+
&instance.driver.consensus_gossip,
86+
&*instance.driver.consensus_pool.read().unwrap(),
87+
timestamp,
88+
);
89+
}
90+
let mut buffered = instance.buffered.borrow_mut();
91+
for message in buffered.drain(..) {
92+
in_queue.push(Input::Message(Message { message, timestamp }));
93+
}
94+
}
95+
in_queue.push(Input::TimerExpired(
96+
x + Duration::from_millis(POLLING_INTERVAL),
97+
));
98+
}
6499
}
65100
// Move new messages into out_queue.
66101
for message in instance.driver.step(time_source) {
@@ -76,11 +111,13 @@ fn execute_instance<'a, 'b>(
76111
/// timestamp(min(i)) value globally. This ensures that all input messages are
77112
/// always executed in order, for all nodes.
78113
#[derive(Debug)]
79-
pub struct GlobalMessage;
114+
pub struct GlobalMessage {
115+
use_priority_fn: bool,
116+
}
80117

81118
impl GlobalMessage {
82-
pub fn new() -> Box<GlobalMessage> {
83-
Box::new(GlobalMessage)
119+
pub fn new(use_priority_fn: bool) -> Box<GlobalMessage> {
120+
Box::new(GlobalMessage { use_priority_fn })
84121
}
85122
}
86123

@@ -95,16 +132,20 @@ impl ExecutionStrategy for GlobalMessage {
95132
let t_j = j.in_queue.borrow().peek().map(|x| x.timestamp());
96133
compare_timestamp(t_i, t_j)
97134
})
98-
.and_then(|instance| execute_instance(instance, runner.time_source(), logger))
135+
.and_then(|instance| {
136+
execute_instance(instance, runner.time_source(), self.use_priority_fn, logger)
137+
})
99138
}
100139
}
101140

102141
#[derive(Debug)]
103-
pub struct RandomExecute;
142+
pub struct RandomExecute {
143+
use_priority_fn: bool,
144+
}
104145

105146
impl RandomExecute {
106-
pub fn new() -> Box<RandomExecute> {
107-
Box::new(RandomExecute)
147+
pub fn new(use_priority_fn: bool) -> Box<RandomExecute> {
148+
Box::new(RandomExecute { use_priority_fn })
108149
}
109150
}
110151

@@ -115,7 +156,8 @@ impl ExecutionStrategy for RandomExecute {
115156
let mut rng = runner.rng();
116157
instances.shuffle(&mut *rng);
117158
while let Some(instance) = instances.pop() {
118-
let result = execute_instance(instance, runner.time_source(), logger);
159+
let result =
160+
execute_instance(instance, runner.time_source(), self.use_priority_fn, logger);
119161
if result.is_some() {
120162
return result;
121163
}
@@ -125,11 +167,13 @@ impl ExecutionStrategy for RandomExecute {
125167
}
126168

127169
#[derive(Debug)]
128-
pub struct GlobalClock;
170+
pub struct GlobalClock {
171+
use_priority_fn: bool,
172+
}
129173

130174
impl GlobalClock {
131-
pub fn new() -> Box<GlobalClock> {
132-
Box::new(GlobalClock)
175+
pub fn new(use_priority_fn: bool) -> Box<GlobalClock> {
176+
Box::new(GlobalClock { use_priority_fn })
133177
}
134178
}
135179

@@ -144,6 +188,8 @@ impl ExecutionStrategy for GlobalClock {
144188
let t_j = j.in_queue.borrow().peek().map(|_| *j.clock.borrow());
145189
compare_timestamp(t_i, t_j)
146190
})
147-
.and_then(|instance| execute_instance(instance, runner.time_source(), logger))
191+
.and_then(|instance| {
192+
execute_instance(instance, runner.time_source(), self.use_priority_fn, logger)
193+
})
148194
}
149195
}

rs/consensus/tests/framework/runner.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use ic_config::artifact_pool::ArtifactPoolConfig;
55
use ic_consensus::consensus::dkg_key_manager::DkgKeyManager;
66
use ic_consensus::{
77
certification::{CertificationCrypto, CertifierImpl},
8-
consensus::ConsensusImpl,
98
dkg,
109
};
1110
use ic_consensus_utils::crypto::ConsensusCrypto;
@@ -141,7 +140,7 @@ impl<'a> ConsensusRunner<'a> {
141140
let fake_local_store_certified_time_reader =
142141
Arc::new(FakeLocalStoreCertifiedTimeReader::new(self.time.clone()));
143142

144-
let consensus = ConsensusImpl::new(
143+
let (consensus, consensus_gossip) = ic_consensus::consensus::setup(
145144
deps.replica_config.clone(),
146145
Default::default(),
147146
Arc::clone(&deps.registry_client),
@@ -157,11 +156,11 @@ impl<'a> ConsensusRunner<'a> {
157156
deps.message_routing.clone(),
158157
deps.state_manager.clone(),
159158
Arc::clone(&self.time) as Arc<_>,
160-
Duration::from_secs(0),
161159
MaliciousFlags::default(),
162160
deps.metrics_registry.clone(),
163161
replica_logger.clone(),
164162
fake_local_store_certified_time_reader,
163+
0,
165164
);
166165
let dkg = dkg::DkgImpl::new(
167166
deps.replica_config.node_id,
@@ -188,12 +187,14 @@ impl<'a> ConsensusRunner<'a> {
188187
node_id,
189188
deps,
190189
in_queue,
190+
buffered: Default::default(),
191191
out_queue: Default::default(),
192192

193193
driver: ConsensusDriver::new(
194194
node_id,
195195
pool_config,
196196
consensus,
197+
consensus_gossip,
197198
dkg,
198199
Box::new(certifier),
199200
deps.consensus_pool.clone(),
@@ -271,7 +272,8 @@ impl Default for ConsensusRunnerConfig {
271272
num_nodes: 10,
272273
num_rounds: 20,
273274
degree: 9,
274-
execution: GlobalMessage::new(),
275+
use_priority_fn: false,
276+
execution: GlobalMessage::new(false),
275277
delivery: Sequential::new(),
276278
}
277279
}
@@ -325,6 +327,7 @@ impl ConsensusRunnerConfig {
325327
config.num_rounds = rng.gen_range(10..101);
326328
config.degree = rng
327329
.gen_range(std::cmp::min(5, config.num_nodes / 2)..std::cmp::min(config.num_nodes, 20));
330+
config.use_priority_fn = rng.gen_bool(0.5);
328331
config.reset_strategies();
329332
config
330333
}
@@ -340,9 +343,9 @@ impl ConsensusRunnerConfig {
340343
fn strategies<R: Rng>(&self, rng: &mut R) -> Strategies {
341344
(
342345
vec![
343-
GlobalMessage::new(),
344-
GlobalClock::new(),
345-
RandomExecute::new(),
346+
GlobalMessage::new(self.use_priority_fn),
347+
GlobalClock::new(self.use_priority_fn),
348+
RandomExecute::new(self.use_priority_fn),
346349
],
347350
vec![
348351
Sequential::new(),
@@ -353,8 +356,8 @@ impl ConsensusRunnerConfig {
353356
}
354357

355358
/// Parse and update configuration from environment: NUM_NODES,
356-
/// NUM_ROUNDS, MAX_DELTA, DEGREE, EXECUTION and DELIVERY (except
357-
/// RANDOM_SEED, which should be used when first creating the config).
359+
/// NUM_ROUNDS, MAX_DELTA, DEGREE, USE_PRIORITY_FN, EXECUTION and DELIVERY
360+
/// (except RANDOM_SEED, which should be used when first creating the config).
358361
/// Return the updated config if parsing is successful, or an error message
359362
/// in string otherwise.
360363
pub fn parse_extra_config(mut self) -> Result<Self, String> {
@@ -376,6 +379,11 @@ impl ConsensusRunnerConfig {
376379
.parse()
377380
.map_err(|_| "DEGREE must be an unsigned integer")?
378381
}
382+
"use_priority_fn" => {
383+
self.use_priority_fn = value
384+
.parse()
385+
.map_err(|_| "USE_PRIORITY_FN must be either true or false")?
386+
}
379387
_ => (),
380388
}
381389
}

0 commit comments

Comments
 (0)