@@ -2719,6 +2719,10 @@ pub struct ChannelManager<
27192719 /// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
27202720 pending_events_processor: AtomicBool,
27212721
2722+ /// A simple atomic flag to ensure only one task at a time can be processing HTLC forwards via
2723+ /// [`Self::process_pending_htlc_forwards`].
2724+ pending_htlc_forwards_processor: AtomicBool,
2725+
27222726 /// If we are running during init (either directly during the deserialization method or in
27232727 /// block connection methods which run after deserialization but before normal operation) we
27242728 /// cannot provide the user with [`ChannelMonitorUpdate`]s through the normal update flow -
@@ -3795,6 +3799,7 @@ where
37953799
37963800 pending_events: Mutex::new(VecDeque::new()),
37973801 pending_events_processor: AtomicBool::new(false),
3802+ pending_htlc_forwards_processor: AtomicBool::new(false),
37983803 pending_background_events: Mutex::new(Vec::new()),
37993804 total_consistency_lock: RwLock::new(()),
38003805 background_events_processed_since_startup: AtomicBool::new(false),
@@ -6325,9 +6330,19 @@ where
63256330 ///
63266331 /// Will regularly be called by the background processor.
63276332 pub fn process_pending_htlc_forwards(&self) {
6333+ if self
6334+ .pending_htlc_forwards_processor
6335+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
6336+ .is_err()
6337+ {
6338+ return;
6339+ }
6340+
63286341 let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
63296342 self.internal_process_pending_htlc_forwards()
63306343 });
6344+
6345+ self.pending_htlc_forwards_processor.store(false, Ordering::Release);
63316346 }
63326347
63336348 // Returns whether or not we need to re-persist.
@@ -16296,6 +16311,7 @@ where
1629616311
1629716312 pending_events: Mutex::new(pending_events_read),
1629816313 pending_events_processor: AtomicBool::new(false),
16314+ pending_htlc_forwards_processor: AtomicBool::new(false),
1629916315 pending_background_events: Mutex::new(pending_background_events),
1630016316 total_consistency_lock: RwLock::new(()),
1630116317 background_events_processed_since_startup: AtomicBool::new(false),
0 commit comments