|
1 | 1 | use anyhow::anyhow; |
2 | | -use std::collections::HashSet; |
| 2 | +use std::collections::{BTreeMap, HashSet}; |
3 | 3 | use std::str::FromStr; |
4 | 4 | use std::sync::atomic::{AtomicBool, Ordering}; |
5 | 5 | use std::sync::{Arc, RwLock}; |
@@ -91,6 +91,83 @@ impl<S: MutinyStorage> OnChainWallet<S> { |
91 | 91 | } |
92 | 92 |
|
93 | 93 | pub async fn sync(&self) -> Result<(), MutinyError> { |
| 94 | + // get first wallet lock that only needs to read |
| 95 | + let (checkpoints, spks, txids) = { |
| 96 | + if let Ok(wallet) = self.wallet.try_read() { |
| 97 | + let checkpoints = wallet.checkpoints(); |
| 98 | + |
| 99 | + let spk_vec = wallet |
| 100 | + .spk_index() |
| 101 | + .unused_spks(..) |
| 102 | + .map(|(k, v)| (*k, v.clone())) |
| 103 | + .collect::<Vec<_>>(); |
| 104 | + |
| 105 | + let mut spk_map = BTreeMap::new(); |
| 106 | + for ((a, b), c) in spk_vec { |
| 107 | + spk_map.entry(a).or_insert_with(Vec::new).push((b, c)); |
| 108 | + } |
| 109 | + |
| 110 | + let chain = wallet.local_chain(); |
| 111 | + let chain_tip = chain.tip().unwrap_or_default(); |
| 112 | + |
| 113 | + let unconfirmed_txids = wallet |
| 114 | + .tx_graph() |
| 115 | + .list_chain_txs(chain, chain_tip) |
| 116 | + .filter(|canonical_tx| !canonical_tx.observed_as.is_confirmed()) |
| 117 | + .map(|canonical_tx| canonical_tx.node.txid) |
| 118 | + .collect::<Vec<Txid>>(); |
| 119 | + |
| 120 | + (checkpoints.clone(), spk_map, unconfirmed_txids) |
| 121 | + } else { |
| 122 | + log_error!(self.logger, "Could not get wallet lock to sync"); |
| 123 | + return Err(MutinyError::WalletOperationFailed); |
| 124 | + } |
| 125 | + }; |
| 126 | + |
| 127 | + let update = self |
| 128 | + .blockchain |
| 129 | + .scan(&checkpoints, spks, txids, core::iter::empty(), 20, 5) |
| 130 | + .await?; |
| 131 | + |
| 132 | + // get new wallet lock for writing and apply the update |
| 133 | + for _ in 0..10 { |
| 134 | + match self.wallet.try_write() { |
| 135 | + Ok(mut wallet) => match wallet.apply_update(update) { |
| 136 | + Ok(changed) => { |
| 137 | + // commit the changes if there were any |
| 138 | + if changed { |
| 139 | + wallet.commit()?; |
| 140 | + } |
| 141 | + |
| 142 | + return Ok(()); |
| 143 | + } |
| 144 | + Err(e) => { |
| 145 | + // failed to apply wallet update |
| 146 | + log_error!(self.logger, "Could not apply wallet update: {e}"); |
| 147 | + return Err(MutinyError::Other(anyhow!("Could not apply update: {e}"))); |
| 148 | + } |
| 149 | + }, |
| 150 | + Err(e) => { |
| 151 | + // if we can't get the lock, we just return and try again later |
| 152 | + log_error!( |
| 153 | + self.logger, |
| 154 | + "Could not get wallet lock: {e}, retrying in 250ms" |
| 155 | + ); |
| 156 | + |
| 157 | + if self.stop.load(Ordering::Relaxed) { |
| 158 | + return Err(MutinyError::NotRunning); |
| 159 | + }; |
| 160 | + |
| 161 | + sleep(250).await; |
| 162 | + } |
| 163 | + } |
| 164 | + } |
| 165 | + |
| 166 | + log_error!(self.logger, "Could not get wallet lock after 10 retries"); |
| 167 | + Err(MutinyError::WalletOperationFailed) |
| 168 | + } |
| 169 | + |
| 170 | + pub async fn full_sync(&self) -> Result<(), MutinyError> { |
94 | 171 | // get first wallet lock that only needs to read |
95 | 172 | let (checkpoints, spks) = { |
96 | 173 | if let Ok(wallet) = self.wallet.try_read() { |
|
0 commit comments