Skip to content

Commit 265c8cb

Browse files
committed
[dag] commit rule draft 2/n
1 parent 98d5aca commit 265c8cb

File tree

5 files changed

+197
-154
lines changed

5 files changed

+197
-154
lines changed

consensus/src/dag/commit_rule.rs

Lines changed: 0 additions & 102 deletions
This file was deleted.

consensus/src/dag/dag_store.rs

Lines changed: 58 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use aptos_crypto::HashValue;
1111
use aptos_logger::error;
1212
use aptos_types::{epoch_state::EpochState, validator_verifier::ValidatorVerifier};
1313
use std::{
14-
collections::{BTreeMap, HashMap},
14+
collections::{BTreeMap, HashMap, HashSet},
1515
sync::Arc,
1616
};
1717

@@ -30,6 +30,11 @@ impl NodeStatus {
3030
| NodeStatus::Committed(node) => node,
3131
}
3232
}
33+
34+
pub fn mark_as_ordered(&mut self) {
35+
assert!(matches!(self, NodeStatus::Unordered(_)));
36+
*self = NodeStatus::Ordered(self.as_node().clone());
37+
}
3338
}
3439

3540
/// Data structure that stores the DAG representation, it maintains round based index.
@@ -149,9 +154,9 @@ impl Dag {
149154
&self,
150155
round: Round,
151156
author: &Author,
152-
) -> Option<Arc<CertifiedNode>> {
157+
) -> Option<&Arc<CertifiedNode>> {
153158
self.get_node_ref(round, author)
154-
.map(|node_status| node_status.as_node().clone())
159+
.map(|node_status| node_status.as_node())
155160
}
156161

157162
// TODO: I think we can cache votes in the NodeStatus::Unordered
@@ -161,7 +166,7 @@ impl Dag {
161166
validator_verifier: &ValidatorVerifier,
162167
) -> bool {
163168
self.get_round_iter(metadata.round() + 1)
164-
.and_then(|next_round_iter| {
169+
.map(|next_round_iter| {
165170
let votes = next_round_iter
166171
.filter(|node_status| {
167172
node_status
@@ -171,16 +176,59 @@ impl Dag {
171176
.any(|cert| cert.metadata() == metadata)
172177
})
173178
.map(|node_status| node_status.as_node().author());
174-
Some(validator_verifier.check_voting_power(votes).is_ok())
179+
validator_verifier.check_voting_power(votes).is_ok()
175180
})
176181
.unwrap_or(false)
177182
}
178183

179-
pub fn reachable<'a>(
180-
&'a self,
181-
from: &'a Arc<CertifiedNode>,
182-
) -> impl Iterator<Item = &Arc<CertifiedNode>> {
183-
ReachableIterator::new(from, self)
184+
fn reachable_filter(start: HashValue) -> impl FnMut(&Arc<CertifiedNode>) -> bool {
185+
let mut reachable = HashSet::from([start]);
186+
move |node| {
187+
if reachable.contains(&node.digest()) {
188+
for parent in node.parents() {
189+
reachable.insert(*parent.metadata().digest());
190+
}
191+
true
192+
} else {
193+
false
194+
}
195+
}
196+
}
197+
198+
pub fn reachable_mut(
199+
&mut self,
200+
from: &Arc<CertifiedNode>,
201+
until: Option<Round>,
202+
) -> impl Iterator<Item = &mut NodeStatus> {
203+
let until = until.unwrap_or(self.lowest_round());
204+
let mut reachable_filter = Self::reachable_filter(from.digest());
205+
self.nodes_by_round
206+
.range_mut(until..=from.round())
207+
.rev()
208+
.flat_map(|(_, round_ref)| round_ref.iter_mut())
209+
.flatten()
210+
.filter(move |node_status| {
211+
matches!(node_status, NodeStatus::Unordered(_))
212+
&& reachable_filter(node_status.as_node())
213+
})
214+
}
215+
216+
pub fn reachable(
217+
&self,
218+
from: &Arc<CertifiedNode>,
219+
until: Option<Round>,
220+
) -> impl Iterator<Item = &NodeStatus> {
221+
let until = until.unwrap_or(self.lowest_round());
222+
let mut reachable_filter = Self::reachable_filter(from.digest());
223+
self.nodes_by_round
224+
.range(until..=from.round())
225+
.rev()
226+
.flat_map(|(_, round_ref)| round_ref.iter())
227+
.flatten()
228+
.filter(move |node_status| {
229+
matches!(node_status, NodeStatus::Unordered(_))
230+
&& reachable_filter(node_status.as_node())
231+
})
184232
}
185233

186234
pub fn get_strong_links_for_round(
@@ -210,43 +258,3 @@ impl Dag {
210258
todo!();
211259
}
212260
}
213-
214-
pub struct ReachableIterator<'a> {
215-
current: Vec<&'a Arc<CertifiedNode>>,
216-
next: HashMap<HashValue, NodeMetadata>,
217-
dag: &'a Dag,
218-
}
219-
220-
impl<'a> ReachableIterator<'a> {
221-
fn new(from: &'a Arc<CertifiedNode>, dag: &'a Dag) -> Self {
222-
Self {
223-
current: vec![from],
224-
next: HashMap::new(),
225-
dag,
226-
}
227-
}
228-
}
229-
230-
impl<'a> Iterator for ReachableIterator<'a> {
231-
type Item = &'a Arc<CertifiedNode>;
232-
233-
fn next(&mut self) -> Option<Self::Item> {
234-
if self.current.is_empty() {
235-
for (_, metadata) in self.next.drain() {
236-
match self.dag.get_node_ref_by_metadata(&metadata) {
237-
Some(NodeStatus::Unordered(node)) => self.current.push(node),
238-
_ => (),
239-
}
240-
}
241-
}
242-
if let Some(node) = self.current.pop() {
243-
for parent in node.parents() {
244-
self.next
245-
.insert(*parent.metadata().digest(), parent.metadata().clone());
246-
}
247-
Some(node)
248-
} else {
249-
None
250-
}
251-
}
252-
}

consensus/src/dag/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
#![allow(dead_code)]
44

55
mod anchor_election;
6-
mod commit_rule;
76
mod dag_driver;
87
mod dag_fetcher;
98
mod dag_handler;
109
mod dag_network;
1110
mod dag_store;
11+
mod order_rule;
1212
mod reliable_broadcast;
1313
mod storage;
1414
#[cfg(test)]

0 commit comments

Comments
 (0)