Skip to content

Commit 6a433c2

Browse files
committed
Revert "Implement locking tasks"
This reverts commit 9a23de5.
1 parent d850bf6 commit 6a433c2

File tree

2 files changed

+4
-111
lines changed

2 files changed

+4
-111
lines changed

crates/ark/src/interface.rs

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,6 @@ pub struct RMain {
205205
/// Channel to send and receive tasks from `RTask`s
206206
tasks_interrupt_rx: Receiver<RTask>,
207207
tasks_idle_rx: Receiver<RTask>,
208-
tasks_lock_rx: Receiver<RTask>,
209-
210208
pending_futures: HashMap<Uuid, (BoxFuture<'static, ()>, RTaskStartInfo)>,
211209

212210
/// Channel to communicate requests and events to the frontend
@@ -349,12 +347,11 @@ impl RMain {
349347
};
350348
}
351349

352-
let (tasks_interrupt_rx, tasks_idle_rx, tasks_lock_rx) = r_task::take_receivers();
350+
let (tasks_interrupt_rx, tasks_idle_rx) = r_task::take_receivers();
353351

354352
R_MAIN.set(UnsafeCell::new(RMain::new(
355353
tasks_interrupt_rx,
356354
tasks_idle_rx,
357-
tasks_lock_rx,
358355
comm_manager_tx,
359356
r_request_rx,
360357
stdin_request_tx,
@@ -552,7 +549,6 @@ impl RMain {
552549
pub fn new(
553550
tasks_interrupt_rx: Receiver<RTask>,
554551
tasks_idle_rx: Receiver<RTask>,
555-
tasks_lock_rx: Receiver<RTask>,
556552
comm_manager_tx: Sender<CommManagerEvent>,
557553
r_request_rx: Receiver<RRequest>,
558554
stdin_request_tx: Sender<StdInRequest>,
@@ -583,7 +579,6 @@ impl RMain {
583579
dap: RMainDap::new(dap),
584580
tasks_interrupt_rx,
585581
tasks_idle_rx,
586-
tasks_lock_rx,
587582
pending_futures: HashMap::new(),
588583
session_mode,
589584
positron_ns: None,
@@ -813,7 +808,6 @@ impl RMain {
813808
let stdin_reply_rx = self.stdin_reply_rx.clone();
814809
let kernel_request_rx = self.kernel_request_rx.clone();
815810
let tasks_interrupt_rx = self.tasks_interrupt_rx.clone();
816-
let tasks_lock_rx = self.tasks_lock_rx.clone();
817811
let tasks_idle_rx = self.tasks_idle_rx.clone();
818812

819813
// Process R's polled events regularly while waiting for console input.
@@ -825,7 +819,6 @@ impl RMain {
825819
let stdin_reply_index = select.recv(&stdin_reply_rx);
826820
let kernel_request_index = select.recv(&kernel_request_rx);
827821
let tasks_interrupt_index = select.recv(&tasks_interrupt_rx);
828-
let tasks_lock_index = select.recv(&tasks_lock_rx);
829822
let polled_events_index = select.recv(&polled_events_rx);
830823

831824
// Don't process idle tasks in browser prompts. We currently don't want
@@ -906,18 +899,6 @@ impl RMain {
906899
self.handle_task_interrupt(task);
907900
},
908901

909-
// A lock task woke us up
910-
i if i == tasks_lock_index => {
911-
let task = oper.recv(&tasks_lock_rx).unwrap();
912-
913-
// An async task would defeat the purpose of a locking task
914-
if !matches!(task, RTask::Sync(_)) {
915-
panic!("Expected Sync task");
916-
}
917-
918-
self.handle_task(task);
919-
},
920-
921902
// An idle task woke us up
922903
i if Some(i) == tasks_idle_index => {
923904
let task = oper.recv(&tasks_idle_rx).unwrap();

crates/ark/src/r_task.rs

Lines changed: 3 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,6 @@ static INTERRUPT_TASKS: LazyLock<TaskChannels> = LazyLock::new(|| TaskChannels::
2727
/// Task channels for idle-time tasks
2828
static IDLE_TASKS: LazyLock<TaskChannels> = LazyLock::new(|| TaskChannels::new());
2929

30-
/// Task channels for lock-time tasks
31-
static LOCK_TASKS: LazyLock<TaskChannels> = LazyLock::new(|| TaskChannels::new());
32-
3330
// Compared to `futures::BoxFuture`, this doesn't require the future to be Send.
3431
// We don't need this bound since the executor runs on only on the R thread
3532
pub(crate) type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
@@ -61,15 +58,11 @@ impl TaskChannels {
6158
}
6259
}
6360

64-
/// Returns receivers for interrupt, idle, and lock tasks.
61+
/// Returns receivers for both interrupt and idle tasks.
6562
/// Initializes the task channels if they haven't been initialized yet.
6663
/// Can only be called once (intended for `RMain` during init).
67-
pub(crate) fn take_receivers() -> (Receiver<RTask>, Receiver<RTask>, Receiver<RTask>) {
68-
(
69-
INTERRUPT_TASKS.take_rx(),
70-
IDLE_TASKS.take_rx(),
71-
LOCK_TASKS.take_rx(),
72-
)
64+
pub(crate) fn take_receivers() -> (Receiver<RTask>, Receiver<RTask>) {
65+
(INTERRUPT_TASKS.take_rx(), IDLE_TASKS.take_rx())
7366
}
7467

7568
pub enum RTask {
@@ -281,87 +274,6 @@ where
281274
return result.lock().unwrap().take().unwrap();
282275
}
283276

284-
pub fn spawn_try_lock<'env, F, T>(f: F) -> Option<T>
285-
where
286-
F: FnOnce() -> T,
287-
F: 'env + Send,
288-
T: 'env + Send,
289-
{
290-
if stdext::IS_TESTING && !RMain::is_initialized() {
291-
let _lock = harp::fixtures::R_TEST_LOCK.lock();
292-
r_test_init();
293-
return Some(f());
294-
}
295-
296-
if RMain::on_main_thread() {
297-
panic!("Can't call `spawn_try_lock()` on main thread");
298-
}
299-
300-
let result = SharedOption::default();
301-
302-
{
303-
let result = Arc::clone(&result);
304-
let closure = move || {
305-
*result.lock().unwrap() = Some(f());
306-
};
307-
308-
let closure: Box<dyn FnOnce() + Send + 'env> = Box::new(closure);
309-
let closure: Box<dyn FnOnce() + Send + 'static> = unsafe { std::mem::transmute(closure) };
310-
311-
// Channel to communicate status of the task/closure
312-
let (status_tx, status_rx) = bounded::<RTaskStatus>(0);
313-
314-
// Send the task to the R thread
315-
let task = RTask::Sync(RTaskSync {
316-
fun: closure,
317-
status_tx: Some(status_tx),
318-
start_info: RTaskStartInfo::new(false),
319-
});
320-
if let Err(_) = LOCK_TASKS.tx().try_send(task) {
321-
return None;
322-
};
323-
324-
// Block until we get the signal that the task has started
325-
let status = status_rx.recv().unwrap();
326-
327-
let RTaskStatus::Started = status else {
328-
let trace = std::backtrace::Backtrace::force_capture();
329-
panic!(
330-
"Task `status` value must be `Started`: {status:?}\n\
331-
Backtrace of calling thread:\n\n
332-
{trace}"
333-
);
334-
};
335-
336-
// Block until task was completed
337-
let status = status_rx.recv().unwrap();
338-
339-
let RTaskStatus::Finished(status) = status else {
340-
let trace = std::backtrace::Backtrace::force_capture();
341-
panic!(
342-
"Task `status` value must be `Finished`: {status:?}\n\
343-
Backtrace of calling thread:\n\n
344-
{trace}"
345-
);
346-
};
347-
348-
// If the task failed send a backtrace of the current thread to the
349-
// main thread
350-
if let Err(err) = status {
351-
let trace = std::backtrace::Backtrace::force_capture();
352-
panic!(
353-
"While running task: {err:?}\n\
354-
Backtrace of calling thread:\n\n\
355-
{trace}"
356-
);
357-
}
358-
}
359-
360-
// Retrieve closure result from the synchronized shared option.
361-
// If we get here without panicking we know the result was assigned.
362-
return Some(result.lock().unwrap().take().unwrap());
363-
}
364-
365277
pub(crate) fn spawn_idle<F, Fut>(fun: F)
366278
where
367279
F: FnOnce() -> Fut + 'static + Send,

0 commit comments

Comments
 (0)