Skip to content

Commit d850bf6

Browse files
committed
Implement locking tasks
1 parent 2013ca1 commit d850bf6

File tree

2 files changed

+111
-4
lines changed

2 files changed

+111
-4
lines changed

crates/ark/src/interface.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,8 @@ pub struct RMain {
205205
/// Channel to send and receive tasks from `RTask`s
206206
tasks_interrupt_rx: Receiver<RTask>,
207207
tasks_idle_rx: Receiver<RTask>,
208+
tasks_lock_rx: Receiver<RTask>,
209+
208210
pending_futures: HashMap<Uuid, (BoxFuture<'static, ()>, RTaskStartInfo)>,
209211

210212
/// Channel to communicate requests and events to the frontend
@@ -347,11 +349,12 @@ impl RMain {
347349
};
348350
}
349351

350-
let (tasks_interrupt_rx, tasks_idle_rx) = r_task::take_receivers();
352+
let (tasks_interrupt_rx, tasks_idle_rx, tasks_lock_rx) = r_task::take_receivers();
351353

352354
R_MAIN.set(UnsafeCell::new(RMain::new(
353355
tasks_interrupt_rx,
354356
tasks_idle_rx,
357+
tasks_lock_rx,
355358
comm_manager_tx,
356359
r_request_rx,
357360
stdin_request_tx,
@@ -549,6 +552,7 @@ impl RMain {
549552
pub fn new(
550553
tasks_interrupt_rx: Receiver<RTask>,
551554
tasks_idle_rx: Receiver<RTask>,
555+
tasks_lock_rx: Receiver<RTask>,
552556
comm_manager_tx: Sender<CommManagerEvent>,
553557
r_request_rx: Receiver<RRequest>,
554558
stdin_request_tx: Sender<StdInRequest>,
@@ -579,6 +583,7 @@ impl RMain {
579583
dap: RMainDap::new(dap),
580584
tasks_interrupt_rx,
581585
tasks_idle_rx,
586+
tasks_lock_rx,
582587
pending_futures: HashMap::new(),
583588
session_mode,
584589
positron_ns: None,
@@ -808,6 +813,7 @@ impl RMain {
808813
let stdin_reply_rx = self.stdin_reply_rx.clone();
809814
let kernel_request_rx = self.kernel_request_rx.clone();
810815
let tasks_interrupt_rx = self.tasks_interrupt_rx.clone();
816+
let tasks_lock_rx = self.tasks_lock_rx.clone();
811817
let tasks_idle_rx = self.tasks_idle_rx.clone();
812818

813819
// Process R's polled events regularly while waiting for console input.
@@ -819,6 +825,7 @@ impl RMain {
819825
let stdin_reply_index = select.recv(&stdin_reply_rx);
820826
let kernel_request_index = select.recv(&kernel_request_rx);
821827
let tasks_interrupt_index = select.recv(&tasks_interrupt_rx);
828+
let tasks_lock_index = select.recv(&tasks_lock_rx);
822829
let polled_events_index = select.recv(&polled_events_rx);
823830

824831
// Don't process idle tasks in browser prompts. We currently don't want
@@ -899,6 +906,18 @@ impl RMain {
899906
self.handle_task_interrupt(task);
900907
},
901908

909+
// A lock task woke us up
910+
i if i == tasks_lock_index => {
911+
let task = oper.recv(&tasks_lock_rx).unwrap();
912+
913+
// An async task would defeat the purpose of a locking task
914+
if !matches!(task, RTask::Sync(_)) {
915+
panic!("Expected Sync task");
916+
}
917+
918+
self.handle_task(task);
919+
},
920+
902921
// An idle task woke us up
903922
i if Some(i) == tasks_idle_index => {
904923
let task = oper.recv(&tasks_idle_rx).unwrap();

crates/ark/src/r_task.rs

Lines changed: 91 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ static INTERRUPT_TASKS: LazyLock<TaskChannels> = LazyLock::new(|| TaskChannels::
2727
/// Task channels for idle-time tasks
2828
static IDLE_TASKS: LazyLock<TaskChannels> = LazyLock::new(|| TaskChannels::new());
2929

30+
/// Task channels for lock-time tasks
31+
static LOCK_TASKS: LazyLock<TaskChannels> = LazyLock::new(|| TaskChannels::new());
32+
3033
// Compared to `futures::BoxFuture`, this doesn't require the future to be Send.
3134
// We don't need this bound since the executor runs on only on the R thread
3235
pub(crate) type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
@@ -58,11 +61,15 @@ impl TaskChannels {
5861
}
5962
}
6063

61-
/// Returns receivers for both interrupt and idle tasks.
64+
/// Returns receivers for interrupt, idle, and lock tasks.
6265
/// Initializes the task channels if they haven't been initialized yet.
6366
/// Can only be called once (intended for `RMain` during init).
64-
pub(crate) fn take_receivers() -> (Receiver<RTask>, Receiver<RTask>) {
65-
(INTERRUPT_TASKS.take_rx(), IDLE_TASKS.take_rx())
67+
pub(crate) fn take_receivers() -> (Receiver<RTask>, Receiver<RTask>, Receiver<RTask>) {
68+
(
69+
INTERRUPT_TASKS.take_rx(),
70+
IDLE_TASKS.take_rx(),
71+
LOCK_TASKS.take_rx(),
72+
)
6673
}
6774

6875
pub enum RTask {
@@ -274,6 +281,87 @@ where
274281
return result.lock().unwrap().take().unwrap();
275282
}
276283

284+
pub fn spawn_try_lock<'env, F, T>(f: F) -> Option<T>
285+
where
286+
F: FnOnce() -> T,
287+
F: 'env + Send,
288+
T: 'env + Send,
289+
{
290+
if stdext::IS_TESTING && !RMain::is_initialized() {
291+
let _lock = harp::fixtures::R_TEST_LOCK.lock();
292+
r_test_init();
293+
return Some(f());
294+
}
295+
296+
if RMain::on_main_thread() {
297+
panic!("Can't call `spawn_try_lock()` on main thread");
298+
}
299+
300+
let result = SharedOption::default();
301+
302+
{
303+
let result = Arc::clone(&result);
304+
let closure = move || {
305+
*result.lock().unwrap() = Some(f());
306+
};
307+
308+
let closure: Box<dyn FnOnce() + Send + 'env> = Box::new(closure);
309+
let closure: Box<dyn FnOnce() + Send + 'static> = unsafe { std::mem::transmute(closure) };
310+
311+
// Channel to communicate status of the task/closure
312+
let (status_tx, status_rx) = bounded::<RTaskStatus>(0);
313+
314+
// Send the task to the R thread
315+
let task = RTask::Sync(RTaskSync {
316+
fun: closure,
317+
status_tx: Some(status_tx),
318+
start_info: RTaskStartInfo::new(false),
319+
});
320+
if let Err(_) = LOCK_TASKS.tx().try_send(task) {
321+
return None;
322+
};
323+
324+
// Block until we get the signal that the task has started
325+
let status = status_rx.recv().unwrap();
326+
327+
let RTaskStatus::Started = status else {
328+
let trace = std::backtrace::Backtrace::force_capture();
329+
panic!(
330+
"Task `status` value must be `Started`: {status:?}\n\
331+
Backtrace of calling thread:\n\n
332+
{trace}"
333+
);
334+
};
335+
336+
// Block until task was completed
337+
let status = status_rx.recv().unwrap();
338+
339+
let RTaskStatus::Finished(status) = status else {
340+
let trace = std::backtrace::Backtrace::force_capture();
341+
panic!(
342+
"Task `status` value must be `Finished`: {status:?}\n\
343+
Backtrace of calling thread:\n\n
344+
{trace}"
345+
);
346+
};
347+
348+
// If the task failed send a backtrace of the current thread to the
349+
// main thread
350+
if let Err(err) = status {
351+
let trace = std::backtrace::Backtrace::force_capture();
352+
panic!(
353+
"While running task: {err:?}\n\
354+
Backtrace of calling thread:\n\n\
355+
{trace}"
356+
);
357+
}
358+
}
359+
360+
// Retrieve closure result from the synchronized shared option.
361+
// If we get here without panicking we know the result was assigned.
362+
return Some(result.lock().unwrap().take().unwrap());
363+
}
364+
277365
pub(crate) fn spawn_idle<F, Fut>(fun: F)
278366
where
279367
F: FnOnce() -> Fut + 'static + Send,

0 commit comments

Comments
 (0)