Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The minor version will be incremented upon a breaking change and the patch versi

### Fixes

- client: Fix deadlock when having multiple websocket listeners ([#4250](https://github.com/solana-foundation/anchor/pull/4250)).
- lang: Fix wrong generated error code in declare_program! ([#4129](https://github.com/solana-foundation/anchor/pull/4129)).
- idl: Fix defined types with unsupported fields not producing an error ([#4088](https://github.com/solana-foundation/anchor/pull/4088)).
- lang: Fix using non-instruction composite accounts multiple times with `declare_program!` ([#4113](https://github.com/solana-foundation/anchor/pull/4113)).
Expand Down
6 changes: 3 additions & 3 deletions client/src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use solana_signature::Signature;
use solana_signer::Signer;
use solana_transaction::Transaction;

use std::{marker::PhantomData, ops::Deref, sync::Arc};
use std::{marker::PhantomData, ops::Deref};
use tokio::{
runtime::{Builder, Handle},
sync::RwLock,
sync::OnceCell,
};

impl EventUnsubscriber<'_> {
Expand Down Expand Up @@ -43,7 +43,7 @@ impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
Ok(Self {
program_id,
cfg,
sub_client: Arc::new(RwLock::new(None)),
sub_client: OnceCell::new(),
internal_rpc_client: rpc_client,
rt,
})
Expand Down
80 changes: 34 additions & 46 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,10 @@ use std::pin::Pin;
use std::sync::Arc;
use std::vec::IntoIter;
use thiserror::Error;
use tokio::sync::OnceCell;
use tokio::{
runtime::Handle,
sync::{
mpsc::{unbounded_channel, UnboundedReceiver},
RwLock,
},
sync::mpsc::{unbounded_channel, UnboundedReceiver},
task::JoinHandle,
};

Expand Down Expand Up @@ -230,7 +228,7 @@ impl EventUnsubscriber<'_> {
pub struct Program<C> {
program_id: Pubkey,
cfg: Config<C>,
sub_client: Arc<RwLock<Option<PubsubClient>>>,
sub_client: OnceCell<Arc<PubsubClient>>,
#[cfg(not(feature = "async"))]
rt: tokio::runtime::Runtime,
internal_rpc_client: AsyncRpcClient,
Expand Down Expand Up @@ -297,20 +295,6 @@ impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
})
}

async fn init_sub_client_if_needed(&self) -> Result<(), ClientError> {
let lock = &self.sub_client;
let mut client = lock.write().await;

if client.is_none() {
let sub_client = PubsubClient::new(self.cfg.cluster.ws_url())
.await
.map_err(Box::new)?;
*client = Some(sub_client);
}

Ok(())
}

async fn on_internal<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
&self,
f: impl Fn(&EventContext, T) + Send + 'static,
Expand All @@ -321,41 +305,45 @@ impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
),
ClientError,
> {
self.init_sub_client_if_needed().await?;
let client = self
.sub_client
.get_or_try_init(|| async {
PubsubClient::new(self.cfg.cluster.ws_url())
.await
.map(Arc::new)
.map_err(|e| ClientError::SolanaClientPubsubError(Box::new(e)))
})
.await?
.clone();

let (tx, rx) = unbounded_channel::<_>();
let config = RpcTransactionLogsConfig {
commitment: self.cfg.options,
};
let program_id_str = self.program_id.to_string();
let filter = RpcTransactionLogsFilter::Mentions(vec![program_id_str.clone()]);

let lock = Arc::clone(&self.sub_client);

let handle = tokio::spawn(async move {
if let Some(ref client) = *lock.read().await {
let (mut notifications, unsubscribe) = client
.logs_subscribe(filter, config)
.await
.map_err(Box::new)?;

tx.send(unsubscribe).map_err(|e| {
ClientError::SolanaClientPubsubError(Box::new(
PubsubClientError::RequestFailed {
message: "Unsubscribe failed".to_string(),
reason: e.to_string(),
},
))
})?;

while let Some(logs) = notifications.next().await {
let ctx = EventContext {
signature: logs.value.signature.parse().unwrap(),
slot: logs.context.slot,
};
let events = parse_logs_response(logs, &program_id_str)?;
for e in events {
f(&ctx, e);
}
let (mut notifications, unsubscribe) = client
.logs_subscribe(filter, config)
.await
.map_err(Box::new)?;

tx.send(unsubscribe).map_err(|e| {
ClientError::SolanaClientPubsubError(Box::new(PubsubClientError::RequestFailed {
message: "Unsubscribe failed".to_string(),
reason: e.to_string(),
}))
})?;

while let Some(logs) = notifications.next().await {
let ctx = EventContext {
signature: logs.value.signature.parse().unwrap(),
slot: logs.context.slot,
};
let events = parse_logs_response(logs, &program_id_str)?;
for e in events {
f(&ctx, e);
}
}
Ok::<(), ClientError>(())
Expand Down
4 changes: 2 additions & 2 deletions client/src/nonblocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use solana_signature::Signature;
use solana_signer::Signer;
use solana_transaction::Transaction;
use std::{marker::PhantomData, ops::Deref, sync::Arc};
use tokio::sync::RwLock;
use tokio::sync::OnceCell;

impl<'a> EventUnsubscriber<'a> {
/// Unsubscribe gracefully.
Expand Down Expand Up @@ -51,7 +51,7 @@ impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
Ok(Self {
program_id,
cfg,
sub_client: Arc::new(RwLock::new(None)),
sub_client: OnceCell::new(),
internal_rpc_client: rpc_client,
})
}
Expand Down