diff --git a/CHANGELOG.md b/CHANGELOG.md index 0261196e69..34030bf5c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ The minor version will be incremented upon a breaking change and the patch versi ### Fixes +- client: Fix deadlock when having multiple websocket listeners ([#4250](https://github.com/solana-foundation/anchor/pull/4250)). - lang: Fix wrong generated error code in declare_program! ([#4129](https://github.com/solana-foundation/anchor/pull/4129)). - idl: Fix defined types with unsupported fields not producing an error ([#4088](https://github.com/solana-foundation/anchor/pull/4088)). - lang: Fix using non-instruction composite accounts multiple times with `declare_program!` ([#4113](https://github.com/solana-foundation/anchor/pull/4113)). diff --git a/client/src/blocking.rs b/client/src/blocking.rs index 24236a2719..b9ff3c64d3 100644 --- a/client/src/blocking.rs +++ b/client/src/blocking.rs @@ -12,10 +12,10 @@ use solana_signature::Signature; use solana_signer::Signer; use solana_transaction::Transaction; -use std::{marker::PhantomData, ops::Deref, sync::Arc}; +use std::{marker::PhantomData, ops::Deref}; use tokio::{ runtime::{Builder, Handle}, - sync::RwLock, + sync::OnceCell, }; impl EventUnsubscriber<'_> { @@ -43,7 +43,7 @@ impl + Clone> Program { Ok(Self { program_id, cfg, - sub_client: Arc::new(RwLock::new(None)), + sub_client: OnceCell::new(), internal_rpc_client: rpc_client, rt, }) diff --git a/client/src/lib.rs b/client/src/lib.rs index ebcf72e4ff..fe64f66fd1 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -99,12 +99,10 @@ use std::pin::Pin; use std::sync::Arc; use std::vec::IntoIter; use thiserror::Error; +use tokio::sync::OnceCell; use tokio::{ runtime::Handle, - sync::{ - mpsc::{unbounded_channel, UnboundedReceiver}, - RwLock, - }, + sync::mpsc::{unbounded_channel, UnboundedReceiver}, task::JoinHandle, }; @@ -230,7 +228,7 @@ impl EventUnsubscriber<'_> { pub struct Program { program_id: Pubkey, cfg: Config, - sub_client: Arc>>, + sub_client: OnceCell>, #[cfg(not(feature = "async"))] rt: tokio::runtime::Runtime, internal_rpc_client: AsyncRpcClient, @@ -297,20 +295,6 @@ impl + Clone> Program { }) } - async fn init_sub_client_if_needed(&self) -> Result<(), ClientError> { - let lock = &self.sub_client; - let mut client = lock.write().await; - - if client.is_none() { - let sub_client = PubsubClient::new(self.cfg.cluster.ws_url()) - .await - .map_err(Box::new)?; - *client = Some(sub_client); - } - - Ok(()) - } - async fn on_internal( &self, f: impl Fn(&EventContext, T) + Send + 'static, @@ -321,7 +305,17 @@ impl + Clone> Program { ), ClientError, > { - self.init_sub_client_if_needed().await?; + let client = self + .sub_client + .get_or_try_init(|| async { + PubsubClient::new(self.cfg.cluster.ws_url()) + .await + .map(Arc::new) + .map_err(|e| ClientError::SolanaClientPubsubError(Box::new(e))) + }) + .await? + .clone(); + let (tx, rx) = unbounded_channel::<_>(); let config = RpcTransactionLogsConfig { commitment: self.cfg.options, @@ -329,33 +323,27 @@ impl + Clone> Program { let program_id_str = self.program_id.to_string(); let filter = RpcTransactionLogsFilter::Mentions(vec![program_id_str.clone()]); - let lock = Arc::clone(&self.sub_client); - let handle = tokio::spawn(async move { - if let Some(ref client) = *lock.read().await { - let (mut notifications, unsubscribe) = client - .logs_subscribe(filter, config) - .await - .map_err(Box::new)?; - - tx.send(unsubscribe).map_err(|e| { - ClientError::SolanaClientPubsubError(Box::new( - PubsubClientError::RequestFailed { - message: "Unsubscribe failed".to_string(), - reason: e.to_string(), - }, - )) - })?; - - while let Some(logs) = notifications.next().await { - let ctx = EventContext { - signature: logs.value.signature.parse().unwrap(), - slot: logs.context.slot, - }; - let events = parse_logs_response(logs, &program_id_str)?; - for e in events { - f(&ctx, e); - } + let (mut notifications, unsubscribe) = client + .logs_subscribe(filter, config) + .await + .map_err(Box::new)?; + + tx.send(unsubscribe).map_err(|e| { + ClientError::SolanaClientPubsubError(Box::new(PubsubClientError::RequestFailed { + message: "Unsubscribe failed".to_string(), + reason: e.to_string(), + })) + })?; + + while let Some(logs) = notifications.next().await { + let ctx = EventContext { + signature: logs.value.signature.parse().unwrap(), + slot: logs.context.slot, + }; + let events = parse_logs_response(logs, &program_id_str)?; + for e in events { + f(&ctx, e); } } Ok::<(), ClientError>(()) diff --git a/client/src/nonblocking.rs b/client/src/nonblocking.rs index 14ef47b562..32580f2335 100644 --- a/client/src/nonblocking.rs +++ b/client/src/nonblocking.rs @@ -10,7 +10,7 @@ use solana_signature::Signature; use solana_signer::Signer; use solana_transaction::Transaction; use std::{marker::PhantomData, ops::Deref, sync::Arc}; -use tokio::sync::RwLock; +use tokio::sync::OnceCell; impl<'a> EventUnsubscriber<'a> { /// Unsubscribe gracefully. @@ -51,7 +51,7 @@ impl + Clone> Program { Ok(Self { program_id, cfg, - sub_client: Arc::new(RwLock::new(None)), + sub_client: OnceCell::new(), internal_rpc_client: rpc_client, }) }