Skip to content

Commit 78f54b6

Browse files
authored
Merge pull request hyperledger-indy#810 from KitHat/master
Add cache for transactions for future catch-ups
2 parents b4a2bb8 + 774624a commit 78f54b6

File tree

8 files changed

+343
-57
lines changed

8 files changed

+343
-57
lines changed

libindy/src/errors/pool.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ pub enum PoolError {
1616
Terminate,
1717
Timeout,
1818
AlreadyExists(String),
19-
CommonError(CommonError)
19+
CommonError(CommonError),
2020
}
2121

2222
impl fmt::Display for PoolError {
@@ -27,7 +27,7 @@ impl fmt::Display for PoolError {
2727
PoolError::Terminate => write!(f, "Pool work terminated"),
2828
PoolError::Timeout => write!(f, "Timeout"),
2929
PoolError::AlreadyExists(ref description) => write!(f, "Pool ledger config already exists {}", description),
30-
PoolError::CommonError(ref err) => err.fmt(f)
30+
PoolError::CommonError(ref err) => err.fmt(f),
3131
}
3232
}
3333
}
@@ -40,7 +40,7 @@ impl error::Error for PoolError {
4040
PoolError::Terminate => "Pool work terminated",
4141
PoolError::Timeout => "Timeout",
4242
PoolError::AlreadyExists(ref description) => description,
43-
PoolError::CommonError(ref err) => err.description()
43+
PoolError::CommonError(ref err) => err.description(),
4444
}
4545
}
4646

@@ -49,7 +49,7 @@ impl error::Error for PoolError {
4949
PoolError::NotCreated(_) | PoolError::InvalidHandle(_) => None,
5050
PoolError::Terminate | PoolError::Timeout => None,
5151
PoolError::AlreadyExists(_) => None,
52-
PoolError::CommonError(ref err) => Some(err)
52+
PoolError::CommonError(ref err) => Some(err),
5353
}
5454
}
5555
}

libindy/src/services/pool/catchup.rs

Lines changed: 62 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use super::{
1616
};
1717
use super::rust_base58::{FromBase58, ToBase58};
1818
use super::types::*;
19+
use services::pool::PoolWorker;
1920

2021
pub const CATCHUP_ROUND_TIMEOUT: i64 = 50;
2122

@@ -45,7 +46,8 @@ pub struct CatchupHandler {
4546
pub pending_catchup: Option<CatchUpProcess>,
4647
pub timeout: time::Tm,
4748
pub pool_id: i32,
48-
pub nodes_votes: Vec<Option<(String, usize)>>,
49+
pub nodes_votes: Vec<Option<(String, usize, Option<Vec<String>>)>>,
50+
pub pool_name: String,
4951
}
5052

5153
impl Default for CatchupHandler {
@@ -64,6 +66,7 @@ impl Default for CatchupHandler {
6466
pool_id: 0,
6567
nodes_votes: Vec::new(),
6668
timeout: time::now_utc(),
69+
pool_name: "".to_string(),
6770
}
6871
}
6972
}
@@ -86,11 +89,11 @@ impl CatchupHandler {
8689
CatchupProgress::InProgress
8790
}
8891
Message::LedgerStatus(ledger_status) => {
89-
self.nodes_votes[src_ind] = Some((ledger_status.merkleRoot, ledger_status.txnSeqNo));
92+
self.nodes_votes[src_ind] = Some((ledger_status.merkleRoot, ledger_status.txnSeqNo, None));
9093
self.check_nodes_responses_on_status()?
9194
}
9295
Message::ConsistencyProof(cons_proof) => {
93-
self.nodes_votes[src_ind] = Some((cons_proof.newMerkleRoot, cons_proof.seqNoEnd));
96+
self.nodes_votes[src_ind] = Some((cons_proof.newMerkleRoot, cons_proof.seqNoEnd, Some(cons_proof.hashes)));
9497
self.check_nodes_responses_on_status()?
9598
}
9699
Message::CatchupRep(catchup) => {
@@ -115,40 +118,66 @@ impl CatchupHandler {
115118
if self.pending_catchup.is_some() {
116119
return Ok(CatchupProgress::InProgress);
117120
}
118-
let mut votes: HashMap<(String, usize), usize> = HashMap::new();
119-
for node_vote in &self.nodes_votes {
120-
if let &Some(ref node_vote) = node_vote {
121-
let cnt = *votes.get(&node_vote).unwrap_or(&0) + 1;
122-
votes.insert((node_vote.0.clone(), node_vote.1), cnt);
123-
}
124-
}
121+
122+
let votes: HashMap<(String, usize, Option<Vec<String>>), usize> = self.nodes_votes.iter().cloned()
123+
.filter_map(|e| e)
124+
.fold(HashMap::new(), |mut acc, vote| {
125+
//WARN: It can be processed without excessive cloning two steps before, but then we will face double write borrowing.
126+
//WARN: This is the most readable variant. However, not the most effective.
127+
let cnt = acc.get(&vote).unwrap_or(&0) + 1;
128+
acc.insert(vote, cnt);
129+
acc
130+
});
131+
125132
if let Some((most_popular_vote, votes_cnt)) = votes.iter().max_by_key(|entry| entry.1) {
126133
if *votes_cnt == self.nodes.len() - self.f {
127-
let &(ref target_mt_root, target_mt_size) = most_popular_vote;
128-
let cur_mt_size = self.merkle_tree.count();
129-
let cur_mt_hash = self.merkle_tree.root_hash().to_base58();
130-
if target_mt_size == cur_mt_size {
131-
if cur_mt_hash.eq(target_mt_root) {
132-
return Ok(CatchupProgress::NotNeeded);
134+
return self._try_to_catch_up(most_popular_vote).or_else(|err| {
135+
if PoolWorker::drop_saved_txns(&self.pool_name).is_ok() {
136+
self.merkle_tree = PoolWorker::restore_merkle_tree_from_pool_name(&self.pool_name)?;
137+
self._try_to_catch_up(most_popular_vote)
133138
} else {
134-
return Err(PoolError::CommonError(CommonError::InvalidState(
135-
"Ledger merkle tree doesn't acceptable for current tree.".to_string())));
139+
Err(err)
136140
}
137-
} else if target_mt_size > cur_mt_size {
138-
self.target_mt_size = target_mt_size;
139-
self.target_mt_root = target_mt_root.from_base58().map_err(|_|
140-
CommonError::InvalidStructure(
141-
"Can't parse target MerkleTree hash from nodes responses".to_string()))?;
142-
return Ok(CatchupProgress::ShouldBeStarted);
143-
} else {
144-
return Err(PoolError::CommonError(CommonError::InvalidState(
145-
"Local merkle tree greater than mt from ledger".to_string())));
146-
}
141+
})
147142
}
148143
}
149144
Ok(CatchupProgress::InProgress)
150145
}
151146

147+
fn _try_to_catch_up(&mut self, ledger_status: &(String, usize, Option<Vec<String>>)) -> Result<CatchupProgress, PoolError> {
148+
let &(ref target_mt_root, target_mt_size, ref hashes) = ledger_status;
149+
let cur_mt_size = self.merkle_tree.count();
150+
let cur_mt_hash = self.merkle_tree.root_hash().to_base58();
151+
if target_mt_size == cur_mt_size {
152+
if cur_mt_hash.eq(target_mt_root) {
153+
return Ok(CatchupProgress::NotNeeded);
154+
} else {
155+
return Err(PoolError::CommonError(CommonError::InvalidState(
156+
"Ledger merkle tree doesn't acceptable for current tree.".to_string())));
157+
}
158+
} else if target_mt_size > cur_mt_size {
159+
self.target_mt_size = target_mt_size;
160+
self.target_mt_root = target_mt_root.from_base58().map_err(|_|
161+
CommonError::InvalidStructure(
162+
"Can't parse target MerkleTree hash from nodes responses".to_string()))?;
163+
match hashes {
164+
&None => {return Err(PoolError::from(CommonError::InvalidState("Empty consistency proof but catch up needed".to_string())));},
165+
&Some(ref hashes) => {
166+
match CatchupHandler::check_cons_proofs(&self.merkle_tree, hashes, &self.target_mt_root, self.target_mt_size) {
167+
Ok(_) => (),
168+
Err(err) => {
169+
return Err(PoolError::from(err));
170+
}
171+
}
172+
}
173+
};
174+
return Ok(CatchupProgress::ShouldBeStarted);
175+
} else {
176+
return Err(PoolError::CommonError(CommonError::InvalidState(
177+
"Local merkle tree greater than mt from ledger".to_string())));
178+
}
179+
}
180+
152181
pub fn reset_nodes_votes(&mut self) {
153182
self.nodes_votes.clear();
154183
self.nodes_votes.resize(self.nodes.len(), None);
@@ -243,11 +272,13 @@ impl CatchupHandler {
243272
if first_resp.min_tx()? - 1 != process.merkle_tree.count() { break; }
244273

245274
let mut temp_mt = process.merkle_tree.clone();
275+
let mut vec_to_dump: Vec<Vec<u8>> = vec![];
246276
while !first_resp.txns.is_empty() {
247277
let key = first_resp.min_tx()?.to_string();
248278
let new_gen_tx = first_resp.txns.remove(&key).unwrap();
249279
if let Ok(new_get_txn_bytes) = rmp_serde::to_vec_named(&new_gen_tx) {
250-
temp_mt.append(new_get_txn_bytes)?;
280+
temp_mt.append(new_get_txn_bytes.clone())?;
281+
vec_to_dump.push(new_get_txn_bytes);
251282
} else {
252283
return Ok(CatchupStepResult::FailedAtNode(node_idx));
253284
}
@@ -258,6 +289,8 @@ impl CatchupHandler {
258289
return Ok(CatchupStepResult::FailedAtNode(node_idx));
259290
}
260291

292+
PoolWorker::dump_new_txns(&self.pool_name, &vec_to_dump)?;
293+
261294
process.merkle_tree = temp_mt;
262295
}
263296
process.pending_reps.remove(index);

0 commit comments

Comments
 (0)