Skip to content

Commit bd3e77b

Browse files
Sujay JayakarConvex, Inc.
authored andcommitted
Wire in isolate2 to UdfTest::query (#24591)
This currently only supports queries and requires opting in with... ```rust #[convex_macro::test_runtime] async fn test_basic(rt: TestRuntime) -> anyhow::Result<()> { let mut t = UdfTest::default(rt).await?; t.enable_isolate_v2(); ... } ``` GitOrigin-RevId: d5d5d27294f4d24af72fd593b7021f32f8ad3eac
1 parent 0a8b1a9 commit bd3e77b

File tree

5 files changed

+446
-413
lines changed

5 files changed

+446
-413
lines changed

crates/isolate/src/isolate2/client.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@ use std::{
88

99
use common::{
1010
errors::JsError,
11+
runtime::Runtime,
1112
types::UdfType,
1213
};
13-
use crossbeam_channel;
1414
use deno_core::ModuleSpecifier;
1515
use futures::{
1616
self,
17-
channel::oneshot,
18-
FutureExt,
17+
channel::{
18+
mpsc,
19+
oneshot,
20+
},
1921
};
2022
use serde_json::Value as JsonValue;
2123
use tokio::sync::Semaphore;
@@ -74,19 +76,22 @@ pub struct AsyncSyscallCompletion {
7476
pub result: Result<JsonValue, JsError>,
7577
}
7678

77-
pub struct IsolateThreadClient {
78-
sender: crossbeam_channel::Sender<IsolateThreadRequest>,
79+
pub struct IsolateThreadClient<RT: Runtime> {
80+
rt: RT,
81+
sender: mpsc::Sender<IsolateThreadRequest>,
7982
user_time_remaining: Duration,
8083
semaphore: Arc<Semaphore>,
8184
}
8285

83-
impl IsolateThreadClient {
86+
impl<RT: Runtime> IsolateThreadClient<RT> {
8487
pub fn new(
85-
sender: crossbeam_channel::Sender<IsolateThreadRequest>,
88+
rt: RT,
89+
sender: mpsc::Sender<IsolateThreadRequest>,
8690
user_timeout: Duration,
8791
semaphore: Arc<Semaphore>,
8892
) -> Self {
8993
Self {
94+
rt,
9095
sender,
9196
user_time_remaining: user_timeout,
9297
semaphore,
@@ -108,11 +113,10 @@ impl IsolateThreadClient {
108113

109114
// Start the user timer after we acquire the permit.
110115
let user_start = Instant::now();
111-
let user_timeout = tokio::time::sleep(self.user_time_remaining);
112-
113-
self.sender.send(request)?;
116+
let mut user_timeout = self.rt.wait(self.user_time_remaining);
117+
self.sender.try_send(request)?;
114118
let result = futures::select_biased! {
115-
_ = user_timeout.fuse() => {
119+
_ = user_timeout => {
116120
// XXX: We need to terminate the isolate handle here in
117121
// case user code is in an infinite loop.
118122
anyhow::bail!("User time exhausted");

0 commit comments

Comments
 (0)