Skip to content

Commit a1ca3f7

Browse files
committed
future: introduce FutureKind
When trying to introduce configurable runtime to the CassFuture framework, I realized that the current implementation of CassFuture is not flexible enough. Namely, those futures that are immediately ready with the result (e.g. futures that are created with `CassFuture::new_ready`) do not have access to the runtime on creation. To provide better type safety, this kind of future was extracted as a separate enum variant of `FutureKind`, with the other variant representing futures that must be resolved by the tokio runtime. The next commit demonstrates how extracting these two kinds of futures into a separate enum allows us to implement the configurable runtime in the CassFuture framework.
1 parent 71b6fac commit a1ca3f7

File tree

1 file changed

+129
-50
lines changed

1 file changed

+129
-50
lines changed

scylla-rust-wrapper/src/future.rs

Lines changed: 129 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use futures::future;
99
use std::future::Future;
1010
use std::mem;
1111
use std::os::raw::c_void;
12+
use std::sync::atomic::AtomicBool;
1213
use std::sync::{Arc, Condvar, Mutex, OnceLock};
1314
use tokio::task::JoinHandle;
1415
use tokio::time::Duration;
@@ -57,25 +58,41 @@ struct CassFutureState {
5758
join_handle: Option<JoinHandle<()>>,
5859
}
5960

60-
pub struct CassFuture {
61+
enum FutureKind {
62+
/// Future that must be resolved by the tokio runtime.
63+
Resolvable { fut: ResolvableFuture },
64+
65+
/// Future that is immediately ready with the result.
66+
Immediate {
67+
res: CassFutureResult,
68+
callback_set: AtomicBool,
69+
},
70+
}
71+
72+
struct ResolvableFuture {
6173
/// Mutable state of the future that requires synchronized exclusive access
6274
/// in order to ensure thread safety of the future execution.
6375
state: Mutex<CassFutureState>,
6476

6577
/// Result of the future once it is resolved.
6678
result: OnceLock<CassFutureResult>,
6779

68-
/// Required as a place to allocate the stringified error message.
69-
/// This is needed to support `cass_future_error_message`.
70-
err_string: OnceLock<String>,
71-
7280
/// Used to notify threads waiting for the future's result.
7381
wait_for_value: Condvar,
7482

7583
#[cfg(cpp_integration_testing)]
7684
recording_listener: Option<Arc<crate::integration_testing::RecordingHistoryListener>>,
7785
}
7886

87+
pub struct CassFuture {
88+
/// One of the possible implementations of the future.
89+
kind: FutureKind,
90+
91+
/// Required as a place to allocate the stringified error message.
92+
/// This is needed to support `cass_future_error_message`.
93+
err_string: OnceLock<String>,
94+
}
95+
7996
impl FFI for CassFuture {
8097
type Origin = FromArc;
8198
}
@@ -116,19 +133,30 @@ impl CassFuture {
116133
>,
117134
) -> Arc<CassFuture> {
118135
let cass_fut = Arc::new(CassFuture {
119-
state: Mutex::new(Default::default()),
120-
result: OnceLock::new(),
121136
err_string: OnceLock::new(),
122-
wait_for_value: Condvar::new(),
123-
#[cfg(cpp_integration_testing)]
124-
recording_listener,
137+
kind: FutureKind::Resolvable {
138+
fut: ResolvableFuture {
139+
state: Mutex::new(Default::default()),
140+
result: OnceLock::new(),
141+
wait_for_value: Condvar::new(),
142+
#[cfg(cpp_integration_testing)]
143+
recording_listener,
144+
},
145+
},
125146
});
126147
let cass_fut_clone = Arc::clone(&cass_fut);
127148
let join_handle = RUNTIME.spawn(async move {
149+
let resolvable_fut = match cass_fut_clone.kind {
150+
FutureKind::Resolvable {
151+
fut: ref resolvable,
152+
} => resolvable,
153+
_ => unreachable!("CassFuture has been created as Resolvable"),
154+
};
155+
128156
let r = fut.await;
129157
let maybe_cb = {
130-
let guard = cass_fut_clone.state.lock().unwrap();
131-
cass_fut_clone
158+
let guard = resolvable_fut.state.lock().unwrap();
159+
resolvable_fut
132160
.result
133161
.set(r)
134162
.expect("Tried to resolve future result twice!");
@@ -144,23 +172,28 @@ impl CassFuture {
144172
bound_cb.invoke(fut_ptr);
145173
}
146174

147-
cass_fut_clone.wait_for_value.notify_all();
175+
resolvable_fut.wait_for_value.notify_all();
148176
});
149177
{
150-
let mut lock = cass_fut.state.lock().unwrap();
178+
let resolvable_fut = match cass_fut.kind {
179+
FutureKind::Resolvable {
180+
fut: ref resolvable,
181+
} => resolvable,
182+
_ => unreachable!("CassFuture has been created as Resolvable"),
183+
};
184+
let mut lock = resolvable_fut.state.lock().unwrap();
151185
lock.join_handle = Some(join_handle);
152186
}
153187
cass_fut
154188
}
155189

156-
pub(crate) fn new_ready(r: CassFutureResult) -> Arc<Self> {
190+
pub(crate) fn new_ready(res: CassFutureResult) -> Arc<Self> {
157191
Arc::new(CassFuture {
158-
state: Mutex::new(CassFutureState::default()),
159-
result: OnceLock::from(r),
192+
kind: FutureKind::Immediate {
193+
res,
194+
callback_set: AtomicBool::new(false),
195+
},
160196
err_string: OnceLock::new(),
161-
wait_for_value: Condvar::new(),
162-
#[cfg(cpp_integration_testing)]
163-
recording_listener: None,
164197
})
165198
}
166199

@@ -180,15 +213,23 @@ impl CassFuture {
180213
/// timed out (see [CassFuture::waited_result_timed]). We need to
181214
/// take the ownership of the handle, and complete the work.
182215
pub(crate) fn waited_result(&self) -> &CassFutureResult {
216+
let resolvable_fut = match self.kind {
217+
FutureKind::Resolvable {
218+
fut: ref resolvable_fut,
219+
} => resolvable_fut,
220+
// The future is immediately ready, so we can return the result.
221+
FutureKind::Immediate { ref res, .. } => return res,
222+
};
223+
183224
// Happy path: if the result is already available, we can return it
184225
// without locking the Mutex.
185-
if let Some(result) = self.result.get() {
226+
if let Some(result) = resolvable_fut.result.get() {
186227
return result;
187228
}
188229

189-
let mut guard = self.state.lock().unwrap();
230+
let mut guard = resolvable_fut.state.lock().unwrap();
190231
loop {
191-
if let Some(result) = self.result.get() {
232+
if let Some(result) = resolvable_fut.result.get() {
192233
// The result is already available, we can return it.
193234
return result;
194235
}
@@ -202,11 +243,11 @@ impl CassFuture {
202243

203244
// Once we are here, the future is resolved.
204245
// The result is guaranteed to be set.
205-
return self.result.get().unwrap();
246+
return resolvable_fut.result.get().unwrap();
206247
} else {
207248
// Someone has taken the handle, so we need to wait for them to complete
208249
// the future. Once they finish or timeout, we will be notified.
209-
guard = self
250+
guard = resolvable_fut
210251
.wait_for_value
211252
.wait_while(guard, |state| {
212253
// There are two cases when we should wake up:
@@ -220,7 +261,7 @@ impl CassFuture {
220261
// no one else will complete the future, so it's our responsibility.
221262
// In the next iteration we will land in the branch with `block_on`
222263
// and complete the future.
223-
self.result.get().is_none() && state.join_handle.is_none()
264+
resolvable_fut.result.get().is_none() && state.join_handle.is_none()
224265
})
225266
// unwrap: Error appears only when mutex is poisoned.
226267
.unwrap();
@@ -250,19 +291,27 @@ impl CassFuture {
250291
&self,
251292
timeout_duration: Duration,
252293
) -> Result<&CassFutureResult, FutureError> {
294+
let resolvable_fut = match self.kind {
295+
FutureKind::Resolvable {
296+
fut: ref resolvable_fut,
297+
} => resolvable_fut,
298+
// The future is immediately ready, so we can return the result.
299+
FutureKind::Immediate { ref res, .. } => return Ok(res),
300+
};
301+
253302
// Happy path: if the result is already available, we can return it
254303
// without locking the Mutex.
255-
if let Some(result) = self.result.get() {
304+
if let Some(result) = resolvable_fut.result.get() {
256305
return Ok(result);
257306
}
258307

259-
let mut guard = self.state.lock().unwrap();
308+
let mut guard = resolvable_fut.state.lock().unwrap();
260309
let deadline = tokio::time::Instant::now()
261310
.checked_add(timeout_duration)
262311
.ok_or(FutureError::InvalidDuration)?;
263312

264313
loop {
265-
if let Some(result) = self.result.get() {
314+
if let Some(result) = resolvable_fut.result.get() {
266315
// The result is already available, we can return it.
267316
return Ok(result);
268317
}
@@ -293,9 +342,9 @@ impl CassFuture {
293342
// - Signal one thread, so that if all other consumers are
294343
// already waiting on condvar, one of them wakes up and
295344
// picks up the work.
296-
guard = self.state.lock().unwrap();
345+
guard = resolvable_fut.state.lock().unwrap();
297346
guard.join_handle = Some(returned_handle);
298-
self.wait_for_value.notify_one();
347+
resolvable_fut.wait_for_value.notify_one();
299348
return Err(FutureError::TimeoutError);
300349
}
301350
// unwrap: JoinError appears only when future either panic'ed or canceled.
@@ -304,14 +353,14 @@ impl CassFuture {
304353

305354
// Once we are here, the future is resolved.
306355
// The result is guaranteed to be set.
307-
return Ok(self.result.get().unwrap());
356+
return Ok(resolvable_fut.result.get().unwrap());
308357
}
309358
};
310359
} else {
311360
// Someone has taken the handle, so we need to wait for them to complete
312361
// the future. Once they finish or timeout, we will be notified.
313362
let remaining_timeout = deadline.duration_since(tokio::time::Instant::now());
314-
let (guard_result, timeout_result) = self
363+
let (guard_result, timeout_result) = resolvable_fut
315364
.wait_for_value
316365
.wait_timeout_while(guard, remaining_timeout, |state| {
317366
// There are two cases when we should wake up:
@@ -325,7 +374,7 @@ impl CassFuture {
325374
// no one else will complete the future, so it's our responsibility.
326375
// In the next iteration we will land in the branch with `block_on`
327376
// and attempt to complete the future.
328-
self.result.get().is_none() && state.join_handle.is_none()
377+
resolvable_fut.result.get().is_none() && state.join_handle.is_none()
329378
})
330379
// unwrap: Error appears only when mutex is poisoned.
331380
.unwrap();
@@ -338,7 +387,26 @@ impl CassFuture {
338387
}
339388
}
340389

341-
pub(crate) unsafe fn set_callback(
390+
pub(crate) fn into_raw(self: Arc<Self>) -> CassOwnedSharedPtr<Self, CMut> {
391+
ArcFFI::into_ptr(self)
392+
}
393+
394+
#[cfg(cpp_integration_testing)]
395+
pub(crate) fn attempted_hosts(&self) -> Vec<std::net::SocketAddr> {
396+
if let FutureKind::Resolvable {
397+
fut: ref resolvable_fut,
398+
} = self.kind
399+
&& let Some(listener) = &resolvable_fut.recording_listener
400+
{
401+
listener.get_attempted_hosts()
402+
} else {
403+
vec![]
404+
}
405+
}
406+
}
407+
408+
impl ResolvableFuture {
409+
unsafe fn set_callback(
342410
&self,
343411
self_ptr: CassBorrowedSharedPtr<CassFuture, CMut>,
344412
cb: NonNullFutureCallback,
@@ -368,19 +436,6 @@ impl CassFuture {
368436
}
369437
CassError::CASS_OK
370438
}
371-
372-
pub(crate) fn into_raw(self: Arc<Self>) -> CassOwnedSharedPtr<Self, CMut> {
373-
ArcFFI::into_ptr(self)
374-
}
375-
376-
#[cfg(cpp_integration_testing)]
377-
pub(crate) fn attempted_hosts(&self) -> Vec<std::net::SocketAddr> {
378-
if let Some(listener) = &self.recording_listener {
379-
listener.get_attempted_hosts()
380-
} else {
381-
vec![]
382-
}
383-
}
384439
}
385440

386441
// Do not remove; this asserts that `CassFuture` implements Send + Sync,
@@ -405,7 +460,26 @@ pub unsafe extern "C" fn cass_future_set_callback(
405460
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
406461
};
407462

408-
unsafe { future.set_callback(future_raw.borrow(), callback, data) }
463+
match future.kind {
464+
FutureKind::Resolvable {
465+
fut: ref resolvable,
466+
} => {
467+
// Safety: `callback` is a valid pointer to a function that matches the signature.
468+
unsafe { resolvable.set_callback(future_raw.borrow(), callback, data) }
469+
}
470+
FutureKind::Immediate {
471+
ref callback_set, ..
472+
} => {
473+
if callback_set.swap(true, std::sync::atomic::Ordering::Relaxed) {
474+
// Another callback has been already set.
475+
return CassError::CASS_ERROR_LIB_CALLBACK_ALREADY_SET;
476+
}
477+
478+
let bound_cb = BoundCallback { cb: callback, data };
479+
bound_cb.invoke(future_raw.borrow());
480+
CassError::CASS_OK
481+
}
482+
}
409483
}
410484

411485
#[unsafe(no_mangle)]
@@ -442,7 +516,12 @@ pub unsafe extern "C" fn cass_future_ready(
442516
return cass_false;
443517
};
444518

445-
future.result.get().is_some() as cass_bool_t
519+
(match future.kind {
520+
FutureKind::Resolvable {
521+
fut: ref resolvable_fut,
522+
} => resolvable_fut.result.get().is_some(),
523+
FutureKind::Immediate { .. } => true,
524+
}) as cass_bool_t
446525
}
447526

448527
#[unsafe(no_mangle)]

0 commit comments

Comments
 (0)