From 144e1f920f4f5f662a290d4cb22302ebef4c55a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 29 Jul 2025 13:16:32 +0200 Subject: [PATCH 01/18] session: avoid `unwrap()`s in favour of let-else This is a refactor to avoid using `unwrap()` on the `connected` field of the `SessionGuard`. Instead, it uses the `let-else` syntax to handle the case where the session is not connected in a more idiomatic way. Note that no `unwrap()` could really panic here, as the `connected` field was always checked before, but this change makes the code safer and more readable. --- scylla-rust-wrapper/src/session.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/scylla-rust-wrapper/src/session.rs b/scylla-rust-wrapper/src/session.rs index 173a187b..b308b02e 100644 --- a/scylla-rust-wrapper/src/session.rs +++ b/scylla-rust-wrapper/src/session.rs @@ -280,17 +280,16 @@ pub unsafe extern "C" fn cass_session_execute_batch( let batch_exec_profile = batch_from_raw.exec_profile.clone(); let future = async move { - if session_guard.connected.is_none() { + let Some(connected_session) = session_guard.connected.as_ref() else { return Err(( CassError::CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, "Session is not connected".msg(), )); - } + }; - let cass_connected_session = session_guard.connected.as_ref().unwrap(); - let session = &cass_connected_session.session; + let session = &connected_session.session; - let handle = cass_connected_session + let handle = connected_session .get_or_resolve_profile_handle(batch_exec_profile.as_ref()) .await?; @@ -512,14 +511,15 @@ pub unsafe extern "C" fn cass_session_prepare_from_existing( } }; - if session_guard.connected.is_none() { + let Some(connected_session) = session_guard.connected.as_ref() else { return Err(( CassError::CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, "Session is not connected".msg(), )); - } - let session = &session_guard.connected.as_ref().unwrap().session; - let prepared = session + }; + + let prepared = connected_session + .session .prepare(query.query.clone()) .await .map_err(|err| (err.to_cass_error(), err.msg()))?; @@ -569,15 +569,15 @@ pub unsafe extern "C" fn cass_session_prepare_n( let query = Statement::new(query_str.to_string()); let fut = async move { - if session_guard.connected.is_none() { + let Some(connected_session) = session_guard.connected.as_ref() else { return Err(( CassError::CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, "Session is not connected".msg(), )); - } - let session = &session_guard.connected.as_ref().unwrap().session; + }; - let prepared = session + let prepared = connected_session + .session .prepare(query) .await .map_err(|err| (err.to_cass_error(), err.msg()))?; From cb79e8c16a81e82374a7b31ec1773a65f8f8e602 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Thu, 31 Jul 2025 11:34:31 +0200 Subject: [PATCH 02/18] future: put err_string under OnceLock The `err_string` field in `CassFuture` serves as an allocation cache for the error message string, which is generated only when needed and must have a lifetime of `CassFuture`, as required by the `cass_future_error_message` function. This field was previously an `Option` and was held under a `Mutex` inside `CassFutureState`, but as its semantics fit `OnceLock` perfectlym it has been changed accordingly and is now a `OnceLock`. As a pleasant result, `with_waited_result` can now be used instead of `with_waited_state`, which simplifies the code and avoids `unwrap`. --- scylla-rust-wrapper/src/future.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/scylla-rust-wrapper/src/future.rs b/scylla-rust-wrapper/src/future.rs index 7034476c..9724499c 100644 --- a/scylla-rust-wrapper/src/future.rs +++ b/scylla-rust-wrapper/src/future.rs @@ -49,7 +49,6 @@ impl BoundCallback { #[derive(Default)] struct CassFutureState { - err_string: Option, callback: Option, join_handle: Option>, } @@ -57,6 +56,7 @@ struct CassFutureState { pub struct CassFuture { state: Mutex, result: OnceLock, + err_string: OnceLock, wait_for_value: Condvar, #[cfg(cpp_integration_testing)] recording_listener: Option>, @@ -104,6 +104,7 @@ impl CassFuture { let cass_fut = Arc::new(CassFuture { state: Mutex::new(Default::default()), result: OnceLock::new(), + err_string: OnceLock::new(), wait_for_value: Condvar::new(), #[cfg(cpp_integration_testing)] recording_listener, @@ -139,6 +140,7 @@ impl CassFuture { Arc::new(CassFuture { state: Mutex::new(CassFutureState::default()), result: OnceLock::from(r), + err_string: OnceLock::new(), wait_for_value: Condvar::new(), #[cfg(cpp_integration_testing)] recording_listener: None, @@ -434,15 +436,12 @@ pub unsafe extern "C" fn cass_future_error_message( return; }; - future.with_waited_state(|state: &mut CassFutureState| { - let value = future.result.get(); - let msg = state - .err_string - .get_or_insert_with(|| match value.as_ref().unwrap() { - Ok(CassResultValue::QueryError(err)) => err.msg(), - Err((_, s)) => s.msg(), - _ => "".to_string(), - }); + future.with_waited_result(|result: &CassFutureResult| { + let msg = future.err_string.get_or_init(|| match result { + Ok(CassResultValue::QueryError(err)) => err.msg(), + Err((_, s)) => s.msg(), + _ => "".to_string(), + }); unsafe { write_str_to_c(msg.as_str(), message, message_length) }; }); } From 722952e21e7c75e47b81a650b5ba708f40ccd86c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Thu, 31 Jul 2025 11:49:41 +0200 Subject: [PATCH 03/18] future: `cass_future_ready` tiny refactor --- scylla-rust-wrapper/src/future.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/scylla-rust-wrapper/src/future.rs b/scylla-rust-wrapper/src/future.rs index 9724499c..46f198b9 100644 --- a/scylla-rust-wrapper/src/future.rs +++ b/scylla-rust-wrapper/src/future.rs @@ -403,10 +403,7 @@ pub unsafe extern "C" fn cass_future_ready( return cass_false; }; - match future.result.get() { - None => cass_false, - Some(_) => cass_true, - } + future.result.get().is_some() as cass_bool_t } #[unsafe(no_mangle)] From 8985823fc546bf88c456dd07a4f1834d580d7c0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Thu, 31 Jul 2025 11:56:45 +0200 Subject: [PATCH 04/18] future: ditch closure-based awaiting There has long existed an API for awaiting futures that takes a closure and invokes it with either the future state or the future result once the future is ready. This API is not really ergonomic, but it was a must as long as we were holding the future result inside the future state, which is held behind a mutex. As the result could be accessible only while the state's `MutexGuard` is alive, we could not return references to the result outside of the closure. Now that we have the future result stored in an `OnceLock`, we can return references to it, which makes the API much more ergonomic. The future state is still held behind a mutex, so it's still not possible to return a reference to the future state outside of the closure, but after both the future result and the cached error string were moved to `OnceLock`s out of the future state, we can remove the closure-based future state awaiting API altogether, and make the future result awaiting API reference-based instead of a closure-based one. Perfect! Clean! Nice! Refactors induced by this change in `waited_result(_timed)` has showed that the logic there is a bit convoluted, so I added FIXMEs there. I'm going to address them in further commits. --- scylla-rust-wrapper/src/future.rs | 111 ++++++++---------- .../src/integration_testing.rs | 4 +- scylla-rust-wrapper/src/session.rs | 2 +- 3 files changed, 50 insertions(+), 67 deletions(-) diff --git a/scylla-rust-wrapper/src/future.rs b/scylla-rust-wrapper/src/future.rs index 46f198b9..094efb8f 100644 --- a/scylla-rust-wrapper/src/future.rs +++ b/scylla-rust-wrapper/src/future.rs @@ -147,17 +147,7 @@ impl CassFuture { }) } - pub(crate) fn with_waited_result<'s, T>( - &'s self, - f: impl FnOnce(&'s CassFutureResult) -> T, - ) -> T - where - T: 's, - { - self.with_waited_state(|_| f(self.result.get().unwrap())) - } - - /// Awaits the future until completion. + /// Awaits the future until completion and exposes the result. /// /// There are three possible cases: /// - result is already available -> we can return. @@ -170,21 +160,20 @@ impl CassFuture { /// - JoinHandle is consumed -> some other thread already resolved the future. /// We can return. /// - JoinHandle is Some -> some other thread was working on the future, but - /// timed out (see [CassFuture::with_waited_state_timed]). We need to + /// timed out (see [CassFuture::waited_result_timed]). We need to /// take the ownership of the handle, and complete the work. - fn with_waited_state(&self, f: impl FnOnce(&mut CassFutureState) -> T) -> T { + pub(crate) fn waited_result(&self) -> &CassFutureResult { let mut guard = self.state.lock().unwrap(); loop { - if self.result.get().is_some() { + if let Some(result) = self.result.get() { // The result is already available, we can return it. - return f(&mut guard); + return result; } let handle = guard.join_handle.take(); if let Some(handle) = handle { mem::drop(guard); // unwrap: JoinError appears only when future either panic'ed or canceled. RUNTIME.block_on(handle).unwrap(); - guard = self.state.lock().unwrap(); } else { guard = self .wait_for_value @@ -202,19 +191,12 @@ impl CassFuture { continue; } } - return f(&mut guard); + return self.result.get().unwrap(); // FIXME: refactor this to avoid unwrap } } - fn with_waited_result_timed( - &self, - f: impl FnOnce(&CassFutureResult) -> T, - timeout_duration: Duration, - ) -> Result { - self.with_waited_state_timed(|_| f(self.result.get().unwrap()), timeout_duration) - } - - /// Tries to await the future with a given timeout. + /// Tries to await the future with a given timeout and exposes the result, + /// if it is available. /// /// There are three possible cases: /// - result is already available -> we can return. @@ -229,22 +211,21 @@ impl CassFuture { /// - JoinHandle is consumed -> some other thread already resolved the future. /// We can return. /// - JoinHandle is Some -> some other thread was working on the future, but - /// timed out (see [CassFuture::with_waited_state_timed]). We need to + /// timed out (see [CassFuture::waited_result_timed]). We need to /// take the ownership of the handle, and continue the work. - fn with_waited_state_timed( + fn waited_result_timed( &self, - f: impl FnOnce(&mut CassFutureState) -> T, timeout_duration: Duration, - ) -> Result { + ) -> Result<&CassFutureResult, FutureError> { let mut guard = self.state.lock().unwrap(); let deadline = tokio::time::Instant::now() .checked_add(timeout_duration) .ok_or(FutureError::InvalidDuration)?; loop { - if self.result.get().is_some() { + if let Some(result) = self.result.get() { // The result is already available, we can return it. - return Ok(f(&mut guard)); + return Ok(result); } let handle = guard.join_handle.take(); if let Some(handle) = handle { @@ -279,7 +260,6 @@ impl CassFuture { // unwrap: JoinError appears only when future either panic'ed or canceled. Ok(result) => result.unwrap(), }; - guard = self.state.lock().unwrap(); } else { let remaining_timeout = deadline.duration_since(tokio::time::Instant::now()); let (guard_result, timeout_result) = self @@ -303,7 +283,7 @@ impl CassFuture { } } - return Ok(f(&mut guard)); + return Ok(self.result.get().unwrap()); // FIXME: refactor this to avoid unwrap } } @@ -376,7 +356,7 @@ pub unsafe extern "C" fn cass_future_wait(future_raw: CassBorrowedSharedPtr err.to_cass_error(), Err((err, _)) => *err, _ => CassError::CASS_OK, - }) + } } #[unsafe(no_mangle)] @@ -433,14 +413,14 @@ pub unsafe extern "C" fn cass_future_error_message( return; }; - future.with_waited_result(|result: &CassFutureResult| { - let msg = future.err_string.get_or_init(|| match result { - Ok(CassResultValue::QueryError(err)) => err.msg(), - Err((_, s)) => s.msg(), - _ => "".to_string(), - }); - unsafe { write_str_to_c(msg.as_str(), message, message_length) }; + let value = future.waited_result(); + let msg = future.err_string.get_or_init(|| match value { + Ok(CassResultValue::QueryError(err)) => err.msg(), + Err((_, s)) => s.msg(), + _ => "".to_string(), }); + + unsafe { write_str_to_c(msg.as_str(), message, message_length) }; } #[unsafe(no_mangle)] @@ -458,11 +438,12 @@ pub unsafe extern "C" fn cass_future_get_result( }; future - .with_waited_result(|r: &CassFutureResult| -> Option> { - match r.as_ref().ok()? { - CassResultValue::QueryResult(qr) => Some(Arc::clone(qr)), - _ => None, - } + .waited_result() + .as_ref() + .ok() + .and_then(|r| match r { + CassResultValue::QueryResult(qr) => Some(Arc::clone(qr)), + _ => None, }) .map_or(ArcFFI::null(), ArcFFI::into_ptr) } @@ -477,11 +458,12 @@ pub unsafe extern "C" fn cass_future_get_error_result( }; future - .with_waited_result(|r: &CassFutureResult| -> Option> { - match r.as_ref().ok()? { - CassResultValue::QueryError(qr) => Some(Arc::clone(qr)), - _ => None, - } + .waited_result() + .as_ref() + .ok() + .and_then(|r| match r { + CassResultValue::QueryError(qr) => Some(Arc::clone(qr)), + _ => None, }) .map_or(ArcFFI::null(), ArcFFI::into_ptr) } @@ -496,11 +478,12 @@ pub unsafe extern "C" fn cass_future_get_prepared( }; future - .with_waited_result(|r: &CassFutureResult| -> Option> { - match r.as_ref().ok()? { - CassResultValue::Prepared(p) => Some(Arc::clone(p)), - _ => None, - } + .waited_result() + .as_ref() + .ok() + .and_then(|r| match r { + CassResultValue::Prepared(p) => Some(Arc::clone(p)), + _ => None, }) .map_or(ArcFFI::null(), ArcFFI::into_ptr) } @@ -515,7 +498,7 @@ pub unsafe extern "C" fn cass_future_tracing_id( return CassError::CASS_ERROR_LIB_BAD_PARAMS; }; - future.with_waited_result(|r: &CassFutureResult| match r { + match future.waited_result() { Ok(CassResultValue::QueryResult(result)) => match result.tracing_id { Some(id) => { unsafe { *tracing_id = CassUuid::from(id) }; @@ -524,7 +507,7 @@ pub unsafe extern "C" fn cass_future_tracing_id( None => CassError::CASS_ERROR_LIB_NO_TRACING_ID, }, _ => CassError::CASS_ERROR_LIB_INVALID_FUTURE_TYPE, - }) + } } #[unsafe(no_mangle)] @@ -536,13 +519,13 @@ pub unsafe extern "C" fn cass_future_coordinator( return RefFFI::null(); }; - future.with_waited_result(|r| match r { + match future.waited_result() { Ok(CassResultValue::QueryResult(result)) => { // unwrap: Coordinator is `None` only for tests. RefFFI::as_ptr(result.coordinator.as_ref().unwrap()) } _ => RefFFI::null(), - }) + } } #[cfg(test)] diff --git a/scylla-rust-wrapper/src/integration_testing.rs b/scylla-rust-wrapper/src/integration_testing.rs index 0f76af0d..768af496 100644 --- a/scylla-rust-wrapper/src/integration_testing.rs +++ b/scylla-rust-wrapper/src/integration_testing.rs @@ -80,7 +80,7 @@ pub unsafe extern "C" fn testing_future_get_host( return; }; - future.with_waited_result(|r| match r { + match future.waited_result() { Ok(CassResultValue::QueryResult(result)) => { // unwrap: Coordinator is none only for unit tests. let coordinator = result.coordinator.as_ref().unwrap(); @@ -101,7 +101,7 @@ pub unsafe extern "C" fn testing_future_get_host( *host = std::ptr::null_mut(); *host_length = 0; }, - }) + } } #[unsafe(no_mangle)] diff --git a/scylla-rust-wrapper/src/session.rs b/scylla-rust-wrapper/src/session.rs index b308b02e..541110d4 100644 --- a/scylla-rust-wrapper/src/session.rs +++ b/scylla-rust-wrapper/src/session.rs @@ -602,7 +602,7 @@ pub unsafe extern "C" fn cass_session_free(session_raw: CassOwnedSharedPtr Date: Thu, 31 Jul 2025 12:43:55 +0200 Subject: [PATCH 05/18] future: waiting logic readability improvements `CassFuture::waited_result(_timed)` methods have been documented better and refactored to improve readability. **Logic has NOT been changed.** Code changes & rationale: The previous logic was a bit convoluted: it `unwrap`ped the result immediately after both branches of the `if` related to `join_handle` merged back, which made it hard to understand the flow of the code. Now, the logic is clearer: the result is `unwrap`ped in the branch where we are sure that the future is resolved, i.e. the `join_handle` had been `Some` and we have just completed the future. The other branch, where we wait for the future to be completed, is now clearer as well: once we wake up, there are two cases (quoting the new comments): 1. The result is already available, so we can return it. 2. The `join_handle` was `None`, and now it's `Some` - some other thread must have timed out and returned the handle. We need to take over the work of completing the future, because the result is still not available and we may be using `current_thread` tokio executor, in which case no one else will complete the future, so it's our responsibility. In the next iteration we will land in the branch with `block_on` and complete the future. In both cases we just continue the loop, so the code is simpler and easier to read. --- scylla-rust-wrapper/src/future.rs | 60 +++++++++++++++++++++---------- 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/scylla-rust-wrapper/src/future.rs b/scylla-rust-wrapper/src/future.rs index 094efb8f..cebef2d5 100644 --- a/scylla-rust-wrapper/src/future.rs +++ b/scylla-rust-wrapper/src/future.rs @@ -171,27 +171,37 @@ impl CassFuture { } let handle = guard.join_handle.take(); if let Some(handle) = handle { + // No one else has taken the handle, so we are responsible for completing + // the future. mem::drop(guard); // unwrap: JoinError appears only when future either panic'ed or canceled. RUNTIME.block_on(handle).unwrap(); + + // Once we are here, the future is resolved. + // The result is guaranteed to be set. + return self.result.get().unwrap(); } else { + // Someone has taken the handle, so we need to wait for them to complete + // the future. Once they finish or timeout, we will be notified. guard = self .wait_for_value .wait_while(guard, |state| { + // There are two cases when we should wake up: + // 1. The result is already available, so we can return it. + // In this case, we will see it available in the next iteration + // of the loop, so we will return it. + // 2. `join_handle` was None, and now it's Some - some other thread must + // have timed out and returned the handle. We need to take over the work + // of completing the future, because the result is still not available + // and we may be using `current_thread` tokio executor, in which case + // no one else will complete the future, so it's our responsibility. + // In the next iteration we will land in the branch with `block_on` + // and complete the future. self.result.get().is_none() && state.join_handle.is_none() }) // unwrap: Error appears only when mutex is poisoned. .unwrap(); - if self.result.get().is_none() && guard.join_handle.is_some() { - // join_handle was none, and now it isn't - some other thread must - // have timed out and returned the handle. We need to take over - // the work of completing the future, because the result is still not available. - // To do that, we go into another iteration so that we land in the branch - // with `block_on`. - continue; - } } - return self.result.get().unwrap(); // FIXME: refactor this to avoid unwrap } } @@ -229,6 +239,8 @@ impl CassFuture { } let handle = guard.join_handle.take(); if let Some(handle) = handle { + // No one else has taken the handle, so we are responsible for completing + // the future. mem::drop(guard); // Need to wrap it with async{} block, so the timeout is lazily executed inside the runtime. // See mention about panics: https://docs.rs/tokio/latest/tokio/time/fn.timeout.html. @@ -258,13 +270,32 @@ impl CassFuture { return Err(FutureError::TimeoutError); } // unwrap: JoinError appears only when future either panic'ed or canceled. - Ok(result) => result.unwrap(), + Ok(result) => { + result.unwrap(); + + // Once we are here, the future is resolved. + // The result is guaranteed to be set. + return Ok(self.result.get().unwrap()); + } }; } else { + // Someone has taken the handle, so we need to wait for them to complete + // the future. Once they finish or timeout, we will be notified. let remaining_timeout = deadline.duration_since(tokio::time::Instant::now()); let (guard_result, timeout_result) = self .wait_for_value .wait_timeout_while(guard, remaining_timeout, |state| { + // There are two cases when we should wake up: + // 1. The result is already available, so we can return it. + // In this case, we will see it available in the next iteration + // of the loop, so we will return it. + // 2. `join_handle` was None, and now it's Some - some other thread must + // have timed out and returned the handle. We need to take over the work + // of completing the future, because the result is still not available + // and we may be using `current_thread` tokio executor, in which case + // no one else will complete the future, so it's our responsibility. + // In the next iteration we will land in the branch with `block_on` + // and attempt to complete the future. self.result.get().is_none() && state.join_handle.is_none() }) // unwrap: Error appears only when mutex is poisoned. @@ -274,16 +305,7 @@ impl CassFuture { } guard = guard_result; - if self.result.get().is_none() && guard.join_handle.is_some() { - // join_handle was none, and now it isn't - some other thread must - // have timed out and returned the handle. We need to take over - // the work of completing the future. To do that, we go into - // another iteration so that we land in the branch with block_on. - continue; - } } - - return Ok(self.result.get().unwrap()); // FIXME: refactor this to avoid unwrap } } From d0a3b558a1d7837db9ba78d5d8501cc1714f8f07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Thu, 31 Jul 2025 12:50:31 +0200 Subject: [PATCH 06/18] future: optimize happy path when getting result When getting the result of a future, we can avoid locking the Mutex if the result is already available. This improves performance when the result is already available, which is a common case. --- scylla-rust-wrapper/src/future.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/scylla-rust-wrapper/src/future.rs b/scylla-rust-wrapper/src/future.rs index cebef2d5..ba873498 100644 --- a/scylla-rust-wrapper/src/future.rs +++ b/scylla-rust-wrapper/src/future.rs @@ -163,6 +163,12 @@ impl CassFuture { /// timed out (see [CassFuture::waited_result_timed]). We need to /// take the ownership of the handle, and complete the work. pub(crate) fn waited_result(&self) -> &CassFutureResult { + // Happy path: if the result is already available, we can return it + // without locking the Mutex. + if let Some(result) = self.result.get() { + return result; + } + let mut guard = self.state.lock().unwrap(); loop { if let Some(result) = self.result.get() { @@ -227,6 +233,12 @@ impl CassFuture { &self, timeout_duration: Duration, ) -> Result<&CassFutureResult, FutureError> { + // Happy path: if the result is already available, we can return it + // without locking the Mutex. + if let Some(result) = self.result.get() { + return Ok(result); + } + let mut guard = self.state.lock().unwrap(); let deadline = tokio::time::Instant::now() .checked_add(timeout_duration) From 40d3302f2f885cace1178c2aef17d8600cbfe00b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Thu, 31 Jul 2025 13:00:53 +0200 Subject: [PATCH 07/18] future: document fields --- scylla-rust-wrapper/src/future.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/scylla-rust-wrapper/src/future.rs b/scylla-rust-wrapper/src/future.rs index ba873498..7b3e5518 100644 --- a/scylla-rust-wrapper/src/future.rs +++ b/scylla-rust-wrapper/src/future.rs @@ -49,15 +49,28 @@ impl BoundCallback { #[derive(Default)] struct CassFutureState { + /// Optional callback to be executed when the future is resolved. callback: Option, + + /// Join handle of the tokio task that resolves the future. join_handle: Option>, } pub struct CassFuture { + /// Mutable state of the future that requires synchronized exclusive access + /// in order to ensure thread safety of the future execution. state: Mutex, + + /// Result of the future once it is resolved. result: OnceLock, + + /// Required as a place to allocate the stringified error message. + /// This is needed to support `cass_future_error_message`. err_string: OnceLock, + + /// Used to notify threads waiting for the future's result. wait_for_value: Condvar, + #[cfg(cpp_integration_testing)] recording_listener: Option>, } From 7feba78e494a8786c53f1501a7a275c252d3544c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 4 Aug 2025 13:15:32 +0200 Subject: [PATCH 08/18] future: fix setting callback to be at-most-once CPP Driver has the semantics that a callback can be set only once for the whole lifetime of a future. This commit fixes the Rust wrapper to match this behavior. This is guaranteed by unconditionally setting the callback field in the `set_callback` method, and by leaving the callback field as `Some` after the callback is invoked in both cases: 1. when the future is already resolved, so the callback is invoked synchronously by the thread that sets the callback; 2. when the future is not resolved yet, so the callback is invoked asynchronously by the thread that resolves the future once it's resolved. --- scylla-rust-wrapper/src/future.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/scylla-rust-wrapper/src/future.rs b/scylla-rust-wrapper/src/future.rs index 7b3e5518..80b91e86 100644 --- a/scylla-rust-wrapper/src/future.rs +++ b/scylla-rust-wrapper/src/future.rs @@ -30,6 +30,7 @@ pub type CassFutureCallback = Option; type NonNullFutureCallback = unsafe extern "C" fn(future: CassBorrowedSharedPtr, data: *mut c_void); +#[derive(Clone, Copy)] struct BoundCallback { cb: NonNullFutureCallback, data: *mut c_void, @@ -126,13 +127,16 @@ impl CassFuture { let join_handle = RUNTIME.spawn(async move { let r = fut.await; let maybe_cb = { - let mut guard = cass_fut_clone.state.lock().unwrap(); + let guard = cass_fut_clone.state.lock().unwrap(); cass_fut_clone .result .set(r) .expect("Tried to resolve future result twice!"); - // Take the callback and call it after releasing the lock - guard.callback.take() + + // Get the callback and call it after releasing the lock. + // Do not take the callback out, as it prevents other callbacks + // from being set afterwards, which is needed to match CPP Driver's semantics. + guard.callback }; if let Some(bound_cb) = maybe_cb { let fut_ptr = ArcFFI::as_ptr::(&cass_fut_clone); @@ -346,14 +350,17 @@ impl CassFuture { return CassError::CASS_ERROR_LIB_CALLBACK_ALREADY_SET; } let bound_cb = BoundCallback { cb, data }; + + // Store the callback, so that no other callback can be set from now on. + // Rationale: only one callback can be set for the whole lifetime of a future. + lock.callback = Some(bound_cb); + if self.result.get().is_some() { // The value is already available, we need to call the callback ourselves mem::drop(lock); bound_cb.invoke(self_ptr); return CassError::CASS_OK; } - // Store the callback - lock.callback = Some(bound_cb); CassError::CASS_OK } From 71b6faca34c11e3a6a87e6f936b235ce106ae77f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 4 Aug 2025 13:26:59 +0200 Subject: [PATCH 09/18] future: use scoped locking in `set_callback()` Scoped locking is in general more readable and less deadlock-prone than using `mem::drop()` to release the lock early. This commit refactors the `set_callback()` method to use scoped locking instead of manually dropping the lock. --- scylla-rust-wrapper/src/future.rs | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/scylla-rust-wrapper/src/future.rs b/scylla-rust-wrapper/src/future.rs index 80b91e86..2a9fce3e 100644 --- a/scylla-rust-wrapper/src/future.rs +++ b/scylla-rust-wrapper/src/future.rs @@ -344,20 +344,25 @@ impl CassFuture { cb: NonNullFutureCallback, data: *mut c_void, ) -> CassError { - let mut lock = self.state.lock().unwrap(); - if lock.callback.is_some() { - // Another callback has been already set - return CassError::CASS_ERROR_LIB_CALLBACK_ALREADY_SET; - } let bound_cb = BoundCallback { cb, data }; - // Store the callback, so that no other callback can be set from now on. - // Rationale: only one callback can be set for the whole lifetime of a future. - lock.callback = Some(bound_cb); + // Check if the callback is already set (in such case we must error out). + // If it is not set, we store the callback in the state, so that no different + // callback can be set. + { + let mut lock = self.state.lock().unwrap(); + if lock.callback.is_some() { + // Another callback has been already set + return CassError::CASS_ERROR_LIB_CALLBACK_ALREADY_SET; + } + + // Store the callback, so that no other callback can be set from now on. + // Rationale: only one callback can be set for the whole lifetime of a future. + lock.callback = Some(bound_cb); + } if self.result.get().is_some() { // The value is already available, we need to call the callback ourselves - mem::drop(lock); bound_cb.invoke(self_ptr); return CassError::CASS_OK; } From a1ca3f713e3adf42f71605c29690637a5b16a7f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Thu, 31 Jul 2025 15:21:00 +0200 Subject: [PATCH 10/18] future: introduce FutureKind When trying to introduce configurable runtime to the CassFuture framework, I realized that the current implementation of CassFuture is not flexible enough. Namely, those futures that are immediately ready with the result (e.g. futures that are created with `CassFuture::new_ready`) do not have access to the runtime on creation. To provide better type safety, this kind of future was extracted as a separate enum variant of `FutureKind`, with the other variant representing futures that must be resolved by the tokio runtime. The next commit demonstrates how extracting these two kinds of futures into a separate enum allows us to implement the configurable runtime in the CassFuture framework. --- scylla-rust-wrapper/src/future.rs | 179 +++++++++++++++++++++--------- 1 file changed, 129 insertions(+), 50 deletions(-) diff --git a/scylla-rust-wrapper/src/future.rs b/scylla-rust-wrapper/src/future.rs index 2a9fce3e..297971e2 100644 --- a/scylla-rust-wrapper/src/future.rs +++ b/scylla-rust-wrapper/src/future.rs @@ -9,6 +9,7 @@ use futures::future; use std::future::Future; use std::mem; use std::os::raw::c_void; +use std::sync::atomic::AtomicBool; use std::sync::{Arc, Condvar, Mutex, OnceLock}; use tokio::task::JoinHandle; use tokio::time::Duration; @@ -57,7 +58,18 @@ struct CassFutureState { join_handle: Option>, } -pub struct CassFuture { +enum FutureKind { + /// Future that must be resolved by the tokio runtime. + Resolvable { fut: ResolvableFuture }, + + /// Future that is immediately ready with the result. + Immediate { + res: CassFutureResult, + callback_set: AtomicBool, + }, +} + +struct ResolvableFuture { /// Mutable state of the future that requires synchronized exclusive access /// in order to ensure thread safety of the future execution. state: Mutex, @@ -65,10 +77,6 @@ pub struct CassFuture { /// Result of the future once it is resolved. result: OnceLock, - /// Required as a place to allocate the stringified error message. - /// This is needed to support `cass_future_error_message`. - err_string: OnceLock, - /// Used to notify threads waiting for the future's result. wait_for_value: Condvar, @@ -76,6 +84,15 @@ pub struct CassFuture { recording_listener: Option>, } +pub struct CassFuture { + /// One of the possible implementations of the future. + kind: FutureKind, + + /// Required as a place to allocate the stringified error message. + /// This is needed to support `cass_future_error_message`. + err_string: OnceLock, +} + impl FFI for CassFuture { type Origin = FromArc; } @@ -116,19 +133,30 @@ impl CassFuture { >, ) -> Arc { let cass_fut = Arc::new(CassFuture { - state: Mutex::new(Default::default()), - result: OnceLock::new(), err_string: OnceLock::new(), - wait_for_value: Condvar::new(), - #[cfg(cpp_integration_testing)] - recording_listener, + kind: FutureKind::Resolvable { + fut: ResolvableFuture { + state: Mutex::new(Default::default()), + result: OnceLock::new(), + wait_for_value: Condvar::new(), + #[cfg(cpp_integration_testing)] + recording_listener, + }, + }, }); let cass_fut_clone = Arc::clone(&cass_fut); let join_handle = RUNTIME.spawn(async move { + let resolvable_fut = match cass_fut_clone.kind { + FutureKind::Resolvable { + fut: ref resolvable, + } => resolvable, + _ => unreachable!("CassFuture has been created as Resolvable"), + }; + let r = fut.await; let maybe_cb = { - let guard = cass_fut_clone.state.lock().unwrap(); - cass_fut_clone + let guard = resolvable_fut.state.lock().unwrap(); + resolvable_fut .result .set(r) .expect("Tried to resolve future result twice!"); @@ -144,23 +172,28 @@ impl CassFuture { bound_cb.invoke(fut_ptr); } - cass_fut_clone.wait_for_value.notify_all(); + resolvable_fut.wait_for_value.notify_all(); }); { - let mut lock = cass_fut.state.lock().unwrap(); + let resolvable_fut = match cass_fut.kind { + FutureKind::Resolvable { + fut: ref resolvable, + } => resolvable, + _ => unreachable!("CassFuture has been created as Resolvable"), + }; + let mut lock = resolvable_fut.state.lock().unwrap(); lock.join_handle = Some(join_handle); } cass_fut } - pub(crate) fn new_ready(r: CassFutureResult) -> Arc { + pub(crate) fn new_ready(res: CassFutureResult) -> Arc { Arc::new(CassFuture { - state: Mutex::new(CassFutureState::default()), - result: OnceLock::from(r), + kind: FutureKind::Immediate { + res, + callback_set: AtomicBool::new(false), + }, err_string: OnceLock::new(), - wait_for_value: Condvar::new(), - #[cfg(cpp_integration_testing)] - recording_listener: None, }) } @@ -180,15 +213,23 @@ impl CassFuture { /// timed out (see [CassFuture::waited_result_timed]). We need to /// take the ownership of the handle, and complete the work. pub(crate) fn waited_result(&self) -> &CassFutureResult { + let resolvable_fut = match self.kind { + FutureKind::Resolvable { + fut: ref resolvable_fut, + } => resolvable_fut, + // The future is immediately ready, so we can return the result. + FutureKind::Immediate { ref res, .. } => return res, + }; + // Happy path: if the result is already available, we can return it // without locking the Mutex. - if let Some(result) = self.result.get() { + if let Some(result) = resolvable_fut.result.get() { return result; } - let mut guard = self.state.lock().unwrap(); + let mut guard = resolvable_fut.state.lock().unwrap(); loop { - if let Some(result) = self.result.get() { + if let Some(result) = resolvable_fut.result.get() { // The result is already available, we can return it. return result; } @@ -202,11 +243,11 @@ impl CassFuture { // Once we are here, the future is resolved. // The result is guaranteed to be set. - return self.result.get().unwrap(); + return resolvable_fut.result.get().unwrap(); } else { // Someone has taken the handle, so we need to wait for them to complete // the future. Once they finish or timeout, we will be notified. - guard = self + guard = resolvable_fut .wait_for_value .wait_while(guard, |state| { // There are two cases when we should wake up: @@ -220,7 +261,7 @@ impl CassFuture { // no one else will complete the future, so it's our responsibility. // In the next iteration we will land in the branch with `block_on` // and complete the future. - self.result.get().is_none() && state.join_handle.is_none() + resolvable_fut.result.get().is_none() && state.join_handle.is_none() }) // unwrap: Error appears only when mutex is poisoned. .unwrap(); @@ -250,19 +291,27 @@ impl CassFuture { &self, timeout_duration: Duration, ) -> Result<&CassFutureResult, FutureError> { + let resolvable_fut = match self.kind { + FutureKind::Resolvable { + fut: ref resolvable_fut, + } => resolvable_fut, + // The future is immediately ready, so we can return the result. + FutureKind::Immediate { ref res, .. } => return Ok(res), + }; + // Happy path: if the result is already available, we can return it // without locking the Mutex. - if let Some(result) = self.result.get() { + if let Some(result) = resolvable_fut.result.get() { return Ok(result); } - let mut guard = self.state.lock().unwrap(); + let mut guard = resolvable_fut.state.lock().unwrap(); let deadline = tokio::time::Instant::now() .checked_add(timeout_duration) .ok_or(FutureError::InvalidDuration)?; loop { - if let Some(result) = self.result.get() { + if let Some(result) = resolvable_fut.result.get() { // The result is already available, we can return it. return Ok(result); } @@ -293,9 +342,9 @@ impl CassFuture { // - Signal one thread, so that if all other consumers are // already waiting on condvar, one of them wakes up and // picks up the work. - guard = self.state.lock().unwrap(); + guard = resolvable_fut.state.lock().unwrap(); guard.join_handle = Some(returned_handle); - self.wait_for_value.notify_one(); + resolvable_fut.wait_for_value.notify_one(); return Err(FutureError::TimeoutError); } // unwrap: JoinError appears only when future either panic'ed or canceled. @@ -304,14 +353,14 @@ impl CassFuture { // Once we are here, the future is resolved. // The result is guaranteed to be set. - return Ok(self.result.get().unwrap()); + return Ok(resolvable_fut.result.get().unwrap()); } }; } else { // Someone has taken the handle, so we need to wait for them to complete // the future. Once they finish or timeout, we will be notified. let remaining_timeout = deadline.duration_since(tokio::time::Instant::now()); - let (guard_result, timeout_result) = self + let (guard_result, timeout_result) = resolvable_fut .wait_for_value .wait_timeout_while(guard, remaining_timeout, |state| { // There are two cases when we should wake up: @@ -325,7 +374,7 @@ impl CassFuture { // no one else will complete the future, so it's our responsibility. // In the next iteration we will land in the branch with `block_on` // and attempt to complete the future. - self.result.get().is_none() && state.join_handle.is_none() + resolvable_fut.result.get().is_none() && state.join_handle.is_none() }) // unwrap: Error appears only when mutex is poisoned. .unwrap(); @@ -338,7 +387,26 @@ impl CassFuture { } } - pub(crate) unsafe fn set_callback( + pub(crate) fn into_raw(self: Arc) -> CassOwnedSharedPtr { + ArcFFI::into_ptr(self) + } + + #[cfg(cpp_integration_testing)] + pub(crate) fn attempted_hosts(&self) -> Vec { + if let FutureKind::Resolvable { + fut: ref resolvable_fut, + } = self.kind + && let Some(listener) = &resolvable_fut.recording_listener + { + listener.get_attempted_hosts() + } else { + vec![] + } + } +} + +impl ResolvableFuture { + unsafe fn set_callback( &self, self_ptr: CassBorrowedSharedPtr, cb: NonNullFutureCallback, @@ -368,19 +436,6 @@ impl CassFuture { } CassError::CASS_OK } - - pub(crate) fn into_raw(self: Arc) -> CassOwnedSharedPtr { - ArcFFI::into_ptr(self) - } - - #[cfg(cpp_integration_testing)] - pub(crate) fn attempted_hosts(&self) -> Vec { - if let Some(listener) = &self.recording_listener { - listener.get_attempted_hosts() - } else { - vec![] - } - } } // Do not remove; this asserts that `CassFuture` implements Send + Sync, @@ -405,7 +460,26 @@ pub unsafe extern "C" fn cass_future_set_callback( return CassError::CASS_ERROR_LIB_BAD_PARAMS; }; - unsafe { future.set_callback(future_raw.borrow(), callback, data) } + match future.kind { + FutureKind::Resolvable { + fut: ref resolvable, + } => { + // Safety: `callback` is a valid pointer to a function that matches the signature. + unsafe { resolvable.set_callback(future_raw.borrow(), callback, data) } + } + FutureKind::Immediate { + ref callback_set, .. + } => { + if callback_set.swap(true, std::sync::atomic::Ordering::Relaxed) { + // Another callback has been already set. + return CassError::CASS_ERROR_LIB_CALLBACK_ALREADY_SET; + } + + let bound_cb = BoundCallback { cb: callback, data }; + bound_cb.invoke(future_raw.borrow()); + CassError::CASS_OK + } + } } #[unsafe(no_mangle)] @@ -442,7 +516,12 @@ pub unsafe extern "C" fn cass_future_ready( return cass_false; }; - future.result.get().is_some() as cass_bool_t + (match future.kind { + FutureKind::Resolvable { + fut: ref resolvable_fut, + } => resolvable_fut.result.get().is_some(), + FutureKind::Immediate { .. } => true, + }) as cass_bool_t } #[unsafe(no_mangle)] From f1489669e8db6d69ac9ea565ab10a0d671030bfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Sun, 3 Aug 2025 07:32:31 +0200 Subject: [PATCH 11/18] future: optimise `cass_future_error_message()` The function always allocates a new string for the error message when called the first time on a future in order to return a pointer to that allocated string to the caller. This is unnecessary when the future's error kind is already the string form. This commit performs a small refactor that makes allocation avoided for those two cases with errors given in a string form. Credits to @lorak-mmk for the suggestion. --- scylla-rust-wrapper/src/future.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/scylla-rust-wrapper/src/future.rs b/scylla-rust-wrapper/src/future.rs index 297971e2..aae14107 100644 --- a/scylla-rust-wrapper/src/future.rs +++ b/scylla-rust-wrapper/src/future.rs @@ -552,13 +552,13 @@ pub unsafe extern "C" fn cass_future_error_message( }; let value = future.waited_result(); - let msg = future.err_string.get_or_init(|| match value { - Ok(CassResultValue::QueryError(err)) => err.msg(), - Err((_, s)) => s.msg(), - _ => "".to_string(), - }); + let msg = match value { + Ok(CassResultValue::QueryError(err)) => future.err_string.get_or_init(|| err.msg()), + Err((_, s)) => s, + _ => "", + }; - unsafe { write_str_to_c(msg.as_str(), message, message_length) }; + unsafe { write_str_to_c(msg, message, message_length) }; } #[unsafe(no_mangle)] From 83b3a75b26efd1bd34eb377f933d3177cf1a20ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Thu, 31 Jul 2025 15:34:29 +0200 Subject: [PATCH 12/18] session: handle disconnection errors synchronously When issuing an asynchronous operation on a session, such as executing or preparing a statement, let's synchronously ensure that the session is connected before proceeding. If the session is not connected, return an error immediately instead of allowing the operation to continue and asynchronously fail later. This change is not much observable by a user, but it's necessary to introduce the next change involving runtime configuration. --- scylla-rust-wrapper/src/session.rs | 72 +++++++++++++++++++----------- 1 file changed, 46 insertions(+), 26 deletions(-) diff --git a/scylla-rust-wrapper/src/session.rs b/scylla-rust-wrapper/src/session.rs index 541110d4..eb5665f1 100644 --- a/scylla-rust-wrapper/src/session.rs +++ b/scylla-rust-wrapper/src/session.rs @@ -279,13 +279,18 @@ pub unsafe extern "C" fn cass_session_execute_batch( let mut state = batch_from_raw.state.clone(); let batch_exec_profile = batch_from_raw.exec_profile.clone(); + let Some(connected_session) = session_guard.connected.as_ref() else { + return CassFuture::make_ready_raw(Err(( + CassError::CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, + "Session is not connected".msg(), + ))); + }; + let future = async move { - let Some(connected_session) = session_guard.connected.as_ref() else { - return Err(( - CassError::CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, - "Session is not connected".msg(), - )); - }; + let connected_session = session_guard + .connected + .as_ref() + .expect("This should have been handled synchronously!"); let session = &connected_session.session; @@ -338,6 +343,13 @@ pub unsafe extern "C" fn cass_session_execute( ))); }; + let Some(connected_session) = session_guard.connected.as_ref() else { + return CassFuture::make_ready_raw(Err(( + CassError::CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, + "Session is not connected".msg(), + ))); + }; + let paging_state = statement_opt.paging_state.clone(); let paging_enabled = statement_opt.paging_enabled; let mut statement = statement_opt.statement.clone(); @@ -366,15 +378,13 @@ pub unsafe extern "C" fn cass_session_execute( let statement_exec_profile = statement_opt.exec_profile.clone(); let future = async move { - let Some(cass_connected_session) = session_guard.connected.as_ref() else { - return Err(( - CassError::CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, - "Session is not connected".msg(), - )); - }; - let session = &cass_connected_session.session; + let connected_session = session_guard + .connected + .as_ref() + .expect("This should have been handled synchronously!"); + let session = &connected_session.session; - let handle = cass_connected_session + let handle = connected_session .get_or_resolve_profile_handle(statement_exec_profile.as_ref()) .await?; @@ -500,6 +510,13 @@ pub unsafe extern "C" fn cass_session_prepare_from_existing( ))); }; + let Some(connected_session) = session_guard.connected.as_ref() else { + return CassFuture::make_ready_raw(Err(( + CassError::CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, + "Session is not connected".msg(), + ))); + }; + let statement = cass_statement.statement.clone(); CassFuture::make_raw( @@ -511,12 +528,10 @@ pub unsafe extern "C" fn cass_session_prepare_from_existing( } }; - let Some(connected_session) = session_guard.connected.as_ref() else { - return Err(( - CassError::CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, - "Session is not connected".msg(), - )); - }; + let connected_session = session_guard + .connected + .as_ref() + .expect("This should have been handled synchronously!"); let prepared = connected_session .session @@ -566,15 +581,20 @@ pub unsafe extern "C" fn cass_session_prepare_n( ))); }; + let Some(connected_session) = session_guard.connected.as_ref() else { + return CassFuture::make_ready_raw(Err(( + CassError::CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, + "Session is not connected".msg(), + ))); + }; + let query = Statement::new(query_str.to_string()); let fut = async move { - let Some(connected_session) = session_guard.connected.as_ref() else { - return Err(( - CassError::CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, - "Session is not connected".msg(), - )); - }; + let connected_session = session_guard + .connected + .as_ref() + .expect("This should have been handled synchronously!"); let prepared = connected_session .session From c76ad3ce440589851f1a73e89411c01e1bd280d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 29 Jul 2025 11:41:36 +0200 Subject: [PATCH 13/18] cluster: implement ...set_num_threads_io For now, CassCluster just configures and stores a runtime, but the driver ignores it and still uses the global runtime. --- Makefile | 2 +- README.md | 1 - scylla-rust-wrapper/src/api.rs | 3 +- scylla-rust-wrapper/src/cluster.rs | 47 +++++++++++++++++++ .../src/integration_testing.rs | 8 ---- src/testing_unimplemented.cpp | 3 -- 6 files changed, 49 insertions(+), 15 deletions(-) diff --git a/Makefile b/Makefile index f6a66555..15d7d333 100644 --- a/Makefile +++ b/Makefile @@ -153,7 +153,7 @@ SCYLLA_EXAMPLES_TO_RUN := \ # execution_profiles <- unimplemented `cass_statement_set_keyspace()` # host_listener <- unimplemented `cass_cluster_set_host_listener_callback()` # logging <- unimplemented `cass_cluster_set_host_listener_callback()` - # perf <- unimplemented `cass_cluster_set_num_threads_io()`, `cass_cluster_set_queue_size_io()` + # perf <- unimplemented `cass_cluster_set_queue_size_io()` # schema_meta <- unimplemented multiple schema-related functions # cloud <- out of interest for us, not related to ScyllaDB endif diff --git a/README.md b/README.md index 1fcd1b36..be955a43 100644 --- a/README.md +++ b/README.md @@ -263,7 +263,6 @@ The driver inherits almost all the features of C/C++ and Rust drivers, such as: - cass_cluster_set_monitor_reporting_interval - cass_cluster_set_new_request_ratio - cass_cluster_set_no_compact -- cass_cluster_set_num_threads_io - cass_cluster_set_pending_requests_high_water_mark - cass_cluster_set_pending_requests_low_water_mark - cass_cluster_set_prepare_on_all_hosts diff --git a/scylla-rust-wrapper/src/api.rs b/scylla-rust-wrapper/src/api.rs index 0fbbb754..17bd9b3d 100644 --- a/scylla-rust-wrapper/src/api.rs +++ b/scylla-rust-wrapper/src/api.rs @@ -103,7 +103,7 @@ pub mod cluster { // cass_cluster_set_new_request_ratio, UNIMPLEMENTED // cass_cluster_set_no_compact, UNIMPLEMENTED cass_cluster_set_no_speculative_execution_policy, - // cass_cluster_set_num_threads_io, UNIMPLEMENTED + cass_cluster_set_num_threads_io, cass_cluster_set_port, // cass_cluster_set_pending_requests_high_water_mark, UNIMPLEMENTED // cass_cluster_set_pending_requests_low_water_mark, UNIMPLEMENTED @@ -984,7 +984,6 @@ pub mod integration_testing { cass_cluster_set_authenticator_callbacks, cass_cluster_set_cloud_secure_connection_bundle, cass_cluster_set_host_listener_callback, - cass_cluster_set_num_threads_io, cass_cluster_set_queue_size_io, cass_cluster_set_cloud_secure_connection_bundle_n, cass_cluster_set_exponential_reconnect, diff --git a/scylla-rust-wrapper/src/cluster.rs b/scylla-rust-wrapper/src/cluster.rs index 9d020721..70d125cc 100644 --- a/scylla-rust-wrapper/src/cluster.rs +++ b/scylla-rust-wrapper/src/cluster.rs @@ -83,6 +83,8 @@ const DRIVER_VERSION: &str = env!("CARGO_PKG_VERSION"); #[derive(Clone)] pub struct CassCluster { + runtime: Arc, + session_builder: SessionBuilder, default_execution_profile_builder: ExecutionProfileBuilder, execution_profile_map: HashMap, @@ -100,6 +102,10 @@ pub struct CassCluster { } impl CassCluster { + pub(crate) fn get_runtime(&self) -> &Arc { + &self.runtime + } + pub(crate) fn execution_profile_map(&self) -> &HashMap { &self.execution_profile_map } @@ -179,6 +185,12 @@ impl CassCluster { #[unsafe(no_mangle)] pub unsafe extern "C" fn cass_cluster_new() -> CassOwnedExclusivePtr { + let Ok(default_runtime) = tokio::runtime::Runtime::new() + .inspect_err(|e| tracing::error!("Failed to create async runtime: {}", e)) + else { + return CassPtr::null_mut(); + }; + let default_execution_profile_builder = ExecutionProfileBuilder::default() .consistency(DEFAULT_CONSISTENCY) .serial_consistency(DEFAULT_SERIAL_CONSISTENCY) @@ -310,6 +322,8 @@ pub unsafe extern "C" fn cass_cluster_new() -> CassOwnedExclusivePtr, + num_threads: cass_uint32_t, +) -> CassError { + let Some(cluster) = BoxFFI::as_mut_ref(cluster) else { + tracing::error!("Provided null cluster pointer to cass_cluster_set_num_threads_io!"); + return CassError::CASS_ERROR_LIB_BAD_PARAMS; + }; + + let runtime_res = match num_threads { + 0 => tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(), + n => tokio::runtime::Builder::new_multi_thread() + .worker_threads(n as usize) + .enable_all() + .build(), + }; + + let runtime = match runtime_res { + Ok(runtime) => runtime, + Err(err) => { + tracing::error!("Failed to create async runtime: {}", err); + return CassError::CASS_ERROR_LIB_BAD_PARAMS; + } + }; + + cluster.runtime = Arc::new(runtime); + + CassError::CASS_OK +} + #[cfg(test)] mod tests { use crate::testing::{assert_cass_error_eq, setup_tracing}; diff --git a/scylla-rust-wrapper/src/integration_testing.rs b/scylla-rust-wrapper/src/integration_testing.rs index 768af496..8c18dc73 100644 --- a/scylla-rust-wrapper/src/integration_testing.rs +++ b/scylla-rust-wrapper/src/integration_testing.rs @@ -519,14 +519,6 @@ pub(crate) mod stubs { CassError::CASS_OK } - #[unsafe(no_mangle)] - pub extern "C" fn cass_cluster_set_num_threads_io( - _cluster_raw: CassBorrowedExclusivePtr, - _num_threads: u32, - ) -> CassError { - CassError::CASS_OK - } - #[unsafe(no_mangle)] pub extern "C" fn cass_cluster_set_queue_size_io( _cluster_raw: CassBorrowedExclusivePtr, diff --git a/src/testing_unimplemented.cpp b/src/testing_unimplemented.cpp index ced67512..b1b417bc 100644 --- a/src/testing_unimplemented.cpp +++ b/src/testing_unimplemented.cpp @@ -67,9 +67,6 @@ CASS_EXPORT CassError cass_cluster_set_host_listener_callback(CassCluster* clust CASS_EXPORT CassError cass_cluster_set_no_compact(CassCluster* cluster, cass_bool_t enabled) { throw std::runtime_error("UNIMPLEMENTED cass_cluster_set_no_compact\n"); } -CASS_EXPORT CassError cass_cluster_set_num_threads_io(CassCluster* cluster, unsigned num_threads) { - throw std::runtime_error("UNIMPLEMENTED cass_cluster_set_num_threads_io\n"); -} CASS_EXPORT CassError cass_cluster_set_prepare_on_all_hosts(CassCluster* cluster, cass_bool_t enabled) { throw std::runtime_error("UNIMPLEMENTED cass_cluster_set_prepare_on_all_hosts\n"); From f52a779b06f6d37967b3379f1131999cfc25278f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Thu, 31 Jul 2025 15:49:00 +0200 Subject: [PATCH 14/18] session,future: store & use cluster's tokio runtime This commit finishes the changes required to support configuration of the tokio runtime used by the Scylla Rust wrapper. `cass_cluster_set_num_threads_io` now properly configures the number of threads in the runtime, and the runtime is passed to all futures created by the wrapper. This allows the user to configure the runtime used by the wrapper for each `CassCluster` independently, granting full flexibility. --- scylla-rust-wrapper/src/future.rs | 32 +++++++++++++--- .../src/integration_testing.rs | 15 +++++++- scylla-rust-wrapper/src/lib.rs | 2 - scylla-rust-wrapper/src/session.rs | 38 ++++++++++++++++++- 4 files changed, 78 insertions(+), 9 deletions(-) diff --git a/scylla-rust-wrapper/src/future.rs b/scylla-rust-wrapper/src/future.rs index aae14107..3c5315a9 100644 --- a/scylla-rust-wrapper/src/future.rs +++ b/scylla-rust-wrapper/src/future.rs @@ -1,4 +1,3 @@ -use crate::RUNTIME; use crate::argconv::*; use crate::cass_error::{CassError, CassErrorMessage, CassErrorResult, ToCassError as _}; use crate::prepared::CassPrepared; @@ -70,6 +69,9 @@ enum FutureKind { } struct ResolvableFuture { + /// Runtime used to spawn and execute the future. + runtime: Arc, + /// Mutable state of the future that requires synchronized exclusive access /// in order to ensure thread safety of the future execution. state: Mutex, @@ -113,12 +115,14 @@ impl CassFuture { } pub(crate) fn make_raw( + runtime: Arc, fut: impl Future + Send + 'static, #[cfg(cpp_integration_testing)] recording_listener: Option< Arc, >, ) -> CassOwnedSharedPtr { Self::new_from_future( + runtime, fut, #[cfg(cpp_integration_testing)] recording_listener, @@ -127,6 +131,7 @@ impl CassFuture { } pub(crate) fn new_from_future( + runtime: Arc, fut: impl Future + Send + 'static, #[cfg(cpp_integration_testing)] recording_listener: Option< Arc, @@ -136,6 +141,7 @@ impl CassFuture { err_string: OnceLock::new(), kind: FutureKind::Resolvable { fut: ResolvableFuture { + runtime: Arc::clone(&runtime), state: Mutex::new(Default::default()), result: OnceLock::new(), wait_for_value: Condvar::new(), @@ -145,7 +151,7 @@ impl CassFuture { }, }); let cass_fut_clone = Arc::clone(&cass_fut); - let join_handle = RUNTIME.spawn(async move { + let join_handle = runtime.spawn(async move { let resolvable_fut = match cass_fut_clone.kind { FutureKind::Resolvable { fut: ref resolvable, @@ -239,7 +245,7 @@ impl CassFuture { // the future. mem::drop(guard); // unwrap: JoinError appears only when future either panic'ed or canceled. - RUNTIME.block_on(handle).unwrap(); + resolvable_fut.runtime.block_on(handle).unwrap(); // Once we are here, the future is resolved. // The result is guaranteed to be set. @@ -331,7 +337,7 @@ impl CassFuture { future::Either::Right((_, handle)) => Err(JoinHandleTimeout(handle)), } }; - match RUNTIME.block_on(timed) { + match resolvable_fut.runtime.block_on(timed) { Err(JoinHandleTimeout(returned_handle)) => { // We timed out. so we can't finish waiting for the future. // The problem is that if current thread executor is used, @@ -677,6 +683,15 @@ mod tests { time::Duration, }; + fn runtime_for_test() -> Arc { + Arc::new( + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(), + ) + } + // This is not a particularly smart test, but if some thread is granted access the value // before it is truly computed, then weird things should happen, even a segfault. // In the incorrect implementation that inspired this test to be written, this test @@ -685,11 +700,13 @@ mod tests { #[ntest::timeout(100)] fn cass_future_thread_safety() { const ERROR_MSG: &str = "NOBODY EXPECTED SPANISH INQUISITION"; + let runtime = runtime_for_test(); let fut = async { tokio::time::sleep(Duration::from_millis(10)).await; Err((CassError::CASS_OK, ERROR_MSG.into())) }; let cass_fut = CassFuture::make_raw( + runtime, fut, #[cfg(cpp_integration_testing)] None, @@ -724,11 +741,13 @@ mod tests { fn cass_future_resolves_after_timeout() { const ERROR_MSG: &str = "NOBODY EXPECTED SPANISH INQUISITION"; const HUNDRED_MILLIS_IN_MICROS: u64 = 100 * 1000; + let runtime = runtime_for_test(); let fut = async move { tokio::time::sleep(Duration::from_micros(HUNDRED_MILLIS_IN_MICROS)).await; Err((CassError::CASS_OK, ERROR_MSG.into())) }; let cass_fut = CassFuture::make_raw( + runtime, fut, #[cfg(cpp_integration_testing)] None, @@ -764,6 +783,8 @@ mod tests { const ERROR_MSG: &str = "NOBODY EXPECTED SPANISH INQUISITION"; const HUNDRED_MILLIS_IN_MICROS: u64 = 100 * 1000; + let runtime = runtime_for_test(); + let create_future_and_flag = || { unsafe extern "C" fn mark_flag_cb( _fut: CassBorrowedSharedPtr, @@ -780,6 +801,7 @@ mod tests { Err((CassError::CASS_OK, ERROR_MSG.into())) }; let cass_fut = CassFuture::make_raw( + Arc::clone(&runtime), fut, #[cfg(cpp_integration_testing)] None, @@ -855,7 +877,7 @@ mod tests { { let (cass_fut, flag_ptr) = create_future_and_flag(); - RUNTIME.block_on(async { + runtime.block_on(async { tokio::time::sleep(Duration::from_micros(HUNDRED_MILLIS_IN_MICROS + 10 * 1000)) .await }); diff --git a/scylla-rust-wrapper/src/integration_testing.rs b/scylla-rust-wrapper/src/integration_testing.rs index 8c18dc73..74ac9c81 100644 --- a/scylla-rust-wrapper/src/integration_testing.rs +++ b/scylla-rust-wrapper/src/integration_testing.rs @@ -280,14 +280,27 @@ pub unsafe extern "C" fn testing_future_get_attempted_hosts( unsafe { CString::from_vec_unchecked(concatenated_hosts.into_bytes()) }.into_raw() } +#[cfg(test)] +fn runtime_for_test() -> Arc { + Arc::new( + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(), + ) +} + /// Ensures that the `testing_future_get_attempted_hosts` function /// behaves correctly, i.e., it returns a list of attempted hosts as a concatenated string. #[test] fn test_future_get_attempted_hosts() { use scylla::observability::history::HistoryListener as _; + let runtime = runtime_for_test(); + let listener = Arc::new(RecordingHistoryListener::new()); - let future = CassFuture::new_from_future(std::future::pending(), Some(listener.clone())); + let future = + CassFuture::new_from_future(runtime, std::future::pending(), Some(listener.clone())); fn assert_attempted_hosts_eq(future: &Arc, hosts: &[String]) { let hosts_str = unsafe { testing_future_get_attempted_hosts(ArcFFI::as_ptr(future)) }; diff --git a/scylla-rust-wrapper/src/lib.rs b/scylla-rust-wrapper/src/lib.rs index 537bc741..d6a947bb 100644 --- a/scylla-rust-wrapper/src/lib.rs +++ b/scylla-rust-wrapper/src/lib.rs @@ -4,7 +4,6 @@ use crate::logging::Logger; use crate::logging::stderr_log_callback; use std::sync::LazyLock; use std::sync::RwLock; -use tokio::runtime::Runtime; #[macro_use] mod binding; @@ -190,7 +189,6 @@ pub(crate) mod cass_version_types { include_bindgen_generated!("cppdriver_version_types.rs"); } -pub(crate) static RUNTIME: LazyLock = LazyLock::new(|| Runtime::new().unwrap()); pub(crate) static LOGGER: LazyLock> = LazyLock::new(|| { RwLock::new(Logger { cb: Some(stderr_log_callback), diff --git a/scylla-rust-wrapper/src/session.rs b/scylla-rust-wrapper/src/session.rs index eb5665f1..7ed6babd 100644 --- a/scylla-rust-wrapper/src/session.rs +++ b/scylla-rust-wrapper/src/session.rs @@ -31,6 +31,7 @@ use std::sync::Arc; use tokio::sync::RwLock; pub(crate) struct CassConnectedSession { + runtime: Arc, session: Session, exec_profile_map: HashMap, } @@ -78,6 +79,7 @@ impl CassConnectedSession { let cluster_client_id = cluster.get_client_id(); let fut = Self::connect_fut( + Arc::clone(cluster.get_runtime()), session, session_builder, cluster_client_id, @@ -87,6 +89,7 @@ impl CassConnectedSession { ); CassFuture::make_raw( + Arc::clone(cluster.get_runtime()), fut, #[cfg(cpp_integration_testing)] None, @@ -94,6 +97,7 @@ impl CassConnectedSession { } async fn connect_fut( + runtime: Arc, session: Arc, session_builder_fut: impl Future, cluster_client_id: Option, @@ -152,6 +156,7 @@ impl CassConnectedSession { .map_err(|err| (err.to_cass_error(), err.msg()))?; session_guard.connected = Some(CassConnectedSession { + runtime, session, exec_profile_map, }); @@ -159,13 +164,32 @@ impl CassConnectedSession { } fn close_fut(session_opt: Arc>) -> Arc { + let runtime = { + let Ok(session_guard) = session_opt.try_read() else { + return CassFuture::new_ready(Err(( + CassError::CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, + "Still connecting or already closing".msg(), + ))); + }; + + let Some(connected_session) = session_guard.connected.as_ref() else { + return CassFuture::new_ready(Err(( + CassError::CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, + "Session is not connected".msg(), + ))); + }; + + Arc::clone(&connected_session.runtime) + }; + CassFuture::new_from_future( + runtime, async move { let mut session_guard = session_opt.write().await; if session_guard.connected.is_none() { return Err(( CassError::CASS_ERROR_LIB_UNABLE_TO_CLOSE, - "Already closing or closed".msg(), + "Session is not connected".msg(), )); } @@ -286,6 +310,8 @@ pub unsafe extern "C" fn cass_session_execute_batch( ))); }; + let runtime = Arc::clone(&connected_session.runtime); + let future = async move { let connected_session = session_guard .connected @@ -315,6 +341,7 @@ pub unsafe extern "C" fn cass_session_execute_batch( }; CassFuture::make_raw( + runtime, future, #[cfg(cpp_integration_testing)] None, @@ -350,6 +377,8 @@ pub unsafe extern "C" fn cass_session_execute( ))); }; + let runtime = Arc::clone(&connected_session.runtime); + let paging_state = statement_opt.paging_state.clone(); let paging_enabled = statement_opt.paging_enabled; let mut statement = statement_opt.statement.clone(); @@ -483,6 +512,7 @@ pub unsafe extern "C" fn cass_session_execute( }; CassFuture::make_raw( + runtime, future, #[cfg(cpp_integration_testing)] recording_listener, @@ -517,9 +547,12 @@ pub unsafe extern "C" fn cass_session_prepare_from_existing( ))); }; + let runtime = Arc::clone(&connected_session.runtime); + let statement = cass_statement.statement.clone(); CassFuture::make_raw( + runtime, async move { let query = match &statement { BoundStatement::Simple(q) => q, @@ -588,6 +621,8 @@ pub unsafe extern "C" fn cass_session_prepare_n( ))); }; + let runtime = Arc::clone(&connected_session.runtime); + let query = Statement::new(query_str.to_string()); let fut = async move { @@ -608,6 +643,7 @@ pub unsafe extern "C" fn cass_session_prepare_n( }; CassFuture::make_raw( + runtime, fut, #[cfg(cpp_integration_testing)] None, From b3d748d07ca1f053ab04d260ff552d51d4cd9399 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 4 Aug 2025 12:19:05 +0200 Subject: [PATCH 15/18] cluster: underive Clone for CassCluster There's no need to derive Clone for CassCluster, as there's never a need to clone it. --- scylla-rust-wrapper/src/cluster.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/scylla-rust-wrapper/src/cluster.rs b/scylla-rust-wrapper/src/cluster.rs index 70d125cc..acd95d0b 100644 --- a/scylla-rust-wrapper/src/cluster.rs +++ b/scylla-rust-wrapper/src/cluster.rs @@ -81,7 +81,6 @@ const DEFAULT_SHARD_AWARE_LOCAL_PORT_RANGE: ShardAwarePortRange = const DRIVER_NAME: &str = "ScyllaDB Cpp-Rust Driver"; const DRIVER_VERSION: &str = env!("CARGO_PKG_VERSION"); -#[derive(Clone)] pub struct CassCluster { runtime: Arc, From a4d74a9d9606c0d0d6f5de85aff95c6879ae99f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 4 Aug 2025 12:22:01 +0200 Subject: [PATCH 16/18] runtime: introduce cache for tokio runtimes As @lorak-mmk noted in the review, runtimes are currently not shared between `CassCluster` instances, which leads to possible many tokio runtimes being created in the application, with possibly a lot of threads. This commit introduces a cache for tokio runtimes, which is encapsulated in the global `Runtimes` struct. 1. `CassCluster` now does not store a `tokio::runtime::Runtime` directly, but rather an optional number of threads in the runtime. 2. The `Runtimes` struct is a global cache for tokio runtimes. It allows to get a default runtime or a runtime with a specified number of threads. Upon `cass_session_connect`, if a runtime is not created yet, it will create a new one and cache it for future use. The handling of the cache is fully transparent to the user of the abstraction. `CassSession` since then holds an `Arc`. 3. Once all `CassSession` instances that reference a runtime are dropped, the runtime is also dropped. This is done by storing weak pointers to runtimes in the `Runtimes` struct. Interesting to note: as Weak pointers keep the Arc allocation alive, a workflow that for consecutive `i`s connects a `CassSession` with a runtime with `i` threads and then drops it, will lead to space leaks. This is an artificial case, though. Remember that while the allocation will be still kept alive, the runtime itself will not be running, as it is dropped when the last `CassSession` referencing it is dropped. --- scylla-rust-wrapper/src/cluster.rs | 51 ++++++++----------- scylla-rust-wrapper/src/lib.rs | 1 + scylla-rust-wrapper/src/runtime.rs | 81 ++++++++++++++++++++++++++++++ scylla-rust-wrapper/src/session.rs | 4 +- 4 files changed, 106 insertions(+), 31 deletions(-) create mode 100644 scylla-rust-wrapper/src/runtime.rs diff --git a/scylla-rust-wrapper/src/cluster.rs b/scylla-rust-wrapper/src/cluster.rs index acd95d0b..6cf6e7a6 100644 --- a/scylla-rust-wrapper/src/cluster.rs +++ b/scylla-rust-wrapper/src/cluster.rs @@ -7,6 +7,7 @@ use crate::load_balancing::{ CassHostFilter, DcRestriction, LoadBalancingConfig, LoadBalancingKind, }; use crate::retry_policy::CassRetryPolicy; +use crate::runtime::RUNTIMES; use crate::ssl::CassSsl; use crate::timestamp_generator::CassTimestampGen; use crate::types::*; @@ -82,7 +83,11 @@ const DRIVER_NAME: &str = "ScyllaDB Cpp-Rust Driver"; const DRIVER_VERSION: &str = env!("CARGO_PKG_VERSION"); pub struct CassCluster { - runtime: Arc, + /// Number of threads in the tokio runtime thread pool. + /// + /// Specified with `cass_cluster_set_num_threads_io`. + /// If not set, the default tokio runtime is used. + num_threads_io: Option, session_builder: SessionBuilder, default_execution_profile_builder: ExecutionProfileBuilder, @@ -101,8 +106,20 @@ pub struct CassCluster { } impl CassCluster { - pub(crate) fn get_runtime(&self) -> &Arc { - &self.runtime + /// Gets the runtime that has been set for the cluster. + /// If no runtime has been set yet, it creates a default runtime + /// and makes it cached in the global `Runtimes` instance. + pub(crate) fn get_runtime(&self) -> Arc { + let mut runtimes = RUNTIMES.lock().unwrap(); + + if let Some(num_threads_io) = self.num_threads_io { + // If the number of threads is set, we create a runtime with that number of threads. + runtimes.n_thread_runtime(num_threads_io) + } else { + // Otherwise, we use the default runtime. + runtimes.default_runtime() + } + .unwrap_or_else(|err| panic!("Failed to create an async runtime: {err}")) } pub(crate) fn execution_profile_map(&self) -> &HashMap { @@ -184,12 +201,6 @@ impl CassCluster { #[unsafe(no_mangle)] pub unsafe extern "C" fn cass_cluster_new() -> CassOwnedExclusivePtr { - let Ok(default_runtime) = tokio::runtime::Runtime::new() - .inspect_err(|e| tracing::error!("Failed to create async runtime: {}", e)) - else { - return CassPtr::null_mut(); - }; - let default_execution_profile_builder = ExecutionProfileBuilder::default() .consistency(DEFAULT_CONSISTENCY) .serial_consistency(DEFAULT_SERIAL_CONSISTENCY) @@ -321,7 +332,7 @@ pub unsafe extern "C" fn cass_cluster_new() -> CassOwnedExclusivePtr tokio::runtime::Builder::new_current_thread() - .enable_all() - .build(), - n => tokio::runtime::Builder::new_multi_thread() - .worker_threads(n as usize) - .enable_all() - .build(), - }; - - let runtime = match runtime_res { - Ok(runtime) => runtime, - Err(err) => { - tracing::error!("Failed to create async runtime: {}", err); - return CassError::CASS_ERROR_LIB_BAD_PARAMS; - } - }; - - cluster.runtime = Arc::new(runtime); + cluster.num_threads_io = Some(num_threads as usize); CassError::CASS_OK } diff --git a/scylla-rust-wrapper/src/lib.rs b/scylla-rust-wrapper/src/lib.rs index d6a947bb..af0665a0 100644 --- a/scylla-rust-wrapper/src/lib.rs +++ b/scylla-rust-wrapper/src/lib.rs @@ -30,6 +30,7 @@ pub(crate) mod misc; pub(crate) mod prepared; pub(crate) mod query_result; pub(crate) mod retry_policy; +pub(crate) mod runtime; #[cfg(test)] mod ser_de_tests; pub(crate) mod session; diff --git a/scylla-rust-wrapper/src/runtime.rs b/scylla-rust-wrapper/src/runtime.rs new file mode 100644 index 00000000..e16a8eda --- /dev/null +++ b/scylla-rust-wrapper/src/runtime.rs @@ -0,0 +1,81 @@ +//! Manages tokio runtimes for the application. +//! +//! Runtime is per-cluster and can be changed with `cass_cluster_set_num_threads_io`. + +use std::{ + collections::HashMap, + sync::{Arc, Weak}, +}; + +use tokio::runtime::Runtime; + +/// Manages tokio runtimes for the application. +/// +/// Runtime is per-cluster and can be changed with `cass_cluster_set_num_threads_io`. +/// Once a runtime is created, it is cached for future use. +/// Once all `CassSession` instances that reference the runtime are dropped, +/// the runtime is also dropped. +pub(crate) struct Runtimes { + // Weak pointers are used to make runtimes dropped once all `CassSession` instances + // that reference them are freed. + default_runtime: Option>, + // This is Option to allow creating a static instance of Runtimes. + // (`HashMap::new` is not `const`). + n_thread_runtimes: Option>>, +} + +pub(crate) static RUNTIMES: std::sync::Mutex = { + std::sync::Mutex::new(Runtimes { + default_runtime: None, + n_thread_runtimes: None, + }) +}; + +impl Runtimes { + fn cached_or_new_runtime( + weak_runtime: &mut Weak, + create_runtime: impl FnOnce() -> Result, std::io::Error>, + ) -> Result, std::io::Error> { + match weak_runtime.upgrade() { + Some(cached_runtime) => Ok(cached_runtime), + None => { + let runtime = create_runtime()?; + *weak_runtime = Arc::downgrade(&runtime); + Ok(runtime) + } + } + } + + /// Returns a default tokio runtime. + /// + /// If it's not created yet, it will create a new one with the default configuration + /// and cache it for future use. + pub(crate) fn default_runtime(&mut self) -> Result, std::io::Error> { + let default_runtime_slot = self.default_runtime.get_or_insert_with(Weak::new); + Self::cached_or_new_runtime(default_runtime_slot, || Runtime::new().map(Arc::new)) + } + + /// Returns a tokio runtime with `n_threads` worker threads. + /// + /// If it's not created yet, it will create a new one and cache it for future use. + pub(crate) fn n_thread_runtime( + &mut self, + n_threads: usize, + ) -> Result, std::io::Error> { + let n_thread_runtimes = self.n_thread_runtimes.get_or_insert_with(HashMap::new); + let n_thread_runtime_slot = n_thread_runtimes.entry(n_threads).or_default(); + + Self::cached_or_new_runtime(n_thread_runtime_slot, || { + match n_threads { + 0 => tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(), + n => tokio::runtime::Builder::new_multi_thread() + .worker_threads(n) + .enable_all() + .build(), + } + .map(Arc::new) + }) + } +} diff --git a/scylla-rust-wrapper/src/session.rs b/scylla-rust-wrapper/src/session.rs index 7ed6babd..acaec04a 100644 --- a/scylla-rust-wrapper/src/session.rs +++ b/scylla-rust-wrapper/src/session.rs @@ -79,7 +79,7 @@ impl CassConnectedSession { let cluster_client_id = cluster.get_client_id(); let fut = Self::connect_fut( - Arc::clone(cluster.get_runtime()), + cluster.get_runtime(), session, session_builder, cluster_client_id, @@ -89,7 +89,7 @@ impl CassConnectedSession { ); CassFuture::make_raw( - Arc::clone(cluster.get_runtime()), + cluster.get_runtime(), fut, #[cfg(cpp_integration_testing)] None, From 6c94f6a71d96872368463aaa9c5819490edb895d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Thu, 31 Jul 2025 17:26:41 +0200 Subject: [PATCH 17/18] cassandra.h: state correct default IO threads num CPP-Rust Driver since its beginnings has used a default number of IO threads that is the default of tokio multi-threaded runtime, which is equal to the number of CPU cores available to the system. This commit updates the documentation to reflect this behaviour. --- include/cassandra.h | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/include/cassandra.h b/include/cassandra.h index 85af797c..8b503bdb 100644 --- a/include/cassandra.h +++ b/include/cassandra.h @@ -1729,10 +1729,37 @@ cass_cluster_set_serial_consistency(CassCluster* cluster, CassConsistency consistency); /** - * Sets the number of IO threads. This is the number of threads - * that will handle query requests. - * - * Default: 1 + * Sets the number of IO threads. This is the number of dedicated runtime threads + * that will resolve driver's futures, handling requests and other IO operations. + * + * If `num_threads` > 0 is given, the driver will create a dedicated thread pool + * with the specified number of threads. This is the recommended way to use the + * driver, as it allows the driver to execute tasks in parallel and utilize + * multiple CPU cores. Also, this makes the execution of futures eager and + * in-background, allowing the main thread to do whatever it wants concurrently + * with the futures. + * + * If 0 is specified, the `current_thread` tokio runtime will be used. This runtime + * has no dedicated worker threads, but instead uses the current thread to execute + * all tasks. This ensures the lowest possible overhead, may make sense for testing + * and debugging purposes, or for applications that do not require high concurrency. + * Also, single-CPU machines may benefit from this runtime, as operating on a single + * thread is usually faster than switching between multiple threads. + * **BEWARE:** the semantics of `CassFuture` when `current_thread` runtime is enabled + * are different. The futures will not start execution immediately when they are + * created, but only when some user thread awaits some future. That is, any thread + * that awaits a future will start the execution of all futures that are ready + * to be executed at that moment. This means that the only way to ensure that + * a future is executed is to await it. On the other hand, if one future is being + * awaited, then all other existing futures will be executed in the same thread + * until the awaited future is resolved. + * A notable example of code that is not compatible with `current_thread` runtime + * is the `callbacks` example in this codebase. This is because the main thread + * sets up callbacks and then blocks itself on a conditional variable. As no other + * thread exists that drives the futures, the callbacks will never be called + * and thus the program will hang. + * + * Default: Number of CPU cores available to the system. * * @public @memberof CassCluster * From a20c4f0d565ce4c8494da266d7b2fd81b10016eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Thu, 31 Jul 2025 17:35:10 +0200 Subject: [PATCH 18/18] examples/perf: use 1 connection per shard The `perf` example has demonstrated an antipattern of using 1 connection per host, which is not recommended for production use in case of ScyllaDB. This commit updates the example to use 1 connection per shard, which is the recommended practice for ScyllaDB. --- examples/perf/perf.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/perf/perf.c b/examples/perf/perf.c index 91e2c4d4..d036656e 100644 --- a/examples/perf/perf.c +++ b/examples/perf/perf.c @@ -137,7 +137,7 @@ CassCluster* create_cluster(const char* hosts) { cass_cluster_set_credentials(cluster, "cassandra", "cassandra"); cass_cluster_set_num_threads_io(cluster, NUM_IO_WORKER_THREADS); cass_cluster_set_queue_size_io(cluster, 10000); - cass_cluster_set_core_connections_per_host(cluster, 1); + cass_cluster_set_core_connections_per_shard(cluster, 1); return cluster; }