-
Notifications
You must be signed in to change notification settings - Fork 13
session: fix closing semantics #328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
fead714
to
b477893
Compare
b477893
to
e3c2b0a
Compare
Rebased on master. |
e3c2b0a
to
1c31e9c
Compare
Reprioritized to P0 due to relation to #329. |
8b3237b
to
41c4046
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I found a serious error - lmk if I'm right.
scylla-rust-wrapper/src/session.rs
Outdated
let prev = cass_session.connected.compare_and_swap( | ||
&None::<Arc<CassConnectedSession>>, | ||
Some(Arc::new(CassConnectedSession { | ||
session, | ||
exec_profile_map, | ||
})), | ||
); | ||
if prev.is_some() { | ||
return Err(error()); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A bit sad that if there are multiple connection attempts, all will happen. Peviously others woul wait on lock.
I don't think it is an important scenario, so it is acceptable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The caller is responsible for not connecting session concurrently.
scylla-rust-wrapper/src/session.rs
Outdated
// We start by setting the Notify, so that we won't lose a wakeup | ||
// if the last request finishes before we set it. | ||
cass_session_connected | ||
.requests_finished_notify | ||
.set(Notify::new()) | ||
.expect( | ||
"The swap guarantees that only one thread takes the connected session. \ | ||
And this should never be an issue IRL, because session should be closed \ | ||
from a single thread, only once", | ||
); | ||
|
||
let fut = async move { | ||
// TODO: add waiting for the pending requests to finish. | ||
let _ = cass_session_connected; | ||
while cass_session_connected.has_pending_requests() { | ||
// Wait for all pending requests to finish. | ||
// This will block until the last request finishes and calls `requests_finished_notify.notify_one()`. | ||
cass_session_connected | ||
.requests_finished_notify | ||
.get() | ||
.expect("We have initialized the OnceLock prior") | ||
.notified() | ||
.await; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't the following race possible?
- No requests are pending
- Thread 1 starts
cass_session_execute
. performs load_full on session. - Thread 2 performs
cass_session_free
- Thread 2 sets notify, checks that session has no pending requests, finishes
- Thread 1 pends a request and performs it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. I forgot to reason about multi-threaded concurrency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the described scenario can be fixed by a "check-set-confirm" (my own crafted name) operation, by requiring the session to not be closed yet after the request is pended. What do I mean?
- Thread 1 starts
cass_session_execute
. It performs load_full on session. - Thread 2 performs
cass_session_free
- Thread 2 sets notify, checks that session has no pending requests, finishes
- Thread 1 pends a request.
- (the changed part) Thread 1 verifies that the session is still open. If it is, it performs the request. If it is not, it unpends the request and returns an error ("session has been closed").
Why is this correct?
cass_session_close
's future must wait for running requests. It's not clear who was first - session closing or a pended request - in a parallel scenario. Then, I believe we are free to refuse starting the request that encountered a closed session after it has been pended.
Note: we must prevent the ABA problem, i.e., distinguish the old session from the newly connected, different session.
- I believe this can be done by merely using
Arc::ptr_eq
on the old and the newArc<CassConnectedSession>
. - If you see a way to break this, we could always use a static
usize
counter that will be unique for every connected session.
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've pushed a commit which I believe solves the problem. Please verify it.
41c4046
to
e848d07
Compare
4a93ded
to
5d0b098
Compare
@Lorak-mmk I've redone this in a massively simplified way, leveraging the existing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR refines session closing semantics to ensure all in-flight requests complete before freeing, streamlines future creation, updates the Tokio runtime to multi-threaded, and fully enables the AsyncTests::Close
suite with associated C++ and Rust tests.
- Introduce
make_ready_raw()
for simpler ready-future creation and refactor async session operations to use non-blockingtry_read
/write_owned
locks. - Update the global Tokio runtime to a multi-threaded builder and adjust tests to validate session-free blocking behavior.
- Enable and tune the
AsyncTests
suite, add sleep delays in C++ async tests, and implement a Rust integration test forcass_session_free
.
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Show a summary per file
File | Description |
---|---|
tests/src/integration/tests/test_async.cpp | Reduced concurrent request count to 5 and added insert.set_sleep_time(100) to force overlap in close test. |
scylla-rust-wrapper/src/session.rs | Refactored session connect/execute/close calls to use try_read_owned /write_owned , added null/disconnect checks. |
scylla-rust-wrapper/src/lib.rs | Changed RUNTIME to use a multi-threaded Tokio runtime with 2 worker threads. |
scylla-rust-wrapper/src/future.rs | Added make_ready_raw() helper and removed an unused import. |
Makefile | Enabled AsyncTests.* filter for both Scylla and Cassandra test targets. |
5d0b098
to
6ee294a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks really good now! I just left a few minor comments.
|
||
fn connect( | ||
session: Arc<RwLock<CassSessionInner>>, | ||
session: Arc<CassSession>, | ||
cluster: &CassCluster, | ||
keyspace: Option<String>, | ||
) -> CassOwnedSharedPtr<CassFuture, CMut> { | ||
let session_builder = cluster.build_session_builder(); | ||
let exec_profile_map = cluster.execution_profile_map().clone(); | ||
let host_filter = cluster.build_host_filter(); | ||
|
||
let mut session_guard = RUNTIME.block_on(session.write_owned()); | ||
|
||
if let Some(cluster_client_id) = cluster.get_client_id() { | ||
// If the user set a client id, use it instead of the random one. | ||
session_guard.client_id = cluster_client_id; | ||
} | ||
let cluster_client_id = cluster.get_client_id(); | ||
|
||
let fut = Self::connect_fut( | ||
session_guard, | ||
session, | ||
session_builder, | ||
cluster_client_id, | ||
exec_profile_map, | ||
host_filter, | ||
keyspace, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really like the new approach. The only thing I don't yet understand is why do we need to revert the approach to handling client id. connect is not something that would be used in callbacks, right? In which case it should be fine to use block_on here. What do I miss?
I'm asking because I know you wanted client id to work synchronously, so it is surprising that you abandoned the idea.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
connect is not something that would be used in callbacks, right?
I won't assume so.
I'm asking because I know you wanted client id to work synchronously, so it is surprising that you abandoned the idea.
I haven't found a way to make it work with the RwLock
, while retaining full API usability in callbacks.
6ee294a
to
afb9683
Compare
Addressed the comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Please remember to remove the TMP commit before merging!
I'd be perfectly ok with cass_session_connect not working in callbacks (and it would give us the desired semantics for client id), but I leave that up to you.
By accident, the code was not deduplicated to use the `close_fut` function.
The function mistakenly had the `one` part of its name duplicated.
As this set of rules is going to be useful for more than one test, it makes sense to make it a separate function.
The mechanism around locking the session has been problematic in the number of ways. **1. Late locking**: The primary issue was that requests would not lock lock the session for reading until their futures were polled, which meant that the following code could lead to request failure due to session having been closed: ```c CassFuture *exec_fut = cass_session_execute(cass_session, cass_statement); CassFuture *close_fut = cass_session_close(cass_session); cass_future_wait(exec_fut); cass_future_wait(close_fut); ``` This is because locking the session for reading is done asynchronously to the code that follows `cass_session_execute` invocation. The same issue was present in all request-making functions, that is, `cass_session_execute`, `cass_session_execute_batch`, and all of the `cass_session_prepare*` family of functions. **2. Thread-blocking locking**: The second issue was that, in some cases, the session was locked for reading by blocking the current thread idly. This was the case for `cass_session_get_metrics`, `cass_session_get_schema_meta`, and `cass_session_get_client_id`. This could lead to deadlocks, especially when the number of threads in the thread pool were low (with `current_thread` tokio executor being the most vulnerable case). **3. Runtime-blocking locking**: The third issue was that, in the case of `cass_session_connect*` functions, the session was locked for writing by blocking the current thread as the executor thread for the awaited future. While this was designed with the `current_thread` executor in mind and worked perfectly for its case, it showed to cause panic when called by a tokio executor thread. **Solution**: This commit addresses all of the above issues by adopting an asymmetric locking mechanism for the session. The session is now locked for reading in advance yet fallibly, by calling `try_read(_owned)` on the session rwlock. This is done in the request-making functions, so that the session is guaranteed to be locked for reading when the request future is returned. The session is still locked for writing (upon connecting or closing) asynchronously, by calling and awaiting `write(_owned)` on the rwlock. This is done in the `cass_session_connect*` and `cass_session_close`. A downside of this approach is that the session is not guaranteed to be locked for writing when the `cass_session_connect*` or `cass_session_close` futures are returned, but this is not a problem because closing and connecting are considered to be "long-running", complex operations and thus are not expected to have conducted a specific part of their logic by the time their future is returned. **Results**: All enabled tests still pass, while the `callbacks` example now passes, too! This best part is that the number and complexity of the required changes is minimal, and the code is now much more robust. I hope @Lorak-mmk will be happy with this solution, as compared to the complex requests pending mechanism and atomics. Note that the test for `cass_session_get_client_id` had to be adjusted. This is because the session has the client ID set only in the connect future instead of in the connect function synchronously, so the test (which did not await the connect future) would fail after the changes.
Since the commit c1e40d7, the Session `execute(_batch)` methods now clone the Session's Arc, which prevents UAF if the Session is closed while the requests are still running. That commit's message says: "we cannot enable `AsyncTests::Close` yet since it expects that prematurely dropped session awaits all async tasks before closing". This is now taken care of by the previous commits. Thus, we can enable the `AsyncTests::Close` test suite.
This tests that `cass_session_free` synchronously waits for all in-flight requests to complete before freeing the session.
afb9683
to
42d7cc4
Compare
Rebased on master and dropped the temporary commit. |
Note: generated with GPT-4o and manually redacted.
Fix Session Closing Semantics and Enable
AsyncTests
SuiteSummary
This pull request introduces critical improvements to the session closing mechanism. Additionally, it enables the
AsyncTests::Close
test suite, aligns the behavior with the expectations of the CPP Driver, and satisfies the contract defined in thecassandra.h
documentation regardingcass_session_free()
andcass_session_close()
.Key Changes
Empirical Proof of Flaws in Current Implementation:
913a7c2b
) was introduced to empirically demonstrate flaws in the current implementation. TheAsyncTests::Close
test was tuned to fail by increasing concurrent requests, adding sleep times, and switching to a multi-threaded runtime with a hardcoded number of worker threads. This highlights issues with session closure concurrent to running requests.Simplified Future Logic:
CassFuture::make_ready_raw()
to streamline the creation of ready futures.Improved Session Closing Logic:
RwLock
mechanism now ensures that the session is protected from premature drops by synchronously taking a read lock for all running requests. This guarantees thatcass_session_close()
andcass_session_free()
block until all requests are completed, aligning with the expectations of theAsyncTests::Close
suite. The synchronous taking of the lock is done nonblockingly, which prevents the issues described in Panic whencass_future_error_code()
is called from a future callback #329.RwLock
) that all in-flight requests are completed before the session is freed. This prevents potential data loss or inconsistencies during session closure.Enabled
AsyncTests::Close
Suite:Session::execute(_batch)
methods has already been cloning the Session'sArc
, preventing use-after-free (UAF) scenarios when the session is closed while requests are still running [introduced in c1e40d7].Implemented a unit test for
cass_session_free
:Notes to reviewers
AsyncTests::Close
test's semantics are satisfied.current_thread
runtime.913a7c2b
) is removed before merging.Fixes: #304
Fixes: #329
Pre-review checklist
Makefile
in{SCYLLA,CASSANDRA}_(NO_VALGRIND_)TEST_FILTER
.Fixes:
annotations to PR description.