Skip to content

Commit d2cadd0

Browse files
authored
Merge pull request #599 from tnull/2025-07-fix-res-propagation-sync
Refactor chain sourcing logic to make result propagation more robust
2 parents 0935401 + a6349a4 commit d2cadd0

File tree

3 files changed

+232
-203
lines changed

3 files changed

+232
-203
lines changed

src/chain/bitcoind.rs

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use super::WalletSyncStatus;
8+
use super::{periodically_archive_fully_resolved_monitors, WalletSyncStatus};
99

1010
use crate::config::{
1111
BitcoindRestClientConfig, Config, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, TX_BROADCAST_TIMEOUT_SECS,
@@ -306,6 +306,19 @@ impl BitcoindChainSource {
306306
})?;
307307
}
308308

309+
let res = self
310+
.poll_and_update_listeners_inner(channel_manager, chain_monitor, output_sweeper)
311+
.await;
312+
313+
self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res);
314+
315+
res
316+
}
317+
318+
async fn poll_and_update_listeners_inner(
319+
&self, channel_manager: Arc<ChannelManager>, chain_monitor: Arc<ChainMonitor>,
320+
output_sweeper: Arc<Sweeper>,
321+
) -> Result<(), Error> {
309322
let latest_chain_tip_opt = self.latest_chain_tip.read().unwrap().clone();
310323
let chain_tip = if let Some(tip) = latest_chain_tip_opt {
311324
tip
@@ -317,9 +330,7 @@ impl BitcoindChainSource {
317330
},
318331
Err(e) => {
319332
log_error!(self.logger, "Failed to poll for chain data: {:?}", e);
320-
let res = Err(Error::TxSyncFailed);
321-
self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res);
322-
return res;
333+
return Err(Error::TxSyncFailed);
323334
},
324335
}
325336
};
@@ -329,7 +340,7 @@ impl BitcoindChainSource {
329340
let chain_listener = ChainListener {
330341
onchain_wallet: Arc::clone(&self.onchain_wallet),
331342
channel_manager: Arc::clone(&channel_manager),
332-
chain_monitor,
343+
chain_monitor: Arc::clone(&chain_monitor),
333344
output_sweeper,
334345
};
335346
let mut spv_client =
@@ -344,13 +355,19 @@ impl BitcoindChainSource {
344355
now.elapsed().unwrap().as_millis()
345356
);
346357
*self.latest_chain_tip.write().unwrap() = Some(tip);
358+
359+
periodically_archive_fully_resolved_monitors(
360+
Arc::clone(&channel_manager),
361+
chain_monitor,
362+
Arc::clone(&self.kv_store),
363+
Arc::clone(&self.logger),
364+
Arc::clone(&self.node_metrics),
365+
)?;
347366
},
348367
Ok(_) => {},
349368
Err(e) => {
350369
log_error!(self.logger, "Failed to poll for chain data: {:?}", e);
351-
let res = Err(Error::TxSyncFailed);
352-
self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res);
353-
return res;
370+
return Err(Error::TxSyncFailed);
354371
},
355372
}
356373

@@ -376,9 +393,7 @@ impl BitcoindChainSource {
376393
},
377394
Err(e) => {
378395
log_error!(self.logger, "Failed to poll for mempool transactions: {:?}", e);
379-
let res = Err(Error::TxSyncFailed);
380-
self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res);
381-
return res;
396+
return Err(Error::TxSyncFailed);
382397
},
383398
}
384399

@@ -388,24 +403,13 @@ impl BitcoindChainSource {
388403
locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt;
389404
locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;
390405

391-
let write_res = write_node_metrics(
406+
write_node_metrics(
392407
&*locked_node_metrics,
393408
Arc::clone(&self.kv_store),
394409
Arc::clone(&self.logger),
395-
);
396-
match write_res {
397-
Ok(()) => (),
398-
Err(e) => {
399-
log_error!(self.logger, "Failed to persist node metrics: {}", e);
400-
let res = Err(Error::PersistenceFailed);
401-
self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res);
402-
return res;
403-
},
404-
}
410+
)?;
405411

406-
let res = Ok(());
407-
self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res);
408-
res
412+
Ok(())
409413
}
410414

411415
pub(super) async fn update_fee_rate_estimates(&self) -> Result<(), Error> {

src/chain/electrum.rs

Lines changed: 50 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,6 @@ impl ElectrumChainSource {
103103
}
104104

105105
pub(crate) async fn sync_onchain_wallet(&self) -> Result<(), Error> {
106-
let electrum_client: Arc<ElectrumRuntimeClient> =
107-
if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() {
108-
Arc::clone(client)
109-
} else {
110-
debug_assert!(
111-
false,
112-
"We should have started the chain source before syncing the onchain wallet"
113-
);
114-
return Err(Error::FeerateEstimationUpdateFailed);
115-
};
116106
let receiver_res = {
117107
let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap();
118108
status_lock.register_or_subscribe_pending_sync()
@@ -126,6 +116,24 @@ impl ElectrumChainSource {
126116
})?;
127117
}
128118

119+
let res = self.sync_onchain_wallet_inner().await;
120+
121+
self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);
122+
123+
res
124+
}
125+
126+
async fn sync_onchain_wallet_inner(&self) -> Result<(), Error> {
127+
let electrum_client: Arc<ElectrumRuntimeClient> =
128+
if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() {
129+
Arc::clone(client)
130+
} else {
131+
debug_assert!(
132+
false,
133+
"We should have started the chain source before syncing the onchain wallet"
134+
);
135+
return Err(Error::FeerateEstimationUpdateFailed);
136+
};
129137
// If this is our first sync, do a full scan with the configured gap limit.
130138
// Otherwise just do an incremental sync.
131139
let incremental_sync =
@@ -179,35 +187,13 @@ impl ElectrumChainSource {
179187
apply_wallet_update(update_res, now)
180188
};
181189

182-
self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);
183-
184190
res
185191
}
186192

187193
pub(crate) async fn sync_lightning_wallet(
188194
&self, channel_manager: Arc<ChannelManager>, chain_monitor: Arc<ChainMonitor>,
189195
output_sweeper: Arc<Sweeper>,
190196
) -> Result<(), Error> {
191-
let electrum_client: Arc<ElectrumRuntimeClient> =
192-
if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() {
193-
Arc::clone(client)
194-
} else {
195-
debug_assert!(
196-
false,
197-
"We should have started the chain source before syncing the lightning wallet"
198-
);
199-
return Err(Error::TxSyncFailed);
200-
};
201-
202-
let sync_cman = Arc::clone(&channel_manager);
203-
let sync_cmon = Arc::clone(&chain_monitor);
204-
let sync_sweeper = Arc::clone(&output_sweeper);
205-
let confirmables = vec![
206-
sync_cman as Arc<dyn Confirm + Sync + Send>,
207-
sync_cmon as Arc<dyn Confirm + Sync + Send>,
208-
sync_sweeper as Arc<dyn Confirm + Sync + Send>,
209-
];
210-
211197
let receiver_res = {
212198
let mut status_lock = self.lightning_wallet_sync_status.lock().unwrap();
213199
status_lock.register_or_subscribe_pending_sync()
@@ -221,6 +207,38 @@ impl ElectrumChainSource {
221207
})?;
222208
}
223209

210+
let res =
211+
self.sync_lightning_wallet_inner(channel_manager, chain_monitor, output_sweeper).await;
212+
213+
self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);
214+
215+
res
216+
}
217+
218+
async fn sync_lightning_wallet_inner(
219+
&self, channel_manager: Arc<ChannelManager>, chain_monitor: Arc<ChainMonitor>,
220+
output_sweeper: Arc<Sweeper>,
221+
) -> Result<(), Error> {
222+
let sync_cman = Arc::clone(&channel_manager);
223+
let sync_cmon = Arc::clone(&chain_monitor);
224+
let sync_sweeper = Arc::clone(&output_sweeper);
225+
let confirmables = vec![
226+
sync_cman as Arc<dyn Confirm + Sync + Send>,
227+
sync_cmon as Arc<dyn Confirm + Sync + Send>,
228+
sync_sweeper as Arc<dyn Confirm + Sync + Send>,
229+
];
230+
231+
let electrum_client: Arc<ElectrumRuntimeClient> =
232+
if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() {
233+
Arc::clone(client)
234+
} else {
235+
debug_assert!(
236+
false,
237+
"We should have started the chain source before syncing the lightning wallet"
238+
);
239+
return Err(Error::TxSyncFailed);
240+
};
241+
224242
let res = electrum_client.sync_confirmables(confirmables).await;
225243

226244
if let Ok(_) = res {
@@ -245,8 +263,6 @@ impl ElectrumChainSource {
245263
)?;
246264
}
247265

248-
self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);
249-
250266
res
251267
}
252268

0 commit comments

Comments
 (0)