Skip to content

Commit e5adc37

Browse files
committed
future: introduce FutureKind
When trying to introduce configurable runtime to the CassFuture framework, I realized that the current implementation of CassFuture is not flexible enough. Namely, those futures that are immediately ready with the result (e.g. futures that are created with `CassFuture::new_ready`) do not have access to the runtime on creation. To provide better type safety, this kind of future was extracted as a separate enum variant of `FutureKind`, with the other variant representing futures that must be resolved by the tokio runtime. The next commit demonstrates how extracting these two kinds of futures into a separate enum allows us to implement the configurable runtime in the CassFuture framework.
1 parent f3bc83d commit e5adc37

File tree

1 file changed

+98
-49
lines changed

1 file changed

+98
-49
lines changed

scylla-rust-wrapper/src/future.rs

Lines changed: 98 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -59,25 +59,38 @@ struct CassFutureState {
5959
join_handle: Option<JoinHandle<()>>,
6060
}
6161

62-
pub struct CassFuture {
62+
enum FutureKind {
63+
/// Future that must be resolved by the tokio runtime.
64+
Resolvable(ResolvableFuture),
65+
66+
/// Future that is immediately ready with the result.
67+
Immediate(CassFutureResult),
68+
}
69+
70+
struct ResolvableFuture {
6371
/// Mutable state of the future that requires synchronized exclusive access
6472
/// in order to ensure thread safety of the future execution.
6573
state: Mutex<CassFutureState>,
6674

6775
/// Result of the future once it is resolved.
6876
result: OnceLock<CassFutureResult>,
6977

70-
/// Required as a place to allocate the stringified error message.
71-
/// This is needed to support `cass_future_error_message`.
72-
err_string: OnceLock<String>,
73-
7478
/// Used to notify threads waiting for the future's result.
7579
wait_for_value: Condvar,
7680

7781
#[cfg(cpp_integration_testing)]
7882
recording_listener: Option<Arc<crate::integration_testing::RecordingHistoryListener>>,
7983
}
8084

85+
pub struct CassFuture {
86+
/// One of the possible implementations of the future.
87+
kind: FutureKind,
88+
89+
/// Required as a place to allocate the stringified error message.
90+
/// This is needed to support `cass_future_error_message`.
91+
err_string: OnceLock<String>,
92+
}
93+
8194
impl FFI for CassFuture {
8295
type Origin = FromArc;
8396
}
@@ -118,19 +131,26 @@ impl CassFuture {
118131
>,
119132
) -> Arc<CassFuture> {
120133
let cass_fut = Arc::new(CassFuture {
121-
state: Mutex::new(Default::default()),
122-
result: OnceLock::new(),
123134
err_string: OnceLock::new(),
124-
wait_for_value: Condvar::new(),
125-
#[cfg(cpp_integration_testing)]
126-
recording_listener,
135+
kind: FutureKind::Resolvable(ResolvableFuture {
136+
state: Mutex::new(Default::default()),
137+
result: OnceLock::new(),
138+
wait_for_value: Condvar::new(),
139+
#[cfg(cpp_integration_testing)]
140+
recording_listener,
141+
}),
127142
});
128143
let cass_fut_clone = Arc::clone(&cass_fut);
129144
let join_handle = RUNTIME.spawn(async move {
145+
let resolvable_fut = match cass_fut_clone.kind {
146+
FutureKind::Resolvable(ref resolvable) => resolvable,
147+
_ => unreachable!("CassFuture has been created as Resolvable"),
148+
};
149+
130150
let r = fut.await;
131151
let maybe_cb = {
132-
let mut guard = cass_fut_clone.state.lock().unwrap();
133-
cass_fut_clone
152+
let mut guard = resolvable_fut.state.lock().unwrap();
153+
resolvable_fut
134154
.result
135155
.set(r)
136156
.expect("Tried to resolve future result twice!");
@@ -143,23 +163,23 @@ impl CassFuture {
143163
bound_cb.invoke(fut_ptr);
144164
}
145165

146-
cass_fut_clone.wait_for_value.notify_all();
166+
resolvable_fut.wait_for_value.notify_all();
147167
});
148168
{
149-
let mut lock = cass_fut.state.lock().unwrap();
169+
let resolvable_fut = match cass_fut.kind {
170+
FutureKind::Resolvable(ref resolvable) => resolvable,
171+
_ => unreachable!("CassFuture has been created as Resolvable"),
172+
};
173+
let mut lock = resolvable_fut.state.lock().unwrap();
150174
lock.join_handle = Some(join_handle);
151175
}
152176
cass_fut
153177
}
154178

155179
pub(crate) fn new_ready(r: CassFutureResult) -> Arc<Self> {
156180
Arc::new(CassFuture {
157-
state: Mutex::new(CassFutureState::default()),
158-
result: OnceLock::from(r),
181+
kind: FutureKind::Immediate(r),
159182
err_string: OnceLock::new(),
160-
wait_for_value: Condvar::new(),
161-
#[cfg(cpp_integration_testing)]
162-
recording_listener: None,
163183
})
164184
}
165185

@@ -179,15 +199,21 @@ impl CassFuture {
179199
/// timed out (see [CassFuture::waited_result_timed]). We need to
180200
/// take the ownership of the handle, and complete the work.
181201
pub(crate) fn waited_result<'s>(&'s self) -> &'s CassFutureResult {
202+
let resolvable_fut = match self.kind {
203+
FutureKind::Resolvable(ref resolvable_fut) => resolvable_fut,
204+
// The future is immediately ready, so we can return the result.
205+
FutureKind::Immediate(ref r) => return r,
206+
};
207+
182208
// Happy path: if the result is already available, we can return it
183209
// without locking the Mutex.
184-
if let Some(result) = self.result.get() {
210+
if let Some(result) = resolvable_fut.result.get() {
185211
return result;
186212
}
187213

188-
let mut guard = self.state.lock().unwrap();
214+
let mut guard = resolvable_fut.state.lock().unwrap();
189215
loop {
190-
if let Some(result) = self.result.get() {
216+
if let Some(result) = resolvable_fut.result.get() {
191217
// The result is already available, we can return it.
192218
return result;
193219
}
@@ -201,11 +227,11 @@ impl CassFuture {
201227

202228
// Once we are here, the future is resolved.
203229
// The result is guaranteed to be set.
204-
return self.result.get().unwrap();
230+
return resolvable_fut.result.get().unwrap();
205231
} else {
206232
// Someone has taken the handle, so we need to wait for them to complete
207233
// the future. Once they finish or timeout, we will be notified.
208-
guard = self
234+
guard = resolvable_fut
209235
.wait_for_value
210236
.wait_while(guard, |state| {
211237
// There are two cases when we should wake up:
@@ -219,7 +245,7 @@ impl CassFuture {
219245
// no one else will complete the future, so it's our responsibility.
220246
// In the next iteration we will land in the branch with `block_on`
221247
// and complete the future.
222-
self.result.get().is_none() && state.join_handle.is_none()
248+
resolvable_fut.result.get().is_none() && state.join_handle.is_none()
223249
})
224250
// unwrap: Error appears only when mutex is poisoned.
225251
.unwrap();
@@ -249,19 +275,25 @@ impl CassFuture {
249275
&'s self,
250276
timeout_duration: Duration,
251277
) -> Result<&'s CassFutureResult, FutureError> {
278+
let resolvable_fut = match self.kind {
279+
FutureKind::Resolvable(ref resolvable_fut) => resolvable_fut,
280+
// The future is immediately ready, so we can return the result.
281+
FutureKind::Immediate(ref r) => return Ok(r),
282+
};
283+
252284
// Happy path: if the result is already available, we can return it
253285
// without locking the Mutex.
254-
if let Some(result) = self.result.get() {
286+
if let Some(result) = resolvable_fut.result.get() {
255287
return Ok(result);
256288
}
257289

258-
let mut guard = self.state.lock().unwrap();
290+
let mut guard = resolvable_fut.state.lock().unwrap();
259291
let deadline = tokio::time::Instant::now()
260292
.checked_add(timeout_duration)
261293
.ok_or(FutureError::InvalidDuration)?;
262294

263295
loop {
264-
if let Some(result) = self.result.get() {
296+
if let Some(result) = resolvable_fut.result.get() {
265297
// The result is already available, we can return it.
266298
return Ok(result);
267299
}
@@ -292,9 +324,9 @@ impl CassFuture {
292324
// - Signal one thread, so that if all other consumers are
293325
// already waiting on condvar, one of them wakes up and
294326
// picks up the work.
295-
guard = self.state.lock().unwrap();
327+
guard = resolvable_fut.state.lock().unwrap();
296328
guard.join_handle = Some(returned_handle);
297-
self.wait_for_value.notify_one();
329+
resolvable_fut.wait_for_value.notify_one();
298330
return Err(FutureError::TimeoutError);
299331
}
300332
// unwrap: JoinError appears only when future either panic'ed or canceled.
@@ -303,14 +335,14 @@ impl CassFuture {
303335

304336
// Once we are here, the future is resolved.
305337
// The result is guaranteed to be set.
306-
return Ok(self.result.get().unwrap());
338+
return Ok(resolvable_fut.result.get().unwrap());
307339
}
308340
};
309341
} else {
310342
// Someone has taken the handle, so we need to wait for them to complete
311343
// the future. Once they finish or timeout, we will be notified.
312344
let remaining_timeout = deadline.duration_since(tokio::time::Instant::now());
313-
let (guard_result, timeout_result) = self
345+
let (guard_result, timeout_result) = resolvable_fut
314346
.wait_for_value
315347
.wait_timeout_while(guard, remaining_timeout, |state| {
316348
// There are two cases when we should wake up:
@@ -324,7 +356,7 @@ impl CassFuture {
324356
// no one else will complete the future, so it's our responsibility.
325357
// In the next iteration we will land in the branch with `block_on`
326358
// and attempt to complete the future.
327-
self.result.get().is_none() && state.join_handle.is_none()
359+
resolvable_fut.result.get().is_none() && state.join_handle.is_none()
328360
})
329361
// unwrap: Error appears only when mutex is poisoned.
330362
.unwrap();
@@ -337,7 +369,24 @@ impl CassFuture {
337369
}
338370
}
339371

340-
pub(crate) unsafe fn set_callback(
372+
pub(crate) fn into_raw(self: Arc<Self>) -> CassOwnedSharedPtr<Self, CMut> {
373+
ArcFFI::into_ptr(self)
374+
}
375+
376+
#[cfg(cpp_integration_testing)]
377+
pub(crate) fn attempted_hosts(&self) -> Vec<std::net::SocketAddr> {
378+
if let FutureKind::Resolvable(ref resolvable_fut) = self.kind
379+
&& let Some(listener) = &resolvable_fut.recording_listener
380+
{
381+
listener.get_attempted_hosts()
382+
} else {
383+
vec![]
384+
}
385+
}
386+
}
387+
388+
impl ResolvableFuture {
389+
unsafe fn set_callback(
341390
&self,
342391
self_ptr: CassBorrowedSharedPtr<CassFuture, CMut>,
343392
cb: NonNullFutureCallback,
@@ -359,19 +408,6 @@ impl CassFuture {
359408
lock.callback = Some(bound_cb);
360409
CassError::CASS_OK
361410
}
362-
363-
pub(crate) fn into_raw(self: Arc<Self>) -> CassOwnedSharedPtr<Self, CMut> {
364-
ArcFFI::into_ptr(self)
365-
}
366-
367-
#[cfg(cpp_integration_testing)]
368-
pub(crate) fn attempted_hosts(&self) -> Vec<std::net::SocketAddr> {
369-
if let Some(listener) = &self.recording_listener {
370-
listener.get_attempted_hosts()
371-
} else {
372-
vec![]
373-
}
374-
}
375411
}
376412

377413
// Do not remove; this asserts that `CassFuture` implements Send + Sync,
@@ -396,7 +432,17 @@ pub unsafe extern "C" fn cass_future_set_callback(
396432
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
397433
};
398434

399-
unsafe { future.set_callback(future_raw.borrow(), callback, data) }
435+
match future.kind {
436+
FutureKind::Resolvable(ref resolvable) => {
437+
// Safety: `callback` is a valid pointer to a function that matches the signature.
438+
unsafe { resolvable.set_callback(future_raw.borrow(), callback, data) }
439+
}
440+
FutureKind::Immediate(_) => {
441+
let bound_cb = BoundCallback { cb: callback, data };
442+
bound_cb.invoke(future_raw.borrow());
443+
CassError::CASS_OK
444+
}
445+
}
400446
}
401447

402448
#[unsafe(no_mangle)]
@@ -433,7 +479,10 @@ pub unsafe extern "C" fn cass_future_ready(
433479
return cass_false;
434480
};
435481

436-
future.result.get().is_some() as cass_bool_t
482+
(match future.kind {
483+
FutureKind::Resolvable(ref resolvable_fut) => resolvable_fut.result.get().is_some(),
484+
FutureKind::Immediate(_) => true,
485+
}) as cass_bool_t
437486
}
438487

439488
#[unsafe(no_mangle)]

0 commit comments

Comments
 (0)