Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 88 additions & 93 deletions lightning-liquidity/src/lsps2/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,16 +771,14 @@ where
/// [`ChannelManager::create_channel`]: lightning::ln::channelmanager::ChannelManager::create_channel
/// [`ChannelManager::get_intercept_scid`]: lightning::ln::channelmanager::ChannelManager::get_intercept_scid
/// [`LSPS2ServiceEvent::BuyRequest`]: crate::lsps2::event::LSPS2ServiceEvent::BuyRequest
#[allow(clippy::await_holding_lock)]
pub async fn invoice_parameters_generated(
&self, counterparty_node_id: &PublicKey, request_id: LSPSRequestId, intercept_scid: u64,
cltv_expiry_delta: u32, client_trusts_lsp: bool, user_channel_id: u128,
) -> Result<(), APIError> {
let mut message_queue_notifier = self.pending_messages.notifier();
let mut should_persist = false;

let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
match self.per_peer_state.read().unwrap().get(counterparty_node_id) {
Some(inner_state_lock) => {
let mut peer_state_lock = inner_state_lock.lock().unwrap();

Expand Down Expand Up @@ -827,8 +825,6 @@ where
},
};

drop(outer_state_lock);

if should_persist {
self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
APIError::APIMisuseError {
Expand All @@ -855,16 +851,16 @@ where
///
/// [`Event::HTLCIntercepted`]: lightning::events::Event::HTLCIntercepted
/// [`LSPS2ServiceEvent::OpenChannel`]: crate::lsps2::event::LSPS2ServiceEvent::OpenChannel
#[allow(clippy::await_holding_lock)]
pub async fn htlc_intercepted(
&self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
payment_hash: PaymentHash,
) -> Result<(), APIError> {
let event_queue_notifier = self.pending_events.notifier();
let mut should_persist = None;

let peer_by_intercept_scid = self.peer_by_intercept_scid.read().unwrap();
if let Some(counterparty_node_id) = peer_by_intercept_scid.get(&intercept_scid) {
if let Some(counterparty_node_id) =
self.peer_by_intercept_scid.read().unwrap().get(&intercept_scid)
{
let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
Expand Down Expand Up @@ -939,8 +935,6 @@ where
}
}

drop(peer_by_intercept_scid);

if let Some(counterparty_node_id) = should_persist {
self.persist_peer_state(counterparty_node_id).await.map_err(|e| {
APIError::APIMisuseError {
Expand Down Expand Up @@ -1131,55 +1125,57 @@ where
/// open, as it only affects the local LSPS2 state and doesn't affect any channels that
/// might already exist on-chain. Any pending channel open attempts must be managed
/// separately.
#[allow(clippy::await_holding_lock)]
pub async fn channel_open_abandoned(
&self, counterparty_node_id: &PublicKey, user_channel_id: u128,
) -> Result<(), APIError> {
let outer_state_lock = self.per_peer_state.read().unwrap();
let inner_state_lock =
outer_state_lock.get(counterparty_node_id).ok_or_else(|| APIError::APIMisuseError {
err: format!("No counterparty state for: {}", counterparty_node_id),
})?;
let mut peer_state = inner_state_lock.lock().unwrap();

let intercept_scid = peer_state
.intercept_scid_by_user_channel_id
.get(&user_channel_id)
.copied()
.ok_or_else(|| APIError::APIMisuseError {
err: format!("Could not find a channel with user_channel_id {}", user_channel_id),
{
let outer_state_lock = self.per_peer_state.read().unwrap();
let inner_state_lock = outer_state_lock.get(counterparty_node_id).ok_or_else(|| {
APIError::APIMisuseError {
err: format!("No counterparty state for: {}", counterparty_node_id),
}
})?;
let mut peer_state = inner_state_lock.lock().unwrap();

let jit_channel = peer_state
.outbound_channels_by_intercept_scid
.get(&intercept_scid)
.ok_or_else(|| APIError::APIMisuseError {
let intercept_scid = peer_state
.intercept_scid_by_user_channel_id
.get(&user_channel_id)
.copied()
.ok_or_else(|| APIError::APIMisuseError {
err: format!(
"Could not find a channel with user_channel_id {}",
user_channel_id
),
})?;

let jit_channel = peer_state
.outbound_channels_by_intercept_scid
.get(&intercept_scid)
.ok_or_else(|| APIError::APIMisuseError {
err: format!(
"Failed to map intercept_scid {} for user_channel_id {} to a channel.",
intercept_scid, user_channel_id,
),
})?;

let is_pending = matches!(
jit_channel.state,
OutboundJITChannelState::PendingInitialPayment { .. }
| OutboundJITChannelState::PendingChannelOpen { .. }
);

if !is_pending {
return Err(APIError::APIMisuseError {
err: "Cannot abandon channel open after channel creation or payment forwarding"
.to_string(),
});
}
let is_pending = matches!(
jit_channel.state,
OutboundJITChannelState::PendingInitialPayment { .. }
| OutboundJITChannelState::PendingChannelOpen { .. }
);

peer_state.intercept_scid_by_user_channel_id.remove(&user_channel_id);
peer_state.outbound_channels_by_intercept_scid.remove(&intercept_scid);
peer_state.intercept_scid_by_channel_id.retain(|_, &mut scid| scid != intercept_scid);
peer_state.needs_persist |= true;
if !is_pending {
return Err(APIError::APIMisuseError {
err: "Cannot abandon channel open after channel creation or payment forwarding"
.to_string(),
});
}

drop(peer_state);
drop(outer_state_lock);
peer_state.intercept_scid_by_user_channel_id.remove(&user_channel_id);
peer_state.outbound_channels_by_intercept_scid.remove(&intercept_scid);
peer_state.intercept_scid_by_channel_id.retain(|_, &mut scid| scid != intercept_scid);
peer_state.needs_persist |= true;
}

self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
APIError::APIMisuseError {
Expand All @@ -1197,62 +1193,63 @@ where
/// state so that the payer may try the payment again.
///
/// [`LSPS2ServiceEvent::OpenChannel`]: crate::lsps2::event::LSPS2ServiceEvent::OpenChannel
#[allow(clippy::await_holding_lock)]
pub async fn channel_open_failed(
&self, counterparty_node_id: &PublicKey, user_channel_id: u128,
) -> Result<(), APIError> {
let outer_state_lock = self.per_peer_state.read().unwrap();
{
let outer_state_lock = self.per_peer_state.read().unwrap();

let inner_state_lock =
outer_state_lock.get(counterparty_node_id).ok_or_else(|| APIError::APIMisuseError {
err: format!("No counterparty state for: {}", counterparty_node_id),
let inner_state_lock = outer_state_lock.get(counterparty_node_id).ok_or_else(|| {
APIError::APIMisuseError {
err: format!("No counterparty state for: {}", counterparty_node_id),
}
})?;

let mut peer_state = inner_state_lock.lock().unwrap();
let mut peer_state = inner_state_lock.lock().unwrap();

let intercept_scid = peer_state
.intercept_scid_by_user_channel_id
.get(&user_channel_id)
.copied()
.ok_or_else(|| APIError::APIMisuseError {
err: format!("Could not find a channel with user_channel_id {}", user_channel_id),
})?;
let intercept_scid = peer_state
.intercept_scid_by_user_channel_id
.get(&user_channel_id)
.copied()
.ok_or_else(|| APIError::APIMisuseError {
err: format!(
"Could not find a channel with user_channel_id {}",
user_channel_id
),
})?;

let jit_channel = peer_state
.outbound_channels_by_intercept_scid
.get_mut(&intercept_scid)
.ok_or_else(|| APIError::APIMisuseError {
err: format!(
"Failed to map intercept_scid {} for user_channel_id {} to a channel.",
intercept_scid, user_channel_id,
),
})?;
let jit_channel = peer_state
.outbound_channels_by_intercept_scid
.get_mut(&intercept_scid)
.ok_or_else(|| APIError::APIMisuseError {
err: format!(
"Failed to map intercept_scid {} for user_channel_id {} to a channel.",
intercept_scid, user_channel_id,
),
})?;

if let OutboundJITChannelState::PendingChannelOpen { payment_queue, .. } =
&mut jit_channel.state
{
let intercepted_htlcs = payment_queue.clear();
for htlc in intercepted_htlcs {
self.channel_manager.get_cm().fail_htlc_backwards_with_reason(
&htlc.payment_hash,
FailureCode::TemporaryNodeFailure,
);
}

if let OutboundJITChannelState::PendingChannelOpen { payment_queue, .. } =
&mut jit_channel.state
{
let intercepted_htlcs = payment_queue.clear();
for htlc in intercepted_htlcs {
self.channel_manager.get_cm().fail_htlc_backwards_with_reason(
&htlc.payment_hash,
FailureCode::TemporaryNodeFailure,
);
jit_channel.state = OutboundJITChannelState::PendingInitialPayment {
payment_queue: PaymentQueue::new(),
};
} else {
return Err(APIError::APIMisuseError {
err: "Channel is not in the PendingChannelOpen state.".to_string(),
});
}

jit_channel.state = OutboundJITChannelState::PendingInitialPayment {
payment_queue: PaymentQueue::new(),
};
} else {
return Err(APIError::APIMisuseError {
err: "Channel is not in the PendingChannelOpen state.".to_string(),
});
peer_state.needs_persist |= true;
}

peer_state.needs_persist |= true;

drop(peer_state);
drop(outer_state_lock);

self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
APIError::APIMisuseError {
err: format!("Failed to persist peer state for {}: {}", counterparty_node_id, e),
Expand All @@ -1268,7 +1265,6 @@ where
/// we need to forward a payment over otherwise it will be ignored.
///
/// [`Event::ChannelReady`]: lightning::events::Event::ChannelReady
#[allow(clippy::await_holding_lock)]
pub async fn channel_ready(
&self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey,
) -> Result<(), APIError> {
Expand All @@ -1277,8 +1273,7 @@ where
let mut peer_by_channel_id = self.peer_by_channel_id.write().unwrap();
peer_by_channel_id.insert(*channel_id, *counterparty_node_id);
}
let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
match self.per_peer_state.read().unwrap().get(counterparty_node_id) {
Some(inner_state_lock) => {
let mut peer_state = inner_state_lock.lock().unwrap();
if let Some(intercept_scid) =
Expand Down
Loading