Skip to content

Conversation

wprzytula
Copy link
Collaborator

@wprzytula wprzytula commented Jul 31, 2025

Fixes: #259

What's done

This PR does the necessary refactors and implements cass_cluster_set_num_threads_io.

Semantics

The semantics of the function is slightly different than in the CPP Driver:

  • in CPP Driver, there are always n+1 worker threads, where n is the argument passed to cass_cluster_set_num_threads_io. One worker thread serves the control connection and other background Session tasks, whereas the remaining n worker threads serve requests.
  • in CPP-Rust Driver, the n argument is directly passed to worker_threads() config of the tokio multi_thread runtime. Those threads execute all tasks that are spawned in the wrapper and in the Rust Driver itself.

0 worker threads

This is an extension compared to the CPP Driver. 0 is handled specially: it uses current_thread tokio executor undeneath. Limitations and pitfalls of it are described in the function's documentation in cassandra.h.

CassFuture refactors

In order to support holding the runtime in each CassFuture more cleanly, the CassFuture is split into two variants:

  • Resolvable, which is like the CassFuture so far, but it also holds Arc<Runtime> it needs to use to poll the future,
  • Immediate, which is created as an instantly ready future and thus does not need to store a runtime.

As a bonus, I cleaned up the logic in CassFuture a bit and provided more documentation and comments.

Testing

I ran all examples with 0 worker threads. All but callbacks worked. callbacks, as expected, hung.

Pre-review checklist

  • I have split my patch into logically separate commits.
  • All commit messages clearly explain what they change and why.
  • PR description sums up the changes and reasons why they should be introduced.
  • I have implemented Rust unit tests for the features/changes introduced.
  • [ ] I have enabled appropriate tests in Makefile in {SCYLLA,CASSANDRA}_(NO_VALGRIND_)TEST_FILTER.
  • I added appropriate Fixes: annotations to PR description.

@wprzytula wprzytula added this to the 1.0.0 milestone Jul 31, 2025
@wprzytula wprzytula requested review from Lorak-mmk and Copilot July 31, 2025 17:15
@wprzytula wprzytula self-assigned this Jul 31, 2025
Copilot

This comment was marked as outdated.

@wprzytula wprzytula force-pushed the configurable-runtime branch from 7f546b5 to a98a7d2 Compare July 31, 2025 17:22
This is a refactor to avoid using `unwrap()` on the `connected` field
of the `SessionGuard`. Instead, it uses the `let-else` syntax to handle
the case where the session is not connected in a more idiomatic way.

Note that no `unwrap()` could really panic here, as the `connected`
field was always checked before, but this change makes the code
safer and more readable.
The `err_string` field in `CassFuture` serves as an allocation cache
for the error message string, which is generated only when needed
and must have a lifetime of `CassFuture`, as required by the
`cass_future_error_message` function. This field
was previously an `Option<String>` and was held under a `Mutex` inside
`CassFutureState`, but as its semantics fit `OnceLock` perfectlym it has
been changed accordingly and is now a `OnceLock<String>`.

As a pleasant result, `with_waited_result` can now be used instead of
`with_waited_state`, which simplifies the code and avoids `unwrap`.
There has long existed an API for awaiting futures that takes a closure
and invokes it with either the future state or the future result once
the future is ready.
This API is not really ergonomic, but it was a must as long as we were
holding the future result inside the future state, which is held behind
a mutex. As the result could be accessible only while the state's
`MutexGuard` is alive, we could not return references to the result
outside of the closure.
Now that we have the future result stored in an `OnceLock`, we can
return references to it, which makes the API much more ergonomic.
The future state is still held behind a mutex, so it's still not
possible to return a reference to the future state outside of
the closure, but after both the future result and the cached error
string were moved to `OnceLock`s out of the future state, we can
remove the closure-based future state awaiting API altogether,
and make the future result awaiting API reference-based instead
of a closure-based one. Perfect! Clean! Nice!

Refactors induced by this change in `waited_result(_timed)` has showed
that the logic there is a bit convoluted, so I added FIXMEs there.
I'm going to address them in further commits.
`CassFuture::waited_result(_timed)` methods have been documented better
and refactored to improve readability.
**Logic has NOT been changed.**

Code changes & rationale:
The previous logic was a bit convoluted: it `unwrap`ped the result
immediately after both branches of the `if` related to `join_handle`
merged back, which made it hard to understand the flow of the code.
Now, the logic is clearer: the result is `unwrap`ped in the branch
where we are sure that the future is resolved, i.e. the `join_handle`
had been `Some` and we have just completed the future.
The other branch, where we wait for the future to be completed,
is now clearer as well: once we wake up, there are two cases (quoting
the new comments):
1. The result is already available, so we can return it.
2. The `join_handle` was `None`, and now it's `Some` - some
   other thread must have timed out and returned the handle.
   We need to take over the work of completing the future, because
   the result is still not available and we may be using
   `current_thread` tokio executor, in which case no one else will
   complete the future, so it's our responsibility. In the next
   iteration we will land in the branch with `block_on` and complete
   the future.
In both cases we just continue the loop, so the code is simpler
and easier to read.
When getting the result of a future, we can avoid locking the
Mutex if the result is already available. This improves performance
when the result is already available, which is a common case.
CPP Driver has the semantics that a callback can be set only once for
the whole lifetime of a future. This commit fixes the Rust wrapper to
match this behavior. This is guaranteed by unconditionally setting
the callback field in the `set_callback` method, and by leaving the
callback field as `Some` after the callback is invoked in both cases:
1. when the future is already resolved, so the callback is invoked
   synchronously by the thread that sets the callback;
2. when the future is not resolved yet, so the callback is invoked
   asynchronously by the thread that resolves the future once it's
   resolved.
Scoped locking is in general more readable and less deadlock-prone than
using `mem::drop()` to release the lock early.
This commit refactors the `set_callback()` method to use scoped locking
instead of manually dropping the lock.
When trying to introduce configurable runtime to the CassFuture
framework, I realized that the current implementation of CassFuture
is not flexible enough. Namely, those futures that are immediately
ready with the result (e.g. futures that are created with
`CassFuture::new_ready`) do not have access to the runtime on creation.
To provide better type safety, this kind of future was extracted as a
separate enum variant of `FutureKind`, with the other variant
representing futures that must be resolved by the tokio runtime.

The next commit demonstrates how extracting these two kinds of futures
into a separate enum allows us to implement the configurable runtime
in the CassFuture framework.
The function always allocates a new string for the error message when
called the first time on a future in order to return a pointer to that
allocated string to the caller. This is unnecessary when the future's
error kind is already the string form. This commit performs a small
refactor that makes allocation avoided for those two cases with errors
given in a string form.
Credits to @Lorak-mmk for the suggestion.
When issuing an asynchronous operation on a session, such as executing
or preparing a statement, let's synchronously ensure that the session
is connected before proceeding. If the session is not connected, return
an error immediately instead of allowing the operation to continue and
asynchronously fail later.

This change is not much observable by a user, but it's necessary to
introduce the next change involving runtime configuration.
For now, CassCluster just configures and stores a runtime, but
the driver ignores it and still uses the global runtime.
This commit finishes the changes required to support configuration
of the tokio runtime used by the Scylla Rust wrapper.
`cass_cluster_set_num_threads_io` now properly configures the number
of threads in the runtime, and the runtime is passed to all futures
created by the wrapper.
This allows the user to configure the runtime used by the wrapper
for each `CassCluster` independently, granting full flexibility.
There's no need to derive Clone for CassCluster, as there's never a need
to clone it.
@wprzytula wprzytula force-pushed the configurable-runtime branch 2 times, most recently from e5f27c8 to 994355a Compare August 4, 2025 15:23
@wprzytula wprzytula requested review from Lorak-mmk and Copilot August 4, 2025 15:25
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements the cass_cluster_set_num_threads_io function to configure the number of threads in the tokio runtime used by the driver. The implementation includes refactoring CassFuture to support different runtime configurations and adding a runtime management system.

Key changes:

  • Implements cass_cluster_set_num_threads_io with support for 0 threads (single-threaded current_thread runtime) and multi-threaded configurations
  • Refactors CassFuture into Resolvable and Immediate variants to better handle runtime requirements
  • Adds a global runtime management system with caching for different thread configurations

Reviewed Changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
src/testing_unimplemented.cpp Removes unimplemented stub for cass_cluster_set_num_threads_io
scylla-rust-wrapper/src/runtime.rs Adds new runtime management module with caching for different thread configurations
scylla-rust-wrapper/src/cluster.rs Implements cass_cluster_set_num_threads_io and runtime retrieval logic
scylla-rust-wrapper/src/future.rs Major refactor splitting CassFuture into variants and updating runtime handling
scylla-rust-wrapper/src/session.rs Updates session operations to use cluster-specific runtime instead of global runtime
include/cassandra.h Updates documentation for cass_cluster_set_num_threads_io with new semantics
examples/perf/perf.c Fixes function call from deprecated to current API
Other files Various cleanup and integration changes

Copy link
Collaborator

@Lorak-mmk Lorak-mmk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, please fix commit message and comments in the commit that introduces runtime cache.

As @Lorak-mmk noted in the review, runtimes are currently not shared
between `CassCluster` instances, which leads to possible many tokio
runtimes being created in the application, with possibly a lot
of threads. This commit introduces a cache for tokio runtimes, which
is encapsulated in the global `Runtimes` struct.
1. `CassCluster` now does not store a `tokio::runtime::Runtime`
   directly, but rather an optional number of threads in the runtime.
2. The `Runtimes` struct is a global cache for tokio runtimes.
   It allows to get a default runtime or a runtime with a specified
   number of threads. Upon `cass_session_connect`, if a runtime is not
   created yet, it will create a new one and cache it for future use.
   The handling of the cache is fully transparent to the user of the
   abstraction. `CassSession` since then holds an `Arc<Runtime>`.
3. Once all `CassSession` instances that reference a runtime are
   dropped, the runtime is also dropped. This is done by storing weak
   pointers to runtimes in the `Runtimes` struct.

Interesting to note: as Weak pointers keep the Arc allocation alive,
a workflow that for consecutive `i`s connects a `CassSession` with a
runtime with `i` threads and then drops it, will lead to space leaks.
This is an artificial case, though.
Remember that while the allocation will be still kept alive, the
runtime itself will not be running, as it is dropped when the last
`CassSession` referencing it is dropped.
CPP-Rust Driver since its beginnings has used a default number of IO
threads that is the default of tokio multi-threaded runtime, which is
equal to the number of CPU cores available to the system.
This commit updates the documentation to reflect this behaviour.
The `perf` example has demonstrated an antipattern of using 1 connection
per host, which is not recommended for production use in case of
ScyllaDB. This commit updates the example to use 1 connection per shard,
which is the recommended practice for ScyllaDB.
@wprzytula wprzytula force-pushed the configurable-runtime branch from 994355a to a20c4f0 Compare August 4, 2025 17:32
@wprzytula wprzytula requested a review from Lorak-mmk August 4, 2025 17:33
@wprzytula wprzytula merged commit 96962f7 into scylladb:master Aug 5, 2025
8 of 9 checks passed
@wprzytula wprzytula deleted the configurable-runtime branch August 5, 2025 06:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement cass_cluster_set_num_threads_io
2 participants