@@ -2722,6 +2722,10 @@ pub struct ChannelManager<
27222722 /// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
27232723 pending_events_processor: AtomicBool,
27242724
2725+ /// A simple atomic flag to ensure only one task at a time can be processing HTLC forwards via
2726+ /// [`Self::process_pending_htlc_forwards`].
2727+ pending_htlc_forwards_processor: AtomicBool,
2728+
27252729 /// If we are running during init (either directly during the deserialization method or in
27262730 /// block connection methods which run after deserialization but before normal operation) we
27272731 /// cannot provide the user with [`ChannelMonitorUpdate`]s through the normal update flow -
@@ -3786,6 +3790,7 @@ where
37863790
37873791 pending_events: Mutex::new(VecDeque::new()),
37883792 pending_events_processor: AtomicBool::new(false),
3793+ pending_htlc_forwards_processor: AtomicBool::new(false),
37893794 pending_background_events: Mutex::new(Vec::new()),
37903795 total_consistency_lock: RwLock::new(()),
37913796 background_events_processed_since_startup: AtomicBool::new(false),
@@ -6356,9 +6361,19 @@ where
63566361 ///
63576362 /// Will regularly be called by the background processor.
63586363 pub fn process_pending_htlc_forwards(&self) {
6364+ if self
6365+ .pending_htlc_forwards_processor
6366+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
6367+ .is_err()
6368+ {
6369+ return;
6370+ }
6371+
63596372 let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
63606373 self.internal_process_pending_htlc_forwards()
63616374 });
6375+
6376+ self.pending_htlc_forwards_processor.store(false, Ordering::Release);
63626377 }
63636378
63646379 // Returns whether or not we need to re-persist.
@@ -16448,6 +16463,7 @@ where
1644816463
1644916464 pending_events: Mutex::new(pending_events_read),
1645016465 pending_events_processor: AtomicBool::new(false),
16466+ pending_htlc_forwards_processor: AtomicBool::new(false),
1645116467 pending_background_events: Mutex::new(pending_background_events),
1645216468 total_consistency_lock: RwLock::new(()),
1645316469 background_events_processed_since_startup: AtomicBool::new(false),
0 commit comments