Skip to content

Commit c76ad3c

Browse files
committed
cluster: implement ...set_num_threads_io
For now, CassCluster just configures and stores a runtime, but the driver ignores it and still uses the global runtime.
1 parent 83b3a75 commit c76ad3c

File tree

6 files changed

+49
-15
lines changed

6 files changed

+49
-15
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ SCYLLA_EXAMPLES_TO_RUN := \
153153
# execution_profiles <- unimplemented `cass_statement_set_keyspace()`
154154
# host_listener <- unimplemented `cass_cluster_set_host_listener_callback()`
155155
# logging <- unimplemented `cass_cluster_set_host_listener_callback()`
156-
# perf <- unimplemented `cass_cluster_set_num_threads_io()`, `cass_cluster_set_queue_size_io()`
156+
# perf <- unimplemented `cass_cluster_set_queue_size_io()`
157157
# schema_meta <- unimplemented multiple schema-related functions
158158
# cloud <- out of interest for us, not related to ScyllaDB
159159
endif

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,6 @@ The driver inherits almost all the features of C/C++ and Rust drivers, such as:
263263
- cass_cluster_set_monitor_reporting_interval
264264
- cass_cluster_set_new_request_ratio
265265
- cass_cluster_set_no_compact
266-
- cass_cluster_set_num_threads_io
267266
- cass_cluster_set_pending_requests_high_water_mark
268267
- cass_cluster_set_pending_requests_low_water_mark
269268
- cass_cluster_set_prepare_on_all_hosts

scylla-rust-wrapper/src/api.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ pub mod cluster {
103103
// cass_cluster_set_new_request_ratio, UNIMPLEMENTED
104104
// cass_cluster_set_no_compact, UNIMPLEMENTED
105105
cass_cluster_set_no_speculative_execution_policy,
106-
// cass_cluster_set_num_threads_io, UNIMPLEMENTED
106+
cass_cluster_set_num_threads_io,
107107
cass_cluster_set_port,
108108
// cass_cluster_set_pending_requests_high_water_mark, UNIMPLEMENTED
109109
// cass_cluster_set_pending_requests_low_water_mark, UNIMPLEMENTED
@@ -984,7 +984,6 @@ pub mod integration_testing {
984984
cass_cluster_set_authenticator_callbacks,
985985
cass_cluster_set_cloud_secure_connection_bundle,
986986
cass_cluster_set_host_listener_callback,
987-
cass_cluster_set_num_threads_io,
988987
cass_cluster_set_queue_size_io,
989988
cass_cluster_set_cloud_secure_connection_bundle_n,
990989
cass_cluster_set_exponential_reconnect,

scylla-rust-wrapper/src/cluster.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ const DRIVER_VERSION: &str = env!("CARGO_PKG_VERSION");
8383

8484
#[derive(Clone)]
8585
pub struct CassCluster {
86+
runtime: Arc<tokio::runtime::Runtime>,
87+
8688
session_builder: SessionBuilder,
8789
default_execution_profile_builder: ExecutionProfileBuilder,
8890
execution_profile_map: HashMap<ExecProfileName, CassExecProfile>,
@@ -100,6 +102,10 @@ pub struct CassCluster {
100102
}
101103

102104
impl CassCluster {
105+
pub(crate) fn get_runtime(&self) -> &Arc<tokio::runtime::Runtime> {
106+
&self.runtime
107+
}
108+
103109
pub(crate) fn execution_profile_map(&self) -> &HashMap<ExecProfileName, CassExecProfile> {
104110
&self.execution_profile_map
105111
}
@@ -179,6 +185,12 @@ impl CassCluster {
179185

180186
#[unsafe(no_mangle)]
181187
pub unsafe extern "C" fn cass_cluster_new() -> CassOwnedExclusivePtr<CassCluster, CMut> {
188+
let Ok(default_runtime) = tokio::runtime::Runtime::new()
189+
.inspect_err(|e| tracing::error!("Failed to create async runtime: {}", e))
190+
else {
191+
return CassPtr::null_mut();
192+
};
193+
182194
let default_execution_profile_builder = ExecutionProfileBuilder::default()
183195
.consistency(DEFAULT_CONSISTENCY)
184196
.serial_consistency(DEFAULT_SERIAL_CONSISTENCY)
@@ -310,6 +322,8 @@ pub unsafe extern "C" fn cass_cluster_new() -> CassOwnedExclusivePtr<CassCluster
310322
};
311323

312324
BoxFFI::into_ptr(Box::new(CassCluster {
325+
runtime: Arc::new(default_runtime),
326+
313327
session_builder: default_session_builder,
314328
port: 9042,
315329
contact_points: Vec::new(),
@@ -1531,6 +1545,39 @@ pub unsafe extern "C" fn cass_cluster_set_execution_profile_n(
15311545
CassError::CASS_OK
15321546
}
15331547

1548+
#[unsafe(no_mangle)]
1549+
pub unsafe extern "C" fn cass_cluster_set_num_threads_io(
1550+
cluster: CassBorrowedExclusivePtr<CassCluster, CMut>,
1551+
num_threads: cass_uint32_t,
1552+
) -> CassError {
1553+
let Some(cluster) = BoxFFI::as_mut_ref(cluster) else {
1554+
tracing::error!("Provided null cluster pointer to cass_cluster_set_num_threads_io!");
1555+
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
1556+
};
1557+
1558+
let runtime_res = match num_threads {
1559+
0 => tokio::runtime::Builder::new_current_thread()
1560+
.enable_all()
1561+
.build(),
1562+
n => tokio::runtime::Builder::new_multi_thread()
1563+
.worker_threads(n as usize)
1564+
.enable_all()
1565+
.build(),
1566+
};
1567+
1568+
let runtime = match runtime_res {
1569+
Ok(runtime) => runtime,
1570+
Err(err) => {
1571+
tracing::error!("Failed to create async runtime: {}", err);
1572+
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
1573+
}
1574+
};
1575+
1576+
cluster.runtime = Arc::new(runtime);
1577+
1578+
CassError::CASS_OK
1579+
}
1580+
15341581
#[cfg(test)]
15351582
mod tests {
15361583
use crate::testing::{assert_cass_error_eq, setup_tracing};

scylla-rust-wrapper/src/integration_testing.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -519,14 +519,6 @@ pub(crate) mod stubs {
519519
CassError::CASS_OK
520520
}
521521

522-
#[unsafe(no_mangle)]
523-
pub extern "C" fn cass_cluster_set_num_threads_io(
524-
_cluster_raw: CassBorrowedExclusivePtr<CassCluster, CMut>,
525-
_num_threads: u32,
526-
) -> CassError {
527-
CassError::CASS_OK
528-
}
529-
530522
#[unsafe(no_mangle)]
531523
pub extern "C" fn cass_cluster_set_queue_size_io(
532524
_cluster_raw: CassBorrowedExclusivePtr<CassCluster, CMut>,

src/testing_unimplemented.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,6 @@ CASS_EXPORT CassError cass_cluster_set_host_listener_callback(CassCluster* clust
6767
CASS_EXPORT CassError cass_cluster_set_no_compact(CassCluster* cluster, cass_bool_t enabled) {
6868
throw std::runtime_error("UNIMPLEMENTED cass_cluster_set_no_compact\n");
6969
}
70-
CASS_EXPORT CassError cass_cluster_set_num_threads_io(CassCluster* cluster, unsigned num_threads) {
71-
throw std::runtime_error("UNIMPLEMENTED cass_cluster_set_num_threads_io\n");
72-
}
7370
CASS_EXPORT CassError cass_cluster_set_prepare_on_all_hosts(CassCluster* cluster,
7471
cass_bool_t enabled) {
7572
throw std::runtime_error("UNIMPLEMENTED cass_cluster_set_prepare_on_all_hosts\n");

0 commit comments

Comments
 (0)