Skip to content

Commit 76f55b7

Browse files
Sujay JayakarConvex, Inc.
authored andcommitted
Wire the syscall providers into isolate2 (#24616)
## Open Source There's still a bunch of unimplemented functionality here, but it's enough to get a basic mutation working! GitOrigin-RevId: 649d89d154eeeb7be2bd4affb7ebd409cfa5facd
1 parent ef04801 commit 76f55b7

File tree

6 files changed

+319
-83
lines changed

6 files changed

+319
-83
lines changed

crates/isolate/src/environment/udf/async_syscall.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ impl HeapSize for PendingSyscall {
106106

107107
// Checks if the underlying table and the request's expectation for the table
108108
// line up.
109-
fn system_table_guard(name: &TableName, expect_system_table: bool) -> anyhow::Result<()> {
109+
pub fn system_table_guard(name: &TableName, expect_system_table: bool) -> anyhow::Result<()> {
110110
if expect_system_table && !name.is_system() {
111111
return Err(anyhow::anyhow!(ErrorMetadata::bad_request(
112112
"SystemTableError",
@@ -185,6 +185,10 @@ impl AsyncSyscallBatch {
185185
Self::Unbatched { .. } => 1,
186186
}
187187
}
188+
189+
pub fn is_empty(&self) -> bool {
190+
self.len() == 0
191+
}
188192
}
189193

190194
pub struct QueryManager<RT: Runtime> {
@@ -221,7 +225,8 @@ impl<RT: Runtime> QueryManager<RT> {
221225
}
222226

223227
// Trait for allowing code reuse between `DatabaseUdfEnvironment` and isolate2.
224-
pub trait SyscallProvider<RT: Runtime> {
228+
#[allow(async_fn_in_trait)]
229+
pub trait AsyncSyscallProvider<RT: Runtime> {
225230
fn rt(&self) -> &RT;
226231
fn tx(&mut self) -> Result<&mut Transaction<RT>, ErrorMetadata>;
227232
fn key_broker(&self) -> &KeyBroker;
@@ -258,7 +263,7 @@ pub trait SyscallProvider<RT: Runtime> {
258263
) -> anyhow::Result<Option<FileStorageEntry>>;
259264
}
260265

261-
impl<RT: Runtime> SyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
266+
impl<RT: Runtime> AsyncSyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
262267
fn rt(&self) -> &RT {
263268
&self.phase.rt
264269
}
@@ -361,11 +366,11 @@ impl<RT: Runtime> SyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
361366
///
362367
/// Most of the common logic lives on `Transaction` or `DatabaseSyscallsShared`,
363368
/// and this is mostly just taking care of the argument parsing.
364-
pub struct DatabaseSyscallsV1<RT: Runtime, P: SyscallProvider<RT>> {
369+
pub struct DatabaseSyscallsV1<RT: Runtime, P: AsyncSyscallProvider<RT>> {
365370
_pd: PhantomData<(RT, P)>,
366371
}
367372

368-
impl<RT: Runtime, P: SyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
373+
impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
369374
/// Runs a batch of syscalls, each of which can succeed or fail
370375
/// independently. The returned vec is the same length as the batch.
371376
#[minitrace::trace]
@@ -893,7 +898,7 @@ impl<RT: Runtime, P: SyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
893898
}
894899
}
895900

896-
struct DatabaseSyscallsShared<RT: Runtime, P: SyscallProvider<RT>> {
901+
struct DatabaseSyscallsShared<RT: Runtime, P: AsyncSyscallProvider<RT>> {
897902
_pd: PhantomData<(RT, P)>,
898903
}
899904

@@ -923,7 +928,7 @@ struct QueryPageMetadata {
923928
page_status: Option<QueryPageStatus>,
924929
}
925930

926-
impl<RT: Runtime, P: SyscallProvider<RT>> DatabaseSyscallsShared<RT, P> {
931+
impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsShared<RT, P> {
927932
async fn read_page_from_query<T: QueryType>(
928933
mut query: CompiledQuery<RT, T>,
929934
tx: &mut Transaction<RT>,

crates/isolate/src/environment/udf/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ use model::environment_variables::types::{
88
EnvVarName,
99
EnvVarValue,
1010
};
11-
mod async_syscall;
11+
pub mod async_syscall;
1212

1313
pub mod outcome;
1414
mod phase;
15-
mod syscall;
15+
pub mod syscall;
1616
use std::{
1717
cmp::Ordering,
1818
collections::VecDeque,

crates/isolate/src/environment/udf/syscall.rs

Lines changed: 74 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
#![allow(non_snake_case)]
2-
31
use std::convert::TryFrom;
42

53
use anyhow::Context;
@@ -11,6 +9,7 @@ use common::{
119
use database::{
1210
query::TableFilter,
1311
DeveloperQuery,
12+
Transaction,
1413
};
1514
use errors::ErrorMetadata;
1615
use serde::{
@@ -27,25 +26,49 @@ use value::{
2726
TableName,
2827
};
2928

30-
use super::async_syscall::{
31-
DatabaseSyscallsV1,
32-
SyscallProvider,
29+
use super::{
30+
async_syscall::QueryManager,
31+
DatabaseUdfEnvironment,
3332
};
3433
use crate::environment::helpers::{
3534
parse_version,
3635
with_argument_error,
3736
ArgName,
3837
};
3938

39+
pub trait SyscallProvider<RT: Runtime> {
40+
fn table_filter(&self) -> TableFilter;
41+
fn query_manager(&mut self) -> &mut QueryManager<RT>;
42+
fn tx(&mut self) -> Result<&mut Transaction<RT>, ErrorMetadata>;
43+
}
44+
45+
impl<RT: Runtime> SyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
46+
fn table_filter(&self) -> TableFilter {
47+
if self.udf_path.is_system() {
48+
TableFilter::IncludePrivateSystemTables
49+
} else {
50+
TableFilter::ExcludePrivateSystemTables
51+
}
52+
}
53+
54+
fn query_manager(&mut self) -> &mut QueryManager<RT> {
55+
&mut self.query_manager
56+
}
57+
58+
fn tx(&mut self) -> Result<&mut Transaction<RT>, ErrorMetadata> {
59+
self.phase.tx()
60+
}
61+
}
62+
4063
pub fn syscall_impl<RT: Runtime, P: SyscallProvider<RT>>(
4164
provider: &mut P,
4265
name: &str,
4366
args: JsonValue,
4467
) -> anyhow::Result<JsonValue> {
4568
match name {
46-
"1.0/queryCleanup" => DatabaseSyscallsV1::syscall_queryCleanup(provider, args),
47-
"1.0/queryStream" => DatabaseSyscallsV1::syscall_queryStream(provider, args),
48-
"1.0/db/normalizeId" => syscall_normalizeId(provider, args),
69+
"1.0/queryCleanup" => syscall_query_cleanup(provider, args),
70+
"1.0/queryStream" => syscall_query_stream(provider, args),
71+
"1.0/db/normalizeId" => syscall_normalize_id(provider, args),
4972

5073
#[cfg(test)]
5174
"throwSystemError" => anyhow::bail!("I can't go for that."),
@@ -73,7 +96,7 @@ pub fn syscall_impl<RT: Runtime, P: SyscallProvider<RT>>(
7396
}
7497
}
7598

76-
fn syscall_normalizeId<RT: Runtime, P: SyscallProvider<RT>>(
99+
fn syscall_normalize_id<RT: Runtime, P: SyscallProvider<RT>>(
77100
provider: &mut P,
78101
args: JsonValue,
79102
) -> anyhow::Result<JsonValue> {
@@ -128,48 +151,52 @@ fn syscall_normalizeId<RT: Runtime, P: SyscallProvider<RT>>(
128151
}
129152
}
130153

131-
impl<RT: Runtime, P: SyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
132-
fn syscall_queryStream(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
133-
let _s: common::tracing::NoopSpan = static_span!();
134-
let table_filter = provider.table_filter();
135-
let tx = provider.tx()?;
154+
fn syscall_query_stream<RT: Runtime, P: SyscallProvider<RT>>(
155+
provider: &mut P,
156+
args: JsonValue,
157+
) -> anyhow::Result<JsonValue> {
158+
let _s: common::tracing::NoopSpan = static_span!();
159+
let table_filter = provider.table_filter();
160+
let tx = provider.tx()?;
136161

137-
#[derive(Deserialize)]
138-
struct QueryStreamArgs {
139-
query: JsonValue,
140-
version: Option<String>,
141-
}
142-
let (parsed_query, version) = with_argument_error("queryStream", || {
143-
let args: QueryStreamArgs = serde_json::from_value(args)?;
144-
let parsed_query = Query::try_from(args.query).context(ArgName("query"))?;
145-
let version = parse_version(args.version)?;
146-
Ok((parsed_query, version))
147-
})?;
148-
// TODO: Are all invalid query pipelines developer errors? These could be bugs
149-
// in convex/server.
150-
let compiled_query =
151-
{ DeveloperQuery::new_with_version(tx, parsed_query, version, table_filter)? };
152-
let query_id = provider.query_manager().put_developer(compiled_query);
162+
#[derive(Deserialize)]
163+
struct QueryStreamArgs {
164+
query: JsonValue,
165+
version: Option<String>,
166+
}
167+
let (parsed_query, version) = with_argument_error("queryStream", || {
168+
let args: QueryStreamArgs = serde_json::from_value(args)?;
169+
let parsed_query = Query::try_from(args.query).context(ArgName("query"))?;
170+
let version = parse_version(args.version)?;
171+
Ok((parsed_query, version))
172+
})?;
173+
// TODO: Are all invalid query pipelines developer errors? These could be bugs
174+
// in convex/server.
175+
let compiled_query =
176+
{ DeveloperQuery::new_with_version(tx, parsed_query, version, table_filter)? };
177+
let query_id = provider.query_manager().put_developer(compiled_query);
153178

154-
#[derive(Serialize)]
155-
#[serde(rename_all = "camelCase")]
156-
struct QueryStreamResult {
157-
query_id: u32,
158-
}
159-
Ok(serde_json::to_value(QueryStreamResult { query_id })?)
179+
#[derive(Serialize)]
180+
#[serde(rename_all = "camelCase")]
181+
struct QueryStreamResult {
182+
query_id: u32,
160183
}
184+
Ok(serde_json::to_value(QueryStreamResult { query_id })?)
185+
}
161186

162-
fn syscall_queryCleanup(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
163-
let _s = static_span!();
187+
fn syscall_query_cleanup<RT: Runtime, P: SyscallProvider<RT>>(
188+
provider: &mut P,
189+
args: JsonValue,
190+
) -> anyhow::Result<JsonValue> {
191+
let _s = static_span!();
164192

165-
#[derive(Deserialize)]
166-
#[serde(rename_all = "camelCase")]
167-
struct QueryCleanupArgs {
168-
query_id: u32,
169-
}
170-
let args: QueryCleanupArgs =
171-
with_argument_error("queryCleanup", || Ok(serde_json::from_value(args)?))?;
172-
let cleaned_up = provider.query_manager().cleanup_developer(args.query_id);
173-
Ok(serde_json::to_value(cleaned_up)?)
193+
#[derive(Deserialize)]
194+
#[serde(rename_all = "camelCase")]
195+
struct QueryCleanupArgs {
196+
query_id: u32,
174197
}
198+
let args: QueryCleanupArgs =
199+
with_argument_error("queryCleanup", || Ok(serde_json::from_value(args)?))?;
200+
let cleaned_up = provider.query_manager().cleanup_developer(args.query_id);
201+
Ok(serde_json::to_value(cleaned_up)?)
175202
}

0 commit comments

Comments
 (0)