diff --git a/src/ldk.rs b/src/ldk.rs index e6dbaa8..30788a5 100644 --- a/src/ldk.rs +++ b/src/ldk.rs @@ -8,7 +8,7 @@ use bitcoin_bech32::WitnessProgram; use lightning::chain::{chainmonitor, ChannelMonitorUpdateStatus}; use lightning::chain::{BestBlock, Filter, Watch}; use lightning::events::bump_transaction::{BumpTransactionEventHandler, Wallet}; -use lightning::events::{Event, PaymentFailureReason, PaymentPurpose}; +use lightning::events::{Event, PaymentFailureReason, PaymentPurpose, ReplayEvent}; use lightning::ln::channelmanager::{self, PaymentId, RecentPaymentDetails}; use lightning::ln::channelmanager::{ ChainParameters, ChannelManagerReadArgs, SimpleArcChannelManager, @@ -505,7 +505,7 @@ async fn handle_ldk_events( event: Event, unlocked_state: Arc, static_state: Arc, -) { +) -> Result<(), ReplayEvent> { match event { Event::FundingGenerationReady { temporary_channel_id, @@ -578,6 +578,7 @@ async fn handle_ldk_events( let funding_tx = psbt.clone().extract_tx().unwrap(); let funding_txid = funding_tx.compute_txid().to_string(); + tracing::info!("Funding TXID: {funding_txid}"); let psbt_path = static_state .ldk_data_dir @@ -618,7 +619,7 @@ async fn handle_ldk_events( if let Err(e) = res { tracing::error!("cannot post consignment: {e}"); - return; + return Err(ReplayEvent()); } } @@ -980,15 +981,24 @@ async fn handle_ldk_events( let state_copy = unlocked_state.clone(); let psbt_str_copy = psbt_str.clone(); + + let is_chan_colored = + is_channel_rgb(&channel_id, &PathBuf::from(&static_state.ldk_data_dir)); + tracing::info!("Initiator of the channel (colored: {})", is_chan_colored); + let _txid = tokio::task::spawn_blocking(move || { - if is_channel_rgb(&channel_id, &PathBuf::from(&static_state.ldk_data_dir)) { - state_copy.rgb_send_end(psbt_str_copy).unwrap().txid + if is_chan_colored { + state_copy.rgb_send_end(psbt_str_copy).map(|r| r.txid) } else { - state_copy.rgb_send_btc_end(psbt_str_copy).unwrap() + state_copy.rgb_send_btc_end(psbt_str_copy) } }) .await - .unwrap(); + .unwrap() + .map_err(|e| { + tracing::error!("Error completing channel opening: {e:?}"); + ReplayEvent() + })?; *unlocked_state.rgb_send_lock.lock().unwrap() = false; } else { @@ -997,7 +1007,8 @@ async fn handle_ldk_events( .ldk_data_dir .join(format!("consignment_{funding_txid}")); if !consignment_path.exists() { - return; + // vanilla channel + return Ok(()); } let consignment = RgbTransfer::load_file(consignment_path).expect("successful consignment load"); @@ -1118,7 +1129,7 @@ async fn handle_ldk_events( .channel_manager .fail_intercepted_htlc(intercept_id) .unwrap(); - return; + return Ok(()); } Some(x) => x, }; @@ -1167,7 +1178,7 @@ async fn handle_ldk_events( .channel_manager .fail_intercepted_htlc(intercept_id) .unwrap(); - return; + return Ok(()); } tracing::debug!("Swap is whitelisted, forwarding the htlc..."); @@ -1208,6 +1219,7 @@ async fn handle_ldk_events( }); } } + Ok(()) } impl OutputSpender for RgbOutputSpender { @@ -1997,10 +2009,7 @@ pub(crate) async fn start_ldk( let event_handler = move |event: Event| { let unlocked_state_copy = Arc::clone(&unlocked_state_copy); let static_state_copy = Arc::clone(&static_state_copy); - async move { - handle_ldk_events(event, unlocked_state_copy, static_state_copy).await; - Ok(()) - } + async move { handle_ldk_events(event, unlocked_state_copy, static_state_copy).await } }; // Background Processing diff --git a/src/routes.rs b/src/routes.rs index 02b2ddc..f56a60e 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -1223,7 +1223,8 @@ impl AppState { pub(crate) async fn address( State(state): State>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let address = unlocked_state.rgb_get_address()?; @@ -1234,7 +1235,8 @@ pub(crate) async fn asset_balance( State(state): State>, WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let contract_id = ContractId::from_str(&payload.asset_id) .map_err(|_| APIError::InvalidAssetID(payload.asset_id))?; @@ -1299,7 +1301,7 @@ pub(crate) async fn backup( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let _unlocked_state = state.check_locked().await?; + let _guard = state.check_locked().await?; let _mnemonic = check_password_validity(&payload.password, &state.static_state.storage_dir_path)?; @@ -1319,7 +1321,8 @@ pub(crate) async fn btc_balance( State(state): State>, WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let btc_balance = unlocked_state.rgb_get_btc_balance(payload.skip_sync)?; @@ -1343,7 +1346,7 @@ pub(crate) async fn change_password( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let _unlocked_state = state.check_locked().await?; + let _guard = state.check_locked().await?; check_password_strength(payload.new_password.clone())?; @@ -1384,7 +1387,8 @@ pub(crate) async fn close_channel( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let channel_id_vec = hex_str_to_vec(&payload.channel_id); if channel_id_vec.is_none() || channel_id_vec.as_ref().unwrap().len() != 32 { @@ -1432,7 +1436,8 @@ pub(crate) async fn connect_peer( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let (peer_pubkey, peer_addr) = parse_peer_info(payload.peer_pubkey_and_addr.to_string())?; @@ -1460,7 +1465,8 @@ pub(crate) async fn create_utxos( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); unlocked_state.rgb_create_utxos( payload.up_to, @@ -1480,7 +1486,7 @@ pub(crate) async fn decode_ln_invoice( State(state): State>, WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { - let _unlocked_app_state = state.get_unlocked_app_state(); + let _guard = state.get_unlocked_app_state(); let invoice = match Bolt11Invoice::from_str(&payload.invoice) { Err(e) => return Err(APIError::InvalidInvoice(e.to_string())), @@ -1504,7 +1510,7 @@ pub(crate) async fn decode_rgb_invoice( State(state): State>, WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { - let _unlocked_app_state = state.get_unlocked_app_state(); + let _guard = state.get_unlocked_app_state(); let invoice_data = RgbLibInvoice::new(payload.invoice)?.invoice_data(); @@ -1524,7 +1530,8 @@ pub(crate) async fn disconnect_peer( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let peer_pubkey = match PublicKey::from_str(&payload.peer_pubkey) { Ok(pubkey) => pubkey, @@ -1584,7 +1591,8 @@ pub(crate) async fn fail_transfers( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let unlocked_state_copy = unlocked_state.clone(); let transfers_changed = tokio::task::spawn_blocking(move || { @@ -1667,7 +1675,8 @@ pub(crate) async fn invoice_status( State(state): State>, WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let invoice = match Bolt11Invoice::from_str(&payload.invoice) { Err(e) => return Err(APIError::InvalidInvoice(e.to_string())), @@ -1693,7 +1702,8 @@ pub(crate) async fn issue_asset_cfa( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); if *unlocked_state.rgb_send_lock.lock().unwrap() { return Err(APIError::OpenChannelInProgress); @@ -1727,7 +1737,8 @@ pub(crate) async fn issue_asset_nia( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); if *unlocked_state.rgb_send_lock.lock().unwrap() { return Err(APIError::OpenChannelInProgress); @@ -1752,7 +1763,8 @@ pub(crate) async fn issue_asset_uda( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); if *unlocked_state.rgb_send_lock.lock().unwrap() { return Err(APIError::OpenChannelInProgress); @@ -1793,7 +1805,8 @@ pub(crate) async fn keysend( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let dest_pubkey = match hex_str_to_compressed_pubkey(&payload.dest_pubkey) { Some(pk) => pk, @@ -1891,7 +1904,8 @@ pub(crate) async fn list_assets( State(state): State>, WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let rgb_assets = unlocked_state.rgb_list_assets( payload @@ -1967,7 +1981,8 @@ pub(crate) async fn list_assets( pub(crate) async fn list_channels( State(state): State>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let mut channels = vec![]; for chan_info in unlocked_state.channel_manager.list_channels() { @@ -2043,7 +2058,8 @@ pub(crate) async fn list_channels( pub(crate) async fn list_payments( State(state): State>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let inbound_payments = unlocked_state.inbound_payments(); let outbound_payments = unlocked_state.outbound_payments(); @@ -2106,7 +2122,8 @@ pub(crate) async fn get_payment( State(state): State>, WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let payment_hash_vec = hex_str_to_vec(&payload.payment_hash); if payment_hash_vec.is_none() || payment_hash_vec.as_ref().unwrap().len() != 32 { @@ -2180,7 +2197,8 @@ pub(crate) async fn get_payment( pub(crate) async fn list_peers( State(state): State>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let mut peers = vec![]; for peer_details in unlocked_state.peer_manager.list_peers() { @@ -2195,7 +2213,8 @@ pub(crate) async fn list_peers( pub(crate) async fn list_swaps( State(state): State>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let map_swap = |payment_hash: &PaymentHash, swap_data: &SwapData, taker: bool| { let mut status = swap_data.status.clone(); @@ -2246,7 +2265,8 @@ pub(crate) async fn get_swap( State(state): State>, WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let payment_hash_vec = hex_str_to_vec(&payload.payment_hash); if payment_hash_vec.is_none() || payment_hash_vec.as_ref().unwrap().len() != 32 { @@ -2307,7 +2327,8 @@ pub(crate) async fn list_transactions( State(state): State>, WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let mut transactions = vec![]; for tx in unlocked_state.rgb_list_transactions(payload.skip_sync)? { @@ -2336,7 +2357,8 @@ pub(crate) async fn list_transfers( State(state): State>, WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let mut transfers = vec![]; for transfer in unlocked_state.rgb_list_transfers(payload.asset_id)? { @@ -2385,7 +2407,8 @@ pub(crate) async fn list_unspents( State(state): State>, WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let mut unspents = vec![]; for unspent in unlocked_state.rgb_list_unspents(payload.skip_sync)? { @@ -2414,7 +2437,8 @@ pub(crate) async fn ln_invoice( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let contract_id = if let Some(asset_id) = payload.asset_id { Some(ContractId::from_str(&asset_id).map_err(|_| APIError::InvalidAssetID(asset_id))?) @@ -2509,7 +2533,8 @@ pub(crate) async fn maker_execute( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let swapstring = SwapString::from_str(&payload.swapstring) .map_err(|e| APIError::InvalidSwapString(payload.swapstring.clone(), e.to_string()))?; @@ -2717,7 +2742,8 @@ pub(crate) async fn maker_init( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let from_asset = match &payload.from_asset { None => None, @@ -2790,7 +2816,8 @@ pub(crate) async fn maker_init( pub(crate) async fn network_info( State(state): State>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let best_block = unlocked_state.channel_manager.current_best_block(); @@ -2803,7 +2830,8 @@ pub(crate) async fn network_info( pub(crate) async fn node_info( State(state): State>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let chans = unlocked_state.channel_manager.list_channels(); @@ -2869,7 +2897,8 @@ pub(crate) async fn open_channel( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); if *unlocked_state.rgb_send_lock.lock().unwrap() { return Err(APIError::OpenChannelInProgress); @@ -3116,7 +3145,8 @@ pub(crate) async fn post_asset_media( mut multipart: Multipart, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let digest = if let Some(field) = multipart .next_field() @@ -3164,9 +3194,11 @@ pub(crate) async fn refresh_transfers( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); + let unlocked_state_copy = unlocked_state.clone(); - tokio::task::spawn_blocking(move || unlocked_state.rgb_refresh(payload.skip_sync)) + tokio::task::spawn_blocking(move || unlocked_state_copy.rgb_refresh(payload.skip_sync)) .await .unwrap()?; @@ -3220,7 +3252,8 @@ pub(crate) async fn rgb_invoice( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); if *unlocked_state.rgb_send_lock.lock().unwrap() { return Err(APIError::OpenChannelInProgress); @@ -3257,7 +3290,8 @@ pub(crate) async fn send_asset( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); if *unlocked_state.rgb_send_lock.lock().unwrap() { return Err(APIError::OpenChannelInProgress); @@ -3273,8 +3307,9 @@ pub(crate) async fn send_asset( }] }; + let unlocked_state_copy = unlocked_state.clone(); let send_result = tokio::task::spawn_blocking(move || { - unlocked_state.rgb_send( + unlocked_state_copy.rgb_send( recipient_map, payload.donation, payload.fee_rate, @@ -3297,7 +3332,8 @@ pub(crate) async fn send_btc( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let txid = unlocked_state.rgb_send_btc( payload.address, @@ -3316,7 +3352,8 @@ pub(crate) async fn send_onion_message( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); if payload.node_ids.is_empty() { return Err(APIError::InvalidNodeIds(s!( @@ -3380,7 +3417,8 @@ pub(crate) async fn send_payment( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let mut status = HTLCStatus::Pending; let created_at = get_current_timestamp(); @@ -3567,7 +3605,8 @@ pub(crate) async fn sign_message( State(state): State>, WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let message = payload.message.trim(); let signed_message = lightning::util::message_signing::sign( @@ -3582,7 +3621,8 @@ pub(crate) async fn sync( State(state): State>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); unlocked_state.rgb_sync()?; @@ -3596,7 +3636,8 @@ pub(crate) async fn taker( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + let guard = state.check_unlocked().await?; + let unlocked_state = guard.as_ref().unwrap(); let swapstring = SwapString::from_str(&payload.swapstring) .map_err(|e| APIError::InvalidSwapString(payload.swapstring.clone(), e.to_string()))?; diff --git a/src/test/concurrent_openchannel.rs b/src/test/concurrent_openchannel.rs new file mode 100644 index 0000000..3db73a8 --- /dev/null +++ b/src/test/concurrent_openchannel.rs @@ -0,0 +1,108 @@ +use super::*; + +const TEST_DIR_BASE: &str = "tmp/concurrent_openchannel/"; + +#[serial_test::serial] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[traced_test] +async fn concurrent_openchannel() { + initialize(); + + const CHANNEL_CAPACITY_SAT: u64 = 100000; + const PUSH_MSAT: u64 = 20000; + const ASSET_AMOUNT: u64 = 100; + + let node1_peer_port = NODE1_PEER_PORT; + let node2_peer_port = NODE2_PEER_PORT; + let node3_peer_port = NODE3_PEER_PORT; + let node4_peer_port = NODE4_PEER_PORT; + let node5_peer_port = NODE5_PEER_PORT; + let node6_peer_port = NODE6_PEER_PORT; + + let test_dir_node1 = format!("{TEST_DIR_BASE}node1"); + let test_dir_node2 = format!("{TEST_DIR_BASE}node2"); + let test_dir_node3 = format!("{TEST_DIR_BASE}node3"); + let test_dir_node4 = format!("{TEST_DIR_BASE}node4"); + let test_dir_node5 = format!("{TEST_DIR_BASE}node5"); + let test_dir_node6 = format!("{TEST_DIR_BASE}node6"); + + let (node1_addr, _) = start_node(&test_dir_node1, node1_peer_port, false).await; + let (node2_addr, _) = start_node(&test_dir_node2, node2_peer_port, false).await; + let (node3_addr, _) = start_node(&test_dir_node3, node3_peer_port, false).await; + let (node4_addr, _) = start_node(&test_dir_node4, node4_peer_port, false).await; + let (node5_addr, _) = start_node(&test_dir_node5, node5_peer_port, false).await; + let (node6_addr, _) = start_node(&test_dir_node6, node6_peer_port, false).await; + + fund_and_create_utxos(node1_addr, None).await; + create_utxos(node1_addr, false, Some(5), Some(110000)).await; + fund_and_create_utxos(node2_addr, None).await; + fund_and_create_utxos(node3_addr, None).await; + fund_and_create_utxos(node4_addr, None).await; + fund_and_create_utxos(node5_addr, None).await; + fund_and_create_utxos(node6_addr, None).await; + + let asset_id = issue_asset_nia(node1_addr).await.asset_id; + + let _node1_pubkey = node_info(node1_addr).await.pubkey; + let node2_pubkey = node_info(node2_addr).await.pubkey; + let node3_pubkey = node_info(node3_addr).await.pubkey; + let node4_pubkey = node_info(node4_addr).await.pubkey; + let node5_pubkey = node_info(node5_addr).await.pubkey; + let node6_pubkey = node_info(node6_addr).await.pubkey; + + let (_channel_mt_1, _channel_mt_2, _channel_mt_3, _channel_mt_4, _channel_mt_5) = tokio::join!( + open_channel_with_retry( + node1_addr, + &node2_pubkey, + Some(node2_peer_port), + Some(CHANNEL_CAPACITY_SAT), + Some(PUSH_MSAT), + Some(ASSET_AMOUNT), + Some(&asset_id), + 20, + ), + open_channel_with_retry( + node1_addr, + &node3_pubkey, + Some(node3_peer_port), + Some(CHANNEL_CAPACITY_SAT), + Some(PUSH_MSAT), + Some(ASSET_AMOUNT), + Some(&asset_id), + 20, + ), + open_channel_with_retry( + node1_addr, + &node4_pubkey, + Some(node4_peer_port), + Some(CHANNEL_CAPACITY_SAT), + Some(PUSH_MSAT), + Some(ASSET_AMOUNT), + Some(&asset_id), + 20, + ), + open_channel_with_retry( + node1_addr, + &node5_pubkey, + Some(node5_peer_port), + Some(CHANNEL_CAPACITY_SAT), + Some(PUSH_MSAT), + Some(ASSET_AMOUNT), + Some(&asset_id), + 20, + ), + open_channel_with_retry( + node1_addr, + &node6_pubkey, + Some(node6_peer_port), + Some(CHANNEL_CAPACITY_SAT), + Some(PUSH_MSAT), + Some(ASSET_AMOUNT), + Some(&asset_id), + 20, + ) + ); + + let node1_usable_channels = node_info(node1_addr).await.num_usable_channels; + assert_eq!(node1_usable_channels, 5); +} diff --git a/src/test/mod.rs b/src/test/mod.rs index 5e7e5a8..7bb1d75 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -49,6 +49,8 @@ const NODE1_PEER_PORT: u16 = 9801; const NODE2_PEER_PORT: u16 = 9802; const NODE3_PEER_PORT: u16 = 9803; const NODE4_PEER_PORT: u16 = 9804; +const NODE5_PEER_PORT: u16 = 9805; +const NODE6_PEER_PORT: u16 = 9806; static INIT: Once = Once::new(); @@ -1053,7 +1055,55 @@ async fn open_channel( } #[allow(clippy::too_many_arguments)] -async fn open_channel_with_custom_data( +async fn open_channel_with_retry( + node_address: SocketAddr, + dest_peer_pubkey: &str, + dest_peer_port: Option, + capacity_sat: Option, + push_msat: Option, + asset_amount: Option, + asset_id: Option<&str>, + max_retries: u32, +) -> Channel { + let mut attempt = 0; + loop { + attempt += 1; + let result = open_channel_raw( + node_address, + dest_peer_pubkey, + dest_peer_port, + capacity_sat, + push_msat, + asset_amount, + asset_id, + None, + None, + None, + true, + ) + .await; + + match result { + Ok(channel) => return channel, + Err(status) if status == reqwest::StatusCode::FORBIDDEN && attempt < max_retries => { + println!( + "Channel opening in progress (attempt {}/{}), retrying in 5s...", + attempt, max_retries + ); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + Err(status) => { + panic!( + "Failed to open channel after {} attempts with status: {}", + attempt, status + ); + } + } + } +} + +#[allow(clippy::too_many_arguments)] +async fn open_channel_raw( node_address: SocketAddr, dest_peer_pubkey: &str, dest_peer_port: Option, @@ -1065,12 +1115,11 @@ async fn open_channel_with_custom_data( fee_proportional_millionths: Option, temporary_channel_id: Option<&str>, with_anchors: bool, -) -> Channel { +) -> Result { println!( "opening channel with {asset_amount:?} of asset {asset_id:?} from node {node_address} \ to {dest_peer_pubkey}" ); - stop_mining(); let blockcount = get_block_count(); let t_0 = OffsetDateTime::now_utc(); @@ -1108,11 +1157,13 @@ async fn open_channel_with_custom_data( .send() .await .unwrap(); - _check_response_is_ok(res) - .await - .json::() - .await - .unwrap(); + + let status = res.status(); + if !status.is_success() { + return Err(status); + } + + res.json::().await.unwrap(); let t_0 = OffsetDateTime::now_utc(); let mut channel_id = None; @@ -1129,7 +1180,7 @@ async fn open_channel_with_custom_data( if channel.funding_txid.is_some() { let txout = _get_txout(channel.funding_txid.as_ref().unwrap()); if !txout.is_empty() { - mine_n_blocks(true, 6); + mine_n_blocks(false, 6); channel_id = Some(channel.channel_id.clone()); channel_funded = true; continue; @@ -1151,7 +1202,7 @@ async fn open_channel_with_custom_data( .find(|c| c.channel_id == channel_id) .unwrap(); if channel.ready { - return channel.clone(); + return Ok(channel.clone()); } if (OffsetDateTime::now_utc() - t_0).as_seconds_f32() > 10.0 { panic!("channel is taking too long to be ready") @@ -1159,6 +1210,37 @@ async fn open_channel_with_custom_data( } } +#[allow(clippy::too_many_arguments)] +async fn open_channel_with_custom_data( + node_address: SocketAddr, + dest_peer_pubkey: &str, + dest_peer_port: Option, + capacity_sat: Option, + push_msat: Option, + asset_amount: Option, + asset_id: Option<&str>, + fee_base_msat: Option, + fee_proportional_millionths: Option, + temporary_channel_id: Option<&str>, + with_anchors: bool, +) -> Channel { + open_channel_raw( + node_address, + dest_peer_pubkey, + dest_peer_port, + capacity_sat, + push_msat, + asset_amount, + asset_id, + fee_base_msat, + fee_proportional_millionths, + temporary_channel_id, + with_anchors, + ) + .await + .expect("channel opening should succeed") +} + async fn post_asset_media(node_address: SocketAddr, file_path: &str) -> String { println!("posting asset media on node {node_address}"); let file_bytes = tokio::fs::read(file_path).await.unwrap(); @@ -1709,6 +1791,7 @@ mod close_force_nobtc_acceptor; mod close_force_other_side; mod close_force_standard; mod concurrent_btc_payments; +mod concurrent_openchannel; mod fail_transfers; mod getchannelid; mod htlc_amount_checks;