Skip to content

Commit 159e48c

Browse files
committed
try out async
1 parent f71c795 commit 159e48c

File tree

1 file changed

+64
-52
lines changed

1 file changed

+64
-52
lines changed

lightning/src/util/sweep.rs

Lines changed: 64 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use bitcoin::{BlockHash, ScriptBuf, Transaction, Txid};
3636

3737
use core::future::Future;
3838
use core::ops::Deref;
39+
use core::pin::Pin;
3940
use core::sync::atomic::{AtomicBool, Ordering};
4041
use core::task;
4142

@@ -414,7 +415,7 @@ where
414415
/// Returns `Err` on persistence failure, in which case the call may be safely retried.
415416
///
416417
/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
417-
pub fn track_spendable_outputs(
418+
pub async fn track_spendable_outputs(
418419
&self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
419420
exclude_static_outputs: bool, delay_until_height: Option<u32>,
420421
) -> Result<(), ()> {
@@ -430,24 +431,32 @@ where
430431
return Ok(());
431432
}
432433

433-
let mut state_lock = self.sweeper_state.lock().unwrap();
434-
for descriptor in relevant_descriptors {
435-
let output_info = TrackedSpendableOutput {
436-
descriptor,
437-
channel_id,
438-
status: OutputSpendStatus::PendingInitialBroadcast {
439-
delayed_until_height: delay_until_height,
440-
},
441-
};
442-
443-
let mut outputs = state_lock.persistent.outputs.iter();
444-
if outputs.find(|o| o.descriptor == output_info.descriptor).is_some() {
445-
continue;
446-
}
434+
let persist_fut;
435+
{
436+
let mut state_lock = self.sweeper_state.lock().unwrap();
437+
for descriptor in relevant_descriptors {
438+
let output_info = TrackedSpendableOutput {
439+
descriptor,
440+
channel_id,
441+
status: OutputSpendStatus::PendingInitialBroadcast {
442+
delayed_until_height: delay_until_height,
443+
},
444+
};
445+
446+
let mut outputs = state_lock.persistent.outputs.iter();
447+
if outputs.find(|o| o.descriptor == output_info.descriptor).is_some() {
448+
continue;
449+
}
447450

448-
state_lock.persistent.outputs.push(output_info);
451+
state_lock.persistent.outputs.push(output_info);
452+
}
453+
persist_fut = self.persist_state(&state_lock.persistent);
454+
state_lock.dirty = false;
449455
}
450-
self.flush_state(&mut state_lock).map_err(|e| {
456+
457+
persist_fut.await.map_err(|e| {
458+
self.sweeper_state.lock().unwrap().dirty = true;
459+
451460
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
452461
})
453462
}
@@ -476,12 +485,20 @@ where
476485
}
477486

478487
let result = {
479-
self.regenerate_and_broadcast_spend_if_necessary_internal().await?;
488+
let dirty = self.regenerate_and_broadcast_spend_if_necessary_internal().await?;
480489

481490
// If there is still dirty state, we need to persist it.
482-
let mut sweeper_state = self.sweeper_state.lock().unwrap();
483-
if sweeper_state.dirty {
484-
self.flush_state(&mut sweeper_state).map_err(|e| {
491+
if dirty {
492+
let persist_fut;
493+
{
494+
let mut sweeper_state = self.sweeper_state.lock().unwrap();
495+
persist_fut = self.persist_state(&sweeper_state.persistent);
496+
sweeper_state.dirty = false;
497+
}
498+
499+
persist_fut.await.map_err(|e| {
500+
self.sweeper_state.lock().unwrap().dirty = true;
501+
485502
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
486503
})
487504
} else {
@@ -496,7 +513,7 @@ where
496513
}
497514

498515
/// Regenerates and broadcasts the spending transaction for any outputs that are pending
499-
async fn regenerate_and_broadcast_spend_if_necessary_internal(&self) -> Result<(), ()> {
516+
async fn regenerate_and_broadcast_spend_if_necessary_internal(&self) -> Result<bool, ()> {
500517
let filter_fn = |o: &TrackedSpendableOutput, cur_height: u32| {
501518
if o.status.is_confirmed() {
502519
// Don't rebroadcast confirmed txs.
@@ -524,7 +541,7 @@ where
524541
let has_respends =
525542
sweeper_state.persistent.outputs.iter().any(|o| filter_fn(o, cur_height));
526543
if !has_respends {
527-
return Ok(());
544+
return Ok(sweeper_state.dirty);
528545
}
529546
}
530547

@@ -533,6 +550,7 @@ where
533550
self.change_destination_source.get_change_destination_script().await?;
534551

535552
// Sweep the outputs.
553+
let persist_fut;
536554
{
537555
let mut sweeper_state = self.sweeper_state.lock().unwrap();
538556

@@ -549,7 +567,7 @@ where
549567

550568
if respend_descriptors.is_empty() {
551569
// It could be that a tx confirmed and there is now nothing to sweep anymore.
552-
return Ok(());
570+
return Ok(sweeper_state.dirty);
553571
}
554572

555573
let spending_tx = self
@@ -581,14 +599,18 @@ where
581599
output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
582600
}
583601

584-
self.flush_state(&mut sweeper_state).map_err(|e| {
585-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
586-
})?;
587-
602+
persist_fut = self.persist_state(&sweeper_state.persistent);
603+
sweeper_state.dirty = false;
588604
self.broadcaster.broadcast_transactions(&[&spending_tx]);
589605
}
590606

591-
Ok(())
607+
persist_fut.await.map_err(|e| {
608+
self.sweeper_state.lock().unwrap().dirty = true;
609+
610+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
611+
})?;
612+
613+
Ok(false)
592614
}
593615

594616
fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) {
@@ -613,29 +635,19 @@ where
613635
sweeper_state.dirty = true;
614636
}
615637

616-
/// Flushes the current state to the persistence layer and marks the state as clean.
617-
fn flush_state(&self, sweeper_state: &mut SweeperState) -> Result<(), io::Error> {
618-
self.kv_store
619-
.write(
620-
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
621-
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
622-
OUTPUT_SWEEPER_PERSISTENCE_KEY,
623-
&sweeper_state.persistent.encode(),
624-
)
625-
.map_err(|e| {
626-
log_error!(
627-
self.logger,
628-
"Write for key {}/{}/{} failed due to: {}",
629-
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
630-
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
631-
OUTPUT_SWEEPER_PERSISTENCE_KEY,
632-
e
633-
);
634-
e
635-
})
636-
.map(|_| {
637-
sweeper_state.dirty = false;
638-
})
638+
fn persist_state<'a>(
639+
&self, sweeper_state: &PersistentSweeperState,
640+
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'a + Send>> {
641+
let encoded = &sweeper_state.encode();
642+
643+
let result = self.kv_store.write(
644+
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
645+
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
646+
OUTPUT_SWEEPER_PERSISTENCE_KEY,
647+
encoded,
648+
);
649+
650+
Box::pin(async move { result })
639651
}
640652

641653
fn spend_outputs(

0 commit comments

Comments
 (0)