@@ -16,6 +16,7 @@ use super::{
1616} ;
1717use super :: rust_base58:: { FromBase58 , ToBase58 } ;
1818use super :: types:: * ;
19+ use services:: pool:: PoolWorker ;
1920
2021pub const CATCHUP_ROUND_TIMEOUT : i64 = 50 ;
2122
@@ -45,7 +46,8 @@ pub struct CatchupHandler {
4546 pub pending_catchup : Option < CatchUpProcess > ,
4647 pub timeout : time:: Tm ,
4748 pub pool_id : i32 ,
48- pub nodes_votes : Vec < Option < ( String , usize ) > > ,
49+ pub nodes_votes : Vec < Option < ( String , usize , Option < Vec < String > > ) > > ,
50+ pub pool_name : String ,
4951}
5052
5153impl Default for CatchupHandler {
@@ -64,6 +66,7 @@ impl Default for CatchupHandler {
6466 pool_id : 0 ,
6567 nodes_votes : Vec :: new ( ) ,
6668 timeout : time:: now_utc ( ) ,
69+ pool_name : "" . to_string ( ) ,
6770 }
6871 }
6972}
@@ -86,11 +89,11 @@ impl CatchupHandler {
8689 CatchupProgress :: InProgress
8790 }
8891 Message :: LedgerStatus ( ledger_status) => {
89- self . nodes_votes [ src_ind] = Some ( ( ledger_status. merkleRoot , ledger_status. txnSeqNo ) ) ;
92+ self . nodes_votes [ src_ind] = Some ( ( ledger_status. merkleRoot , ledger_status. txnSeqNo , None ) ) ;
9093 self . check_nodes_responses_on_status ( ) ?
9194 }
9295 Message :: ConsistencyProof ( cons_proof) => {
93- self . nodes_votes [ src_ind] = Some ( ( cons_proof. newMerkleRoot , cons_proof. seqNoEnd ) ) ;
96+ self . nodes_votes [ src_ind] = Some ( ( cons_proof. newMerkleRoot , cons_proof. seqNoEnd , Some ( cons_proof . hashes ) ) ) ;
9497 self . check_nodes_responses_on_status ( ) ?
9598 }
9699 Message :: CatchupRep ( catchup) => {
@@ -115,40 +118,66 @@ impl CatchupHandler {
115118 if self . pending_catchup . is_some ( ) {
116119 return Ok ( CatchupProgress :: InProgress ) ;
117120 }
118- let mut votes: HashMap < ( String , usize ) , usize > = HashMap :: new ( ) ;
119- for node_vote in & self . nodes_votes {
120- if let & Some ( ref node_vote) = node_vote {
121- let cnt = * votes. get ( & node_vote) . unwrap_or ( & 0 ) + 1 ;
122- votes. insert ( ( node_vote. 0 . clone ( ) , node_vote. 1 ) , cnt) ;
123- }
124- }
121+
122+ let votes: HashMap < ( String , usize , Option < Vec < String > > ) , usize > = self . nodes_votes . iter ( ) . cloned ( )
123+ . filter_map ( |e| e)
124+ . fold ( HashMap :: new ( ) , |mut acc, vote| {
125+ //WARN: It can be processed without excessive cloning two steps before, but then we will face double write borrowing.
126+ //WARN: This is the most readable variant. However, not the most effective.
127+ let cnt = acc. get ( & vote) . unwrap_or ( & 0 ) + 1 ;
128+ acc. insert ( vote, cnt) ;
129+ acc
130+ } ) ;
131+
125132 if let Some ( ( most_popular_vote, votes_cnt) ) = votes. iter ( ) . max_by_key ( |entry| entry. 1 ) {
126133 if * votes_cnt == self . nodes . len ( ) - self . f {
127- let & ( ref target_mt_root, target_mt_size) = most_popular_vote;
128- let cur_mt_size = self . merkle_tree . count ( ) ;
129- let cur_mt_hash = self . merkle_tree . root_hash ( ) . to_base58 ( ) ;
130- if target_mt_size == cur_mt_size {
131- if cur_mt_hash. eq ( target_mt_root) {
132- return Ok ( CatchupProgress :: NotNeeded ) ;
134+ return self . _try_to_catch_up ( most_popular_vote) . or_else ( |err| {
135+ if PoolWorker :: drop_saved_txns ( & self . pool_name ) . is_ok ( ) {
136+ self . merkle_tree = PoolWorker :: restore_merkle_tree_from_pool_name ( & self . pool_name ) ?;
137+ self . _try_to_catch_up ( most_popular_vote)
133138 } else {
134- return Err ( PoolError :: CommonError ( CommonError :: InvalidState (
135- "Ledger merkle tree doesn't acceptable for current tree." . to_string ( ) ) ) ) ;
139+ Err ( err)
136140 }
137- } else if target_mt_size > cur_mt_size {
138- self . target_mt_size = target_mt_size;
139- self . target_mt_root = target_mt_root. from_base58 ( ) . map_err ( |_|
140- CommonError :: InvalidStructure (
141- "Can't parse target MerkleTree hash from nodes responses" . to_string ( ) ) ) ?;
142- return Ok ( CatchupProgress :: ShouldBeStarted ) ;
143- } else {
144- return Err ( PoolError :: CommonError ( CommonError :: InvalidState (
145- "Local merkle tree greater than mt from ledger" . to_string ( ) ) ) ) ;
146- }
141+ } )
147142 }
148143 }
149144 Ok ( CatchupProgress :: InProgress )
150145 }
151146
147+ fn _try_to_catch_up ( & mut self , ledger_status : & ( String , usize , Option < Vec < String > > ) ) -> Result < CatchupProgress , PoolError > {
148+ let & ( ref target_mt_root, target_mt_size, ref hashes) = ledger_status;
149+ let cur_mt_size = self . merkle_tree . count ( ) ;
150+ let cur_mt_hash = self . merkle_tree . root_hash ( ) . to_base58 ( ) ;
151+ if target_mt_size == cur_mt_size {
152+ if cur_mt_hash. eq ( target_mt_root) {
153+ return Ok ( CatchupProgress :: NotNeeded ) ;
154+ } else {
155+ return Err ( PoolError :: CommonError ( CommonError :: InvalidState (
156+ "Ledger merkle tree doesn't acceptable for current tree." . to_string ( ) ) ) ) ;
157+ }
158+ } else if target_mt_size > cur_mt_size {
159+ self . target_mt_size = target_mt_size;
160+ self . target_mt_root = target_mt_root. from_base58 ( ) . map_err ( |_|
161+ CommonError :: InvalidStructure (
162+ "Can't parse target MerkleTree hash from nodes responses" . to_string ( ) ) ) ?;
163+ match hashes {
164+ & None => { return Err ( PoolError :: from ( CommonError :: InvalidState ( "Empty consistency proof but catch up needed" . to_string ( ) ) ) ) ; } ,
165+ & Some ( ref hashes) => {
166+ match CatchupHandler :: check_cons_proofs ( & self . merkle_tree , hashes, & self . target_mt_root , self . target_mt_size ) {
167+ Ok ( _) => ( ) ,
168+ Err ( err) => {
169+ return Err ( PoolError :: from ( err) ) ;
170+ }
171+ }
172+ }
173+ } ;
174+ return Ok ( CatchupProgress :: ShouldBeStarted ) ;
175+ } else {
176+ return Err ( PoolError :: CommonError ( CommonError :: InvalidState (
177+ "Local merkle tree greater than mt from ledger" . to_string ( ) ) ) ) ;
178+ }
179+ }
180+
152181 pub fn reset_nodes_votes ( & mut self ) {
153182 self . nodes_votes . clear ( ) ;
154183 self . nodes_votes . resize ( self . nodes . len ( ) , None ) ;
@@ -243,11 +272,13 @@ impl CatchupHandler {
243272 if first_resp. min_tx ( ) ? - 1 != process. merkle_tree . count ( ) { break ; }
244273
245274 let mut temp_mt = process. merkle_tree . clone ( ) ;
275+ let mut vec_to_dump: Vec < Vec < u8 > > = vec ! [ ] ;
246276 while !first_resp. txns . is_empty ( ) {
247277 let key = first_resp. min_tx ( ) ?. to_string ( ) ;
248278 let new_gen_tx = first_resp. txns . remove ( & key) . unwrap ( ) ;
249279 if let Ok ( new_get_txn_bytes) = rmp_serde:: to_vec_named ( & new_gen_tx) {
250- temp_mt. append ( new_get_txn_bytes) ?;
280+ temp_mt. append ( new_get_txn_bytes. clone ( ) ) ?;
281+ vec_to_dump. push ( new_get_txn_bytes) ;
251282 } else {
252283 return Ok ( CatchupStepResult :: FailedAtNode ( node_idx) ) ;
253284 }
@@ -258,6 +289,8 @@ impl CatchupHandler {
258289 return Ok ( CatchupStepResult :: FailedAtNode ( node_idx) ) ;
259290 }
260291
292+ PoolWorker :: dump_new_txns ( & self . pool_name , & vec_to_dump) ?;
293+
261294 process. merkle_tree = temp_mt;
262295 }
263296 process. pending_reps . remove ( index) ;
0 commit comments