88mod bitcoind_rpc;
99mod block_sync_init;
1010
11- use crate :: chain:: bitcoind_rpc:: BitcoindRpcClient ;
12-
11+ use crate :: chain:: bitcoind_rpc:: { BitcoindRpcClient , BoundedHeaderCache , ChainListener } ;
12+ use crate :: chain :: block_sync_init :: synchronize_listeners ;
1313use crate :: config:: {
1414 Config , EsploraSyncConfig , BDK_CLIENT_CONCURRENCY , BDK_CLIENT_STOP_GAP ,
1515 BDK_WALLET_SYNC_TIMEOUT_SECS , FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS , LDK_WALLET_SYNC_TIMEOUT_SECS ,
@@ -25,11 +25,14 @@ use crate::logger::{log_bytes, log_error, log_info, log_trace, FilesystemLogger,
2525use crate :: types:: { Broadcaster , ChainMonitor , ChannelManager , DynStore , Sweeper , Wallet } ;
2626use crate :: { Error , NodeMetrics } ;
2727
28- use lightning:: chain:: { Confirm , Filter } ;
28+ use lightning:: chain:: { Confirm , Filter , Listen } ;
2929use lightning:: util:: ser:: Writeable ;
3030
3131use lightning_transaction_sync:: EsploraSyncClient ;
3232
33+ use lightning_block_sync:: poll:: ChainPoller ;
34+ use lightning_block_sync:: SpvClient ;
35+
3336use bdk_esplora:: EsploraAsyncExt ;
3437
3538use esplora_client:: AsyncClient as EsploraAsyncClient ;
@@ -46,6 +49,8 @@ pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/ap
4649// The default Esplora client timeout we're using.
4750pub ( crate ) const DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS : u64 = 10 ;
4851
52+ const CHAIN_POLLING_INTERVAL_SECS : u64 = 1 ;
53+
4954pub ( crate ) enum WalletSyncStatus {
5055 Completed ,
5156 InProgress { subscribers : tokio:: sync:: broadcast:: Sender < Result < ( ) , Error > > } ,
@@ -242,7 +247,144 @@ impl ChainSource {
242247 }
243248 }
244249 } ,
245- Self :: BitcoindRpc { .. } => todo ! ( ) ,
250+ Self :: BitcoindRpc {
251+ bitcoind_rpc_client,
252+ onchain_wallet,
253+ kv_store,
254+ config,
255+ logger,
256+ node_metrics,
257+ ..
258+ } => {
259+ let mut header_cache = BoundedHeaderCache :: new ( ) ;
260+ let channel_manager_best_block_hash =
261+ channel_manager. current_best_block ( ) . block_hash ;
262+ let sweeper_best_block_hash = output_sweeper. current_best_block ( ) . block_hash ;
263+ let onchain_wallet_best_block_hash = onchain_wallet. current_best_block ( ) . block_hash ;
264+
265+ let mut chain_listeners = vec ! [
266+ (
267+ onchain_wallet_best_block_hash,
268+ & * * onchain_wallet as & ( dyn Listen + Send + Sync ) ,
269+ ) ,
270+ (
271+ channel_manager_best_block_hash,
272+ & * channel_manager as & ( dyn Listen + Send + Sync ) ,
273+ ) ,
274+ ( sweeper_best_block_hash, & * output_sweeper as & ( dyn Listen + Send + Sync ) ) ,
275+ ] ;
276+
277+ // TODO: Eventually we might want to see if we can synchronize `ChannelMonitor`s
278+ // before giving them to `ChainMonitor` it the first place. However, this isn't
279+ // trivial as we load them on initialization (in the `Builder`) and only gain
280+ // network access during `start`. For now, we just make sure we get the worst known
281+ // block hash and sychronize them via `ChainMonitor`.
282+ if let Some ( worst_channel_monitor_block_hash) = chain_monitor
283+ . list_monitors ( )
284+ . iter ( )
285+ . flat_map ( |( txo, _) | chain_monitor. get_monitor ( * txo) )
286+ . map ( |m| m. current_best_block ( ) )
287+ . min_by_key ( |b| b. height )
288+ . map ( |b| b. block_hash )
289+ {
290+ chain_listeners. push ( (
291+ worst_channel_monitor_block_hash,
292+ & * chain_monitor as & ( dyn Listen + Send + Sync ) ,
293+ ) ) ;
294+ }
295+
296+ let chain_tip = loop {
297+ match synchronize_listeners (
298+ bitcoind_rpc_client. as_ref ( ) ,
299+ config. network ,
300+ & mut header_cache,
301+ chain_listeners. clone ( ) ,
302+ )
303+ . await
304+ {
305+ Ok ( chain_tip) => {
306+ {
307+ let unix_time_secs_opt = SystemTime :: now ( )
308+ . duration_since ( UNIX_EPOCH )
309+ . ok ( )
310+ . map ( |d| d. as_secs ( ) ) ;
311+ let mut locked_node_metrics = node_metrics. write ( ) . unwrap ( ) ;
312+ locked_node_metrics. latest_lightning_wallet_sync_timestamp =
313+ unix_time_secs_opt;
314+ locked_node_metrics. latest_onchain_wallet_sync_timestamp =
315+ unix_time_secs_opt;
316+ write_node_metrics (
317+ & * locked_node_metrics,
318+ Arc :: clone ( & kv_store) ,
319+ Arc :: clone ( & logger) ,
320+ )
321+ . unwrap_or_else ( |e| {
322+ log_error ! ( logger, "Failed to persist node metrics: {}" , e) ;
323+ } ) ;
324+ }
325+ break chain_tip;
326+ } ,
327+
328+ Err ( e) => {
329+ log_error ! ( logger, "Failed to synchronize chain listeners: {:?}" , e) ;
330+ tokio:: time:: sleep ( Duration :: from_secs ( CHAIN_POLLING_INTERVAL_SECS ) )
331+ . await ;
332+ } ,
333+ }
334+ } ;
335+
336+ let chain_poller =
337+ ChainPoller :: new ( Arc :: clone ( & bitcoind_rpc_client) , config. network ) ;
338+ let chain_listener = ChainListener {
339+ onchain_wallet : Arc :: clone ( & onchain_wallet) ,
340+ channel_manager : Arc :: clone ( & channel_manager) ,
341+ chain_monitor : Arc :: clone ( & chain_monitor) ,
342+ output_sweeper : Arc :: clone ( & output_sweeper) ,
343+ } ;
344+ let mut spv_client =
345+ SpvClient :: new ( chain_tip, chain_poller, & mut header_cache, & chain_listener) ;
346+ let mut chain_polling_interval =
347+ tokio:: time:: interval ( Duration :: from_secs ( CHAIN_POLLING_INTERVAL_SECS ) ) ;
348+ chain_polling_interval
349+ . set_missed_tick_behavior ( tokio:: time:: MissedTickBehavior :: Skip ) ;
350+
351+ // Start the polling loop.
352+ loop {
353+ tokio:: select! {
354+ _ = stop_sync_receiver. changed( ) => {
355+ log_trace!(
356+ logger,
357+ "Stopping polling for new chain data." ,
358+ ) ;
359+ return ;
360+ }
361+ _ = chain_polling_interval. tick( ) => {
362+ let _ = spv_client. poll_best_tip( ) . await . map_err( |e| {
363+ log_error!( logger, "Failed to poll for chain data: {:?}" , e) ;
364+ } ) ;
365+ {
366+ let unix_time_secs_opt = SystemTime :: now( )
367+ . duration_since( UNIX_EPOCH )
368+ . ok( )
369+ . map( |d| d. as_secs( ) ) ;
370+ let mut locked_node_metrics = node_metrics. write( ) . unwrap( ) ;
371+ locked_node_metrics. latest_lightning_wallet_sync_timestamp =
372+ unix_time_secs_opt;
373+ locked_node_metrics. latest_onchain_wallet_sync_timestamp =
374+ unix_time_secs_opt;
375+ write_node_metrics(
376+ & * locked_node_metrics,
377+ Arc :: clone( & kv_store) ,
378+ Arc :: clone( & logger) ,
379+ )
380+ . unwrap_or_else( |e| {
381+ log_error!( logger, "Failed to persist node metrics: {}" , e) ;
382+ } ) ;
383+ }
384+ }
385+ }
386+ }
387+ } ,
246388 }
247389 }
248390
0 commit comments