Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ SCYLLA_EXAMPLES_TO_RUN := \
# execution_profiles <- unimplemented `cass_statement_set_keyspace()`
# host_listener <- unimplemented `cass_cluster_set_host_listener_callback()`
# logging <- unimplemented `cass_cluster_set_host_listener_callback()`
# perf <- unimplemented `cass_cluster_set_num_threads_io()`, `cass_cluster_set_queue_size_io()`
# perf <- unimplemented `cass_cluster_set_queue_size_io()`
# schema_meta <- unimplemented multiple schema-related functions
# cloud <- out of interest for us, not related to ScyllaDB
endif
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ The driver inherits almost all the features of C/C++ and Rust drivers, such as:
- cass_cluster_set_monitor_reporting_interval
- cass_cluster_set_new_request_ratio
- cass_cluster_set_no_compact
- cass_cluster_set_num_threads_io
- cass_cluster_set_pending_requests_high_water_mark
- cass_cluster_set_pending_requests_low_water_mark
- cass_cluster_set_prepare_on_all_hosts
Expand Down
2 changes: 1 addition & 1 deletion examples/perf/perf.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ CassCluster* create_cluster(const char* hosts) {
cass_cluster_set_credentials(cluster, "cassandra", "cassandra");
cass_cluster_set_num_threads_io(cluster, NUM_IO_WORKER_THREADS);
cass_cluster_set_queue_size_io(cluster, 10000);
cass_cluster_set_core_connections_per_host(cluster, 1);
cass_cluster_set_core_connections_per_shard(cluster, 1);
return cluster;
}

Expand Down
35 changes: 31 additions & 4 deletions include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -1729,10 +1729,37 @@ cass_cluster_set_serial_consistency(CassCluster* cluster,
CassConsistency consistency);

/**
* Sets the number of IO threads. This is the number of threads
* that will handle query requests.
*
* <b>Default:</b> 1
* Sets the number of IO threads. This is the number of dedicated runtime threads
* that will resolve driver's futures, handling requests and other IO operations.
*
* If `num_threads` > 0 is given, the driver will create a dedicated thread pool
* with the specified number of threads. This is the recommended way to use the
* driver, as it allows the driver to execute tasks in parallel and utilize
* multiple CPU cores. Also, this makes the execution of futures eager and
* in-background, allowing the main thread to do whatever it wants concurrently
* with the futures.
*
* If 0 is specified, the `current_thread` tokio runtime will be used. This runtime
* has no dedicated worker threads, but instead uses the current thread to execute
* all tasks. This ensures the lowest possible overhead, may make sense for testing
* and debugging purposes, or for applications that do not require high concurrency.
* Also, single-CPU machines may benefit from this runtime, as operating on a single
* thread is usually faster than switching between multiple threads.
* **BEWARE:** the semantics of `CassFuture` when `current_thread` runtime is enabled
* are different. The futures will not start execution immediately when they are
* created, but only when some user thread awaits some future. That is, any thread
* that awaits a future will start the execution of all futures that are ready
* to be executed at that moment. This means that the only way to ensure that
* a future is executed is to await it. On the other hand, if one future is being
* awaited, then all other existing futures will be executed in the same thread
* until the awaited future is resolved.
* A notable example of code that is not compatible with `current_thread` runtime
* is the `callbacks` example in this codebase. This is because the main thread
* sets up callbacks and then blocks itself on a conditional variable. As no other
* thread exists that drives the futures, the callbacks will never be called
* and thus the program will hang.
*
* <b>Default:</b> Number of CPU cores available to the system.
*
* @public @memberof CassCluster
*
Expand Down
3 changes: 1 addition & 2 deletions scylla-rust-wrapper/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub mod cluster {
// cass_cluster_set_new_request_ratio, UNIMPLEMENTED
// cass_cluster_set_no_compact, UNIMPLEMENTED
cass_cluster_set_no_speculative_execution_policy,
// cass_cluster_set_num_threads_io, UNIMPLEMENTED
cass_cluster_set_num_threads_io,
cass_cluster_set_port,
// cass_cluster_set_pending_requests_high_water_mark, UNIMPLEMENTED
// cass_cluster_set_pending_requests_low_water_mark, UNIMPLEMENTED
Expand Down Expand Up @@ -984,7 +984,6 @@ pub mod integration_testing {
cass_cluster_set_authenticator_callbacks,
cass_cluster_set_cloud_secure_connection_bundle,
cass_cluster_set_host_listener_callback,
cass_cluster_set_num_threads_io,
cass_cluster_set_queue_size_io,
cass_cluster_set_cloud_secure_connection_bundle_n,
cass_cluster_set_exponential_reconnect,
Expand Down
41 changes: 40 additions & 1 deletion scylla-rust-wrapper/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::load_balancing::{
CassHostFilter, DcRestriction, LoadBalancingConfig, LoadBalancingKind,
};
use crate::retry_policy::CassRetryPolicy;
use crate::runtime::RUNTIMES;
use crate::ssl::CassSsl;
use crate::timestamp_generator::CassTimestampGen;
use crate::types::*;
Expand Down Expand Up @@ -81,8 +82,13 @@ const DEFAULT_SHARD_AWARE_LOCAL_PORT_RANGE: ShardAwarePortRange =
const DRIVER_NAME: &str = "ScyllaDB Cpp-Rust Driver";
const DRIVER_VERSION: &str = env!("CARGO_PKG_VERSION");

#[derive(Clone)]
pub struct CassCluster {
/// Number of threads in the tokio runtime thread pool.
///
/// Specified with `cass_cluster_set_num_threads_io`.
/// If not set, the default tokio runtime is used.
num_threads_io: Option<usize>,

session_builder: SessionBuilder,
default_execution_profile_builder: ExecutionProfileBuilder,
execution_profile_map: HashMap<ExecProfileName, CassExecProfile>,
Expand All @@ -100,6 +106,22 @@ pub struct CassCluster {
}

impl CassCluster {
/// Gets the runtime that has been set for the cluster.
/// If no runtime has been set yet, it creates a default runtime
/// and makes it cached in the global `Runtimes` instance.
pub(crate) fn get_runtime(&self) -> Arc<tokio::runtime::Runtime> {
let mut runtimes = RUNTIMES.lock().unwrap();

if let Some(num_threads_io) = self.num_threads_io {
// If the number of threads is set, we create a runtime with that number of threads.
runtimes.n_thread_runtime(num_threads_io)
} else {
// Otherwise, we use the default runtime.
runtimes.default_runtime()
}
.unwrap_or_else(|err| panic!("Failed to create an async runtime: {err}"))
}

pub(crate) fn execution_profile_map(&self) -> &HashMap<ExecProfileName, CassExecProfile> {
&self.execution_profile_map
}
Expand Down Expand Up @@ -310,6 +332,8 @@ pub unsafe extern "C" fn cass_cluster_new() -> CassOwnedExclusivePtr<CassCluster
};

BoxFFI::into_ptr(Box::new(CassCluster {
num_threads_io: None,

session_builder: default_session_builder,
port: 9042,
contact_points: Vec::new(),
Expand Down Expand Up @@ -1531,6 +1555,21 @@ pub unsafe extern "C" fn cass_cluster_set_execution_profile_n(
CassError::CASS_OK
}

#[unsafe(no_mangle)]
pub unsafe extern "C" fn cass_cluster_set_num_threads_io(
cluster: CassBorrowedExclusivePtr<CassCluster, CMut>,
num_threads: cass_uint32_t,
) -> CassError {
let Some(cluster) = BoxFFI::as_mut_ref(cluster) else {
tracing::error!("Provided null cluster pointer to cass_cluster_set_num_threads_io!");
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
};

cluster.num_threads_io = Some(num_threads as usize);

CassError::CASS_OK
}

#[cfg(test)]
mod tests {
use crate::testing::{assert_cass_error_eq, setup_tracing};
Expand Down
Loading
Loading