Skip to content

Commit 0503b18

Browse files
authored
jwk #2: ensure jwk txns are expected in consensus (aptos-labs#11855)
* jwk types update * update * update * jwk txn and execution * update * consensus ensure jwk txns are expected * update * fix dummy * update * update * update * update * update * update * remove dummy txns * check voting power than verify signature * fix warnings * debug * debug * debug * debug * debug * debug * debug * finish debug * fmt
1 parent 793563f commit 0503b18

File tree

13 files changed

+203
-23
lines changed

13 files changed

+203
-23
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

consensus/safety-rules/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ repository = { workspace = true }
1313
rust-version = { workspace = true }
1414

1515
[dependencies]
16+
anyhow = { workspace = true }
1617
aptos-config = { workspace = true }
1718
aptos-consensus-types = { workspace = true }
1819
aptos-crypto = { workspace = true }

consensus/safety-rules/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ mod process;
1414
mod remote_service;
1515
mod safety_rules;
1616
mod safety_rules_2chain;
17-
mod safety_rules_manager;
17+
pub mod safety_rules_manager;
1818
mod serializer;
1919
mod t_safety_rules;
2020
mod thread;

consensus/safety-rules/src/safety_rules_manager.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use crate::{
1111
thread::ThreadService,
1212
SafetyRules, TSafetyRules,
1313
};
14+
use anyhow::anyhow;
1415
use aptos_config::config::{InitialSafetyRulesConfig, SafetyRulesConfig, SafetyRulesService};
16+
use aptos_crypto::bls12381::PrivateKey;
17+
use aptos_global_constants::CONSENSUS_KEY;
1518
use aptos_infallible::RwLock;
1619
use aptos_secure_storage::{KVStorage, Storage};
1720
use std::{net::SocketAddr, sync::Arc};
@@ -73,6 +76,17 @@ pub fn storage(config: &SafetyRulesConfig) -> PersistentSafetyStorage {
7376
}
7477
}
7578

79+
pub fn load_consensus_key_from_secure_storage(
80+
config: &SafetyRulesConfig,
81+
) -> anyhow::Result<PrivateKey> {
82+
let storage: Storage = (&config.backend).into();
83+
let storage = Box::new(storage);
84+
let response = storage.get::<PrivateKey>(CONSENSUS_KEY).map_err(|e| {
85+
anyhow!("load_consensus_key_from_secure_storage failed with storage read error: {e}")
86+
})?;
87+
Ok(response.value)
88+
}
89+
7690
enum SafetyRulesWrapper {
7791
Local(Arc<RwLock<SafetyRules>>),
7892
Process(ProcessService),

consensus/src/dag/bootstrap.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use aptos_logger::{debug, info};
4646
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
4747
use aptos_types::{
4848
epoch_state::EpochState,
49-
on_chain_config::{DagConsensusConfigV1, ValidatorTxnConfig},
49+
on_chain_config::{DagConsensusConfigV1, FeatureFlag, Features, ValidatorTxnConfig},
5050
validator_signer::ValidatorSigner,
5151
};
5252
use async_trait::async_trait;
@@ -330,6 +330,7 @@ pub struct DagBootstrapper {
330330
quorum_store_enabled: bool,
331331
vtxn_config: ValidatorTxnConfig,
332332
executor: BoundedExecutor,
333+
features: Features,
333334
}
334335

335336
impl DagBootstrapper {
@@ -352,6 +353,7 @@ impl DagBootstrapper {
352353
quorum_store_enabled: bool,
353354
vtxn_config: ValidatorTxnConfig,
354355
executor: BoundedExecutor,
356+
features: Features,
355357
) -> Self {
356358
Self {
357359
self_peer,
@@ -371,6 +373,7 @@ impl DagBootstrapper {
371373
quorum_store_enabled,
372374
vtxn_config,
373375
executor,
376+
features,
374377
}
375378
}
376379

@@ -557,6 +560,7 @@ impl DagBootstrapper {
557560
fetch_requester,
558561
self.config.node_payload_config.clone(),
559562
self.vtxn_config.clone(),
563+
self.features.clone(),
560564
);
561565
let fetch_handler = FetchRequestHandler::new(dag_store.clone(), self.epoch_state.clone());
562566

@@ -647,6 +651,8 @@ pub(super) fn bootstrap_dag_for_test(
647651
UnboundedReceiver<OrderedBlocks>,
648652
) {
649653
let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
654+
let mut features = Features::default();
655+
features.enable(FeatureFlag::RECONFIGURE_WITH_DKG);
650656
let bootstraper = DagBootstrapper::new(
651657
self_peer,
652658
DagConsensusConfig::default(),
@@ -665,6 +671,7 @@ pub(super) fn bootstrap_dag_for_test(
665671
false,
666672
ValidatorTxnConfig::default_enabled(),
667673
BoundedExecutor::new(2, Handle::current()),
674+
features,
668675
);
669676

670677
let (_base_state, handler, fetch_service) = bootstraper.full_bootstrap();

consensus/src/dag/rb_handler.rs

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,32 @@
11
// Copyright © Aptos Foundation
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use crate::dag::{
5-
dag_fetcher::TFetchRequester,
6-
dag_network::RpcHandler,
7-
dag_store::Dag,
8-
errors::NodeBroadcastHandleError,
9-
observability::{
10-
logging::{LogEvent, LogSchema},
11-
tracing::{observe_node, NodeStage},
4+
use crate::{
5+
dag::{
6+
dag_fetcher::TFetchRequester,
7+
dag_network::RpcHandler,
8+
dag_store::Dag,
9+
errors::NodeBroadcastHandleError,
10+
observability::{
11+
logging::{LogEvent, LogSchema},
12+
tracing::{observe_node, NodeStage},
13+
},
14+
storage::DAGStorage,
15+
types::{Node, NodeCertificate, Vote},
16+
NodeId,
1217
},
13-
storage::DAGStorage,
14-
types::{Node, NodeCertificate, Vote},
15-
NodeId,
18+
util::is_vtxn_expected,
1619
};
1720
use anyhow::{bail, ensure};
1821
use aptos_config::config::DagPayloadConfig;
1922
use aptos_consensus_types::common::{Author, Round};
2023
use aptos_infallible::RwLock;
2124
use aptos_logger::{debug, error};
2225
use aptos_types::{
23-
epoch_state::EpochState, on_chain_config::ValidatorTxnConfig,
24-
validator_signer::ValidatorSigner, validator_txn::ValidatorTransaction,
26+
epoch_state::EpochState,
27+
on_chain_config::{Features, ValidatorTxnConfig},
28+
validator_signer::ValidatorSigner,
29+
validator_txn::ValidatorTransaction,
2530
};
2631
use async_trait::async_trait;
2732
use std::{collections::BTreeMap, mem, sync::Arc};
@@ -35,6 +40,7 @@ pub(crate) struct NodeBroadcastHandler {
3540
fetch_requester: Arc<dyn TFetchRequester>,
3641
payload_config: DagPayloadConfig,
3742
vtxn_config: ValidatorTxnConfig,
43+
features: Features,
3844
}
3945

4046
impl NodeBroadcastHandler {
@@ -46,6 +52,7 @@ impl NodeBroadcastHandler {
4652
fetch_requester: Arc<dyn TFetchRequester>,
4753
payload_config: DagPayloadConfig,
4854
vtxn_config: ValidatorTxnConfig,
55+
features: Features,
4956
) -> Self {
5057
let epoch = epoch_state.epoch;
5158
let votes_by_round_peer = read_votes_from_storage(&storage, epoch);
@@ -59,6 +66,7 @@ impl NodeBroadcastHandler {
5966
fetch_requester,
6067
payload_config,
6168
vtxn_config,
69+
features,
6270
}
6371
}
6472

@@ -87,6 +95,13 @@ impl NodeBroadcastHandler {
8795
fn validate(&self, node: Node) -> anyhow::Result<Node> {
8896
let num_vtxns = node.validator_txns().len() as u64;
8997
ensure!(num_vtxns <= self.vtxn_config.per_block_limit_txn_count());
98+
for vtxn in node.validator_txns() {
99+
ensure!(
100+
is_vtxn_expected(&self.features, vtxn),
101+
"unexpected validator transaction: {:?}",
102+
vtxn.topic()
103+
);
104+
}
90105
let vtxn_total_bytes = node
91106
.validator_txns()
92107
.iter()

consensus/src/dag/tests/integration_tests.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,6 @@ async fn test_dag_e2e() {
213213
let runtime = consensus_runtime();
214214
let mut playground = NetworkPlayground::new(runtime.handle().clone());
215215
let (signers, validators) = random_validator_verifier(num_nodes, None, false);
216-
217216
let (nodes, mut ordered_node_receivers) = bootstrap_nodes(&mut playground, signers, validators);
218217
for node in nodes {
219218
runtime.spawn(node.start());
@@ -229,7 +228,6 @@ async fn test_dag_e2e() {
229228
}
230229
let first = all_ordered.first().unwrap();
231230
assert_gt!(first.len(), 0, "must order nodes");
232-
debug!("Nodes: {:?}", first);
233231
for a in all_ordered.iter() {
234232
assert_eq!(a.len(), first.len(), "length should match");
235233
assert_eq!(a, first);

consensus/src/dag/tests/rb_handler_tests.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ use crate::dag::{
1717
use aptos_config::config::DagPayloadConfig;
1818
use aptos_infallible::RwLock;
1919
use aptos_types::{
20-
aggregate_signature::PartialSignatures, epoch_state::EpochState,
21-
on_chain_config::ValidatorTxnConfig, validator_verifier::random_validator_verifier,
20+
aggregate_signature::PartialSignatures,
21+
epoch_state::EpochState,
22+
on_chain_config::{Features, ValidatorTxnConfig},
23+
validator_verifier::random_validator_verifier,
2224
};
2325
use claims::{assert_ok, assert_ok_eq};
2426
use futures::executor::block_on;
@@ -68,6 +70,7 @@ async fn test_node_broadcast_receiver_succeed() {
6870
Arc::new(MockFetchRequester {}),
6971
DagPayloadConfig::default(),
7072
ValidatorTxnConfig::default_disabled(),
73+
Features::default(),
7174
);
7275

7376
let expected_result = Vote::new(
@@ -114,6 +117,7 @@ async fn test_node_broadcast_receiver_failure() {
114117
Arc::new(MockFetchRequester {}),
115118
DagPayloadConfig::default(),
116119
ValidatorTxnConfig::default_disabled(),
120+
Features::default(),
117121
)
118122
})
119123
.collect();
@@ -197,6 +201,7 @@ async fn test_node_broadcast_receiver_storage() {
197201
Arc::new(MockFetchRequester {}),
198202
DagPayloadConfig::default(),
199203
ValidatorTxnConfig::default_disabled(),
204+
Features::default(),
200205
);
201206
let sig = rb_receiver.process(node).await.expect("must succeed");
202207

@@ -213,6 +218,7 @@ async fn test_node_broadcast_receiver_storage() {
213218
Arc::new(MockFetchRequester {}),
214219
DagPayloadConfig::default(),
215220
ValidatorTxnConfig::default_disabled(),
221+
Features::default(),
216222
);
217223
assert_ok!(rb_receiver.gc_before_round(2));
218224
assert_eq!(storage.get_votes().unwrap().len(), 0);

consensus/src/epoch_manager.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -823,6 +823,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
823823
network_sender: NetworkSender,
824824
payload_client: Arc<dyn PayloadClient>,
825825
payload_manager: Arc<PayloadManager>,
826+
features: Features,
826827
) {
827828
let epoch = epoch_state.epoch;
828829
info!(
@@ -930,6 +931,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
930931
onchain_consensus_config,
931932
buffered_proposal_tx,
932933
self.config.clone(),
934+
features,
933935
);
934936

935937
round_manager.init(last_vote).await;
@@ -975,7 +977,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
975977

976978
let onchain_consensus_config: anyhow::Result<OnChainConsensusConfig> = payload.get();
977979
let onchain_execution_config: anyhow::Result<OnChainExecutionConfig> = payload.get();
978-
let features = payload.get::<Features>().ok().unwrap_or_default();
980+
let features = payload.get::<Features>();
979981

980982
if let Err(error) = &onchain_consensus_config {
981983
error!("Failed to read on-chain consensus config {}", error);
@@ -985,11 +987,17 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
985987
error!("Failed to read on-chain execution config {}", error);
986988
}
987989

990+
if let Err(error) = &features {
991+
error!("Failed to read on-chain features {}", error);
992+
}
993+
988994
self.epoch_state = Some(epoch_state.clone());
989995

990996
let consensus_config = onchain_consensus_config.unwrap_or_default();
991997
let execution_config = onchain_execution_config
992998
.unwrap_or_else(|_| OnChainExecutionConfig::default_if_missing());
999+
let features = features.unwrap_or_default();
1000+
9931001
let (network_sender, payload_client, payload_manager) = self
9941002
.initialize_shared_component(
9951003
&epoch_state,
@@ -1006,6 +1014,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
10061014
network_sender,
10071015
payload_client,
10081016
payload_manager,
1017+
features,
10091018
)
10101019
.await
10111020
} else {
@@ -1015,6 +1024,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
10151024
network_sender,
10161025
payload_client,
10171026
payload_manager,
1027+
features,
10181028
)
10191029
.await
10201030
}
@@ -1061,6 +1071,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
10611071
network_sender: NetworkSender,
10621072
payload_client: Arc<dyn PayloadClient>,
10631073
payload_manager: Arc<PayloadManager>,
1074+
features: Features,
10641075
) {
10651076
match self.storage.start() {
10661077
LivenessStorageData::FullRecoveryData(initial_data) => {
@@ -1072,6 +1083,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
10721083
network_sender,
10731084
payload_client,
10741085
payload_manager,
1086+
features,
10751087
)
10761088
.await
10771089
},
@@ -1095,6 +1107,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
10951107
network_sender: NetworkSender,
10961108
payload_client: Arc<dyn PayloadClient>,
10971109
payload_manager: Arc<PayloadManager>,
1110+
features: Features,
10981111
) {
10991112
let epoch = epoch_state.epoch;
11001113

@@ -1147,6 +1160,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
11471160
onchain_consensus_config.quorum_store_enabled(),
11481161
onchain_consensus_config.effective_validator_txn_config(),
11491162
self.bounded_executor.clone(),
1163+
features,
11501164
);
11511165

11521166
let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 10, None);

0 commit comments

Comments
 (0)