Skip to content

Commit 65f0f73

Browse files
authored
[dag] reliable broadcast storage (aptos-labs#9015)
1 parent 35d757a commit 65f0f73

File tree

10 files changed

+302
-35
lines changed

10 files changed

+302
-35
lines changed

consensus/src/consensusdb/mod.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ mod consensusdb_test;
77
mod schema;
88

99
use crate::{
10-
dag::{CertifiedNode, Node},
10+
dag::{CertifiedNode, Node, NodeDigestSignature, NodeId},
1111
error::DbError,
1212
};
1313
use anyhow::Result;
@@ -17,7 +17,7 @@ use aptos_logger::prelude::*;
1717
use aptos_schemadb::{Options, ReadOptions, SchemaBatch, DB, DEFAULT_COLUMN_FAMILY_NAME};
1818
use schema::{
1919
block::BlockSchema,
20-
dag::{CertifiedNodeSchema, NodeSchema},
20+
dag::{CertifiedNodeSchema, NodeDigestSignatureSchema, NodeSchema},
2121
quorum_certificate::QCSchema,
2222
single_entry::{SingleEntryKey, SingleEntrySchema},
2323
BLOCK_CF_NAME, CERTIFIED_NODE_CF_NAME, NODE_CF_NAME, QC_CF_NAME, SINGLE_ENTRY_CF_NAME,
@@ -199,6 +199,38 @@ impl ConsensusDB {
199199
Ok(())
200200
}
201201

202+
pub fn delete_node(&self, digest: HashValue) -> Result<(), DbError> {
203+
let batch = SchemaBatch::new();
204+
batch.delete::<NodeSchema>(&digest)?;
205+
self.commit(batch)
206+
}
207+
208+
pub fn save_node_signature(
209+
&self,
210+
node_id: &NodeId,
211+
node_digest_signature: &NodeDigestSignature,
212+
) -> Result<(), DbError> {
213+
let batch = SchemaBatch::new();
214+
batch.put::<NodeDigestSignatureSchema>(node_id, node_digest_signature)?;
215+
self.commit(batch)
216+
}
217+
218+
pub fn get_node_signatures(&self) -> Result<HashMap<NodeId, NodeDigestSignature>, DbError> {
219+
let mut iter = self
220+
.db
221+
.iter::<NodeDigestSignatureSchema>(ReadOptions::default())?;
222+
iter.seek_to_first();
223+
Ok(iter.collect::<Result<HashMap<NodeId, NodeDigestSignature>>>()?)
224+
}
225+
226+
pub fn delete_node_signatures(&self, node_ids: Vec<NodeId>) -> Result<(), DbError> {
227+
let batch = SchemaBatch::new();
228+
node_ids
229+
.iter()
230+
.try_for_each(|node_id| batch.delete::<NodeDigestSignatureSchema>(node_id))?;
231+
self.commit(batch)
232+
}
233+
202234
pub fn save_certified_node(&self, node: &CertifiedNode) -> Result<(), DbError> {
203235
let batch = SchemaBatch::new();
204236
batch.put::<CertifiedNodeSchema>(&node.digest(), node)?;

consensus/src/consensusdb/schema/dag/mod.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
//! | digest | node/certified node |
1010
//! ```
1111
12-
use crate::dag::{CertifiedNode, Node};
12+
use crate::dag::{CertifiedNode, Node, NodeDigestSignature, NodeId};
1313
use anyhow::Result;
1414
use aptos_crypto::HashValue;
1515
use aptos_schemadb::{
@@ -42,6 +42,35 @@ impl ValueCodec<NodeSchema> for Node {
4242
}
4343
}
4444

45+
pub const NODE_DIGEST_SIGNATURE_CF_NAME: ColumnFamilyName = "node_digest_signature";
46+
47+
define_schema!(
48+
NodeDigestSignatureSchema,
49+
NodeId,
50+
NodeDigestSignature,
51+
NODE_DIGEST_SIGNATURE_CF_NAME
52+
);
53+
54+
impl KeyCodec<NodeDigestSignatureSchema> for NodeId {
55+
fn encode_key(&self) -> Result<Vec<u8>> {
56+
Ok(bcs::to_bytes(&self)?)
57+
}
58+
59+
fn decode_key(data: &[u8]) -> Result<Self> {
60+
Ok(bcs::from_bytes(data)?)
61+
}
62+
}
63+
64+
impl ValueCodec<NodeDigestSignatureSchema> for NodeDigestSignature {
65+
fn encode_value(&self) -> Result<Vec<u8>> {
66+
Ok(bcs::to_bytes(&self)?)
67+
}
68+
69+
fn decode_value(data: &[u8]) -> Result<Self> {
70+
Ok(bcs::from_bytes(data)?)
71+
}
72+
}
73+
4574
pub const CERTIFIED_NODE_CF_NAME: ColumnFamilyName = "certified_node";
4675

4776
define_schema!(

consensus/src/dag/dag_driver.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright © Aptos Foundation
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use super::storage::DAGStorage;
45
use crate::{
56
dag::{
67
dag_store::Dag,
@@ -28,6 +29,7 @@ pub(crate) struct DagDriver {
2829
current_round: Round,
2930
time_service: Arc<dyn TimeService>,
3031
rb_abort_handle: Option<AbortHandle>,
32+
storage: Arc<dyn DAGStorage>,
3133
}
3234

3335
impl DagDriver {
@@ -39,7 +41,9 @@ impl DagDriver {
3941
reliable_broadcast: Arc<ReliableBroadcast>,
4042
current_round: Round,
4143
time_service: Arc<dyn TimeService>,
44+
storage: Arc<dyn DAGStorage>,
4245
) -> Self {
46+
// TODO: rebroadcast nodes after recovery
4347
Self {
4448
author,
4549
epoch_state,
@@ -49,6 +53,7 @@ impl DagDriver {
4953
current_round,
5054
time_service,
5155
rb_abort_handle: None,
56+
storage,
5257
}
5358
}
5459

@@ -84,6 +89,9 @@ impl DagDriver {
8489
payload,
8590
strong_links,
8691
);
92+
self.storage
93+
.save_node(&new_node)
94+
.expect("node must be saved");
8795
self.broadcast_node(new_node);
8896
}
8997

consensus/src/dag/dag_handler.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Copyright © Aptos Foundation
22

3-
use super::{reliable_broadcast::CertifiedNodeHandler, types::TDAGMessage};
3+
use super::{reliable_broadcast::CertifiedNodeHandler, storage::DAGStorage, types::TDAGMessage};
44
use crate::{
55
dag::{
66
dag_network::RpcHandler, dag_store::Dag, reliable_broadcast::NodeBroadcastHandler,
@@ -32,20 +32,23 @@ impl NetworkHandler {
3232
dag_rpc_rx: aptos_channel::Receiver<Author, IncomingDAGRequest>,
3333
signer: ValidatorSigner,
3434
epoch_state: Arc<EpochState>,
35+
storage: Arc<dyn DAGStorage>,
3536
) -> Self {
3637
Self {
3738
dag_rpc_rx,
3839
node_receiver: NodeBroadcastHandler::new(
3940
dag.clone(),
4041
signer,
41-
epoch_state.verifier.clone(),
42+
epoch_state.clone(),
43+
storage,
4244
),
4345
certified_node_receiver: CertifiedNodeHandler::new(dag),
4446
epoch_state,
4547
}
4648
}
4749

4850
async fn start(mut self) {
51+
// TODO(ibalajiarun): clean up Reliable Broadcast storage periodically.
4952
while let Some(msg) = self.dag_rpc_rx.next().await {
5053
if let Err(e) = self.process_rpc(msg).await {
5154
warn!(error = ?e, "error processing rpc");

consensus/src/dag/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ mod tests;
1616
mod types;
1717

1818
pub use dag_network::RpcHandler;
19-
pub use types::{CertifiedNode, DAGNetworkMessage, Node};
19+
pub use types::{CertifiedNode, DAGNetworkMessage, Node, NodeDigestSignature, NodeId};

consensus/src/dag/reliable_broadcast.rs

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
// Copyright © Aptos Foundation
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use super::types::{CertifiedAck, CertifiedNode};
4+
use super::{
5+
storage::DAGStorage,
6+
types::{CertifiedAck, CertifiedNode},
7+
NodeId,
8+
};
59
use crate::{
610
dag::{
711
dag_network::{DAGNetworkSender, RpcHandler},
@@ -13,9 +17,10 @@ use crate::{
1317
use anyhow::{bail, ensure};
1418
use aptos_consensus_types::common::{Author, Round};
1519
use aptos_infallible::RwLock;
16-
use aptos_types::{validator_signer::ValidatorSigner, validator_verifier::ValidatorVerifier};
20+
use aptos_logger::error;
21+
use aptos_types::{epoch_state::EpochState, validator_signer::ValidatorSigner};
1722
use futures::{stream::FuturesUnordered, StreamExt};
18-
use std::{collections::BTreeMap, future::Future, sync::Arc, time::Duration};
23+
use std::{collections::BTreeMap, future::Future, mem, sync::Arc, time::Duration};
1924
use thiserror::Error as ThisError;
2025

2126
pub trait BroadcastStatus {
@@ -96,25 +101,42 @@ pub struct NodeBroadcastHandler {
96101
dag: Arc<RwLock<Dag>>,
97102
signatures_by_round_peer: BTreeMap<Round, BTreeMap<Author, NodeDigestSignature>>,
98103
signer: ValidatorSigner,
99-
verifier: ValidatorVerifier,
104+
epoch_state: Arc<EpochState>,
105+
storage: Arc<dyn DAGStorage>,
100106
}
101107

102108
impl NodeBroadcastHandler {
103109
pub fn new(
104110
dag: Arc<RwLock<Dag>>,
105111
signer: ValidatorSigner,
106-
verifier: ValidatorVerifier,
112+
epoch_state: Arc<EpochState>,
113+
storage: Arc<dyn DAGStorage>,
107114
) -> Self {
115+
let epoch = epoch_state.epoch;
116+
let signatures_by_round_peer = read_signatures_from_storage(&storage, epoch);
117+
108118
Self {
109119
dag,
110-
signatures_by_round_peer: BTreeMap::new(),
120+
signatures_by_round_peer,
111121
signer,
112-
verifier,
122+
epoch_state,
123+
storage,
113124
}
114125
}
115126

116-
pub fn gc_before_round(&mut self, min_round: Round) {
117-
self.signatures_by_round_peer.retain(|r, _| r >= &min_round);
127+
pub fn gc_before_round(&mut self, min_round: Round) -> anyhow::Result<()> {
128+
let to_retain = self.signatures_by_round_peer.split_off(&min_round);
129+
let to_delete = mem::replace(&mut self.signatures_by_round_peer, to_retain);
130+
131+
let to_delete = to_delete
132+
.iter()
133+
.flat_map(|(r, peer_and_digest)| {
134+
peer_and_digest
135+
.iter()
136+
.map(|(author, _)| NodeId::new(self.epoch_state.epoch, *r, *author))
137+
})
138+
.collect();
139+
self.storage.delete_node_signatures(to_delete)
118140
}
119141

120142
fn validate(&self, node: &Node) -> anyhow::Result<()> {
@@ -146,7 +168,8 @@ impl NodeBroadcastHandler {
146168
ensure!(
147169
missing_parents.iter().all(|parent| {
148170
let node_digest = NodeDigest::new(*parent.metadata().digest());
149-
self.verifier
171+
self.epoch_state
172+
.verifier
150173
.verify_multi_signatures(&node_digest, parent.signatures())
151174
.is_ok()
152175
}),
@@ -160,6 +183,31 @@ impl NodeBroadcastHandler {
160183
}
161184
}
162185

186+
fn read_signatures_from_storage(
187+
storage: &Arc<dyn DAGStorage>,
188+
epoch: u64,
189+
) -> BTreeMap<u64, BTreeMap<Author, NodeDigestSignature>> {
190+
let mut signatures_by_round_peer = BTreeMap::new();
191+
192+
let all_node_signatures = storage.get_node_signatures().unwrap_or_default();
193+
let mut to_delete = vec![];
194+
for (node_id, node_sig) in all_node_signatures {
195+
if node_id.epoch() == epoch {
196+
signatures_by_round_peer
197+
.entry(node_id.round())
198+
.or_insert_with(BTreeMap::new)
199+
.insert(node_id.author(), node_sig);
200+
} else {
201+
to_delete.push(node_id);
202+
}
203+
}
204+
if let Err(err) = storage.delete_node_signatures(to_delete) {
205+
error!("unable to clear old signatures: {}", err);
206+
}
207+
208+
signatures_by_round_peer
209+
}
210+
163211
impl RpcHandler for NodeBroadcastHandler {
164212
type Request = Node;
165213
type Response = NodeDigestSignature;
@@ -171,13 +219,16 @@ impl RpcHandler for NodeBroadcastHandler {
171219
.signatures_by_round_peer
172220
.entry(node.metadata().round())
173221
.or_insert(BTreeMap::new());
174-
// TODO(ibalajiarun): persist node before voting
175222
match signatures_by_peer.get(node.metadata().author()) {
176223
None => {
177224
let signature = node.sign(&self.signer)?;
178225
let digest_signature =
179226
NodeDigestSignature::new(node.metadata().epoch(), node.digest(), signature);
227+
228+
self.storage
229+
.save_node_signature(&NodeId::from(&node), &digest_signature)?;
180230
signatures_by_peer.insert(*node.metadata().author(), digest_signature.clone());
231+
181232
Ok(digest_signature)
182233
},
183234
Some(ack) => Ok(ack.clone()),

consensus/src/dag/storage.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,30 @@
11
// Copyright © Aptos Foundation
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use super::{types::NodeDigestSignature, NodeId};
45
use crate::{
56
consensusdb::ConsensusDB,
67
dag::{CertifiedNode, Node},
78
};
9+
use anyhow::Ok;
810
use aptos_crypto::HashValue;
911
use std::collections::HashMap;
1012

1113
pub trait DAGStorage {
1214
fn save_node(&self, node: &Node) -> anyhow::Result<()>;
1315

16+
fn delete_node(&self, digest: HashValue) -> anyhow::Result<()>;
17+
18+
fn save_node_signature(
19+
&self,
20+
node_id: &NodeId,
21+
node_digest_signature: &NodeDigestSignature,
22+
) -> anyhow::Result<()>;
23+
24+
fn get_node_signatures(&self) -> anyhow::Result<HashMap<NodeId, NodeDigestSignature>>;
25+
26+
fn delete_node_signatures(&self, node_ids: Vec<NodeId>) -> anyhow::Result<()>;
27+
1428
fn save_certified_node(&self, node: &CertifiedNode) -> anyhow::Result<()>;
1529

1630
fn get_certified_nodes(&self) -> anyhow::Result<HashMap<HashValue, CertifiedNode>>;
@@ -23,6 +37,26 @@ impl DAGStorage for ConsensusDB {
2337
Ok(self.save_node(node)?)
2438
}
2539

40+
fn delete_node(&self, digest: HashValue) -> anyhow::Result<()> {
41+
Ok(self.delete_node(digest)?)
42+
}
43+
44+
fn save_node_signature(
45+
&self,
46+
node_id: &NodeId,
47+
node_digest_signature: &NodeDigestSignature,
48+
) -> anyhow::Result<()> {
49+
Ok(self.save_node_signature(node_id, node_digest_signature)?)
50+
}
51+
52+
fn get_node_signatures(&self) -> anyhow::Result<HashMap<NodeId, NodeDigestSignature>> {
53+
Ok(self.get_node_signatures()?)
54+
}
55+
56+
fn delete_node_signatures(&self, node_ids: Vec<NodeId>) -> anyhow::Result<()> {
57+
Ok(self.delete_node_signatures(node_ids)?)
58+
}
59+
2660
fn save_certified_node(&self, node: &CertifiedNode) -> anyhow::Result<()> {
2761
Ok(self.save_certified_node(node)?)
2862
}

0 commit comments

Comments
 (0)