Skip to content

Commit 1739a9a

Browse files
nipunn1313Convex, Inc.
authored andcommitted
Optimize usher to avoid deserializations with RawValue. (#41284)
Use serde_json::value::RawValue to avoid intermediate deserializations for particularly nested structures. GitOrigin-RevId: f79a0e7e554cb2cdbcb43b49a69b366ae6764e87
1 parent 3744cb1 commit 1739a9a

File tree

30 files changed

+411
-252
lines changed

30 files changed

+411
-252
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/application/src/api.rs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ use model::{
4848
file_storage::FileStorageId,
4949
session_requests::types::SessionRequestIdentifier,
5050
};
51-
use serde_json::Value as JsonValue;
5251
use sync_types::{
52+
types::SerializedArgs,
5353
AuthenticationToken,
5454
SerializedQueryJournal,
5555
Timestamp,
@@ -107,7 +107,7 @@ pub trait ApplicationApi: Send + Sync {
107107
request_id: RequestId,
108108
identity: Identity,
109109
path: ExportPath,
110-
args: Vec<JsonValue>,
110+
args: SerializedArgs,
111111
caller: FunctionCaller,
112112
ts: ExecuteQueryTimestamp,
113113
journal: Option<SerializedQueryJournal>,
@@ -122,7 +122,7 @@ pub trait ApplicationApi: Send + Sync {
122122
request_id: RequestId,
123123
identity: Identity,
124124
path: CanonicalizedComponentFunctionPath,
125-
args: Vec<JsonValue>,
125+
args: SerializedArgs,
126126
caller: FunctionCaller,
127127
ts: ExecuteQueryTimestamp,
128128
journal: Option<SerializedQueryJournal>,
@@ -135,7 +135,7 @@ pub trait ApplicationApi: Send + Sync {
135135
request_id: RequestId,
136136
identity: Identity,
137137
path: ExportPath,
138-
args: Vec<JsonValue>,
138+
args: SerializedArgs,
139139
caller: FunctionCaller,
140140
// Identifier used to make this mutation idempotent.
141141
mutation_identifier: Option<SessionRequestIdentifier>,
@@ -150,7 +150,7 @@ pub trait ApplicationApi: Send + Sync {
150150
request_id: RequestId,
151151
identity: Identity,
152152
path: CanonicalizedComponentFunctionPath,
153-
args: Vec<JsonValue>,
153+
args: SerializedArgs,
154154
caller: FunctionCaller,
155155
mutation_identifier: Option<SessionRequestIdentifier>,
156156
// The length of the mutation queue at the time the mutation was executed.
@@ -164,7 +164,7 @@ pub trait ApplicationApi: Send + Sync {
164164
request_id: RequestId,
165165
identity: Identity,
166166
path: ExportPath,
167-
args: Vec<JsonValue>,
167+
args: SerializedArgs,
168168
caller: FunctionCaller,
169169
) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>>;
170170

@@ -175,7 +175,7 @@ pub trait ApplicationApi: Send + Sync {
175175
request_id: RequestId,
176176
identity: Identity,
177177
path: CanonicalizedComponentFunctionPath,
178-
args: Vec<JsonValue>,
178+
args: SerializedArgs,
179179
caller: FunctionCaller,
180180
) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>>;
181181

@@ -199,7 +199,7 @@ pub trait ApplicationApi: Send + Sync {
199199
request_id: RequestId,
200200
identity: Identity,
201201
path: CanonicalizedComponentFunctionPath,
202-
args: Vec<JsonValue>,
202+
args: SerializedArgs,
203203
caller: FunctionCaller,
204204
) -> anyhow::Result<Result<FunctionReturn, FunctionError>>;
205205

@@ -283,7 +283,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
283283
request_id: RequestId,
284284
identity: Identity,
285285
path: ExportPath,
286-
args: Vec<JsonValue>,
286+
args: SerializedArgs,
287287
caller: FunctionCaller,
288288
ts: ExecuteQueryTimestamp,
289289
journal: Option<SerializedQueryJournal>,
@@ -299,7 +299,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
299299
self.read_only_udf_at_ts(
300300
request_id,
301301
PublicFunctionPath::RootExport(path),
302-
args,
302+
args.into_args()?,
303303
identity,
304304
ts,
305305
journal,
@@ -314,7 +314,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
314314
request_id: RequestId,
315315
identity: Identity,
316316
path: CanonicalizedComponentFunctionPath,
317-
args: Vec<JsonValue>,
317+
args: SerializedArgs,
318318
caller: FunctionCaller,
319319
ts: ExecuteQueryTimestamp,
320320
journal: Option<SerializedQueryJournal>,
@@ -330,7 +330,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
330330
self.read_only_udf_at_ts(
331331
request_id,
332332
PublicFunctionPath::Component(path),
333-
args,
333+
args.into_args()?,
334334
identity,
335335
ts,
336336
journal,
@@ -345,7 +345,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
345345
request_id: RequestId,
346346
identity: Identity,
347347
path: ExportPath,
348-
args: Vec<JsonValue>,
348+
args: SerializedArgs,
349349
caller: FunctionCaller,
350350
// Identifier used to make this mutation idempotent.
351351
mutation_identifier: Option<SessionRequestIdentifier>,
@@ -358,7 +358,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
358358
self.mutation_udf(
359359
request_id,
360360
PublicFunctionPath::RootExport(path),
361-
args,
361+
args.into_args()?,
362362
identity,
363363
mutation_identifier,
364364
caller,
@@ -373,7 +373,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
373373
request_id: RequestId,
374374
identity: Identity,
375375
path: CanonicalizedComponentFunctionPath,
376-
args: Vec<JsonValue>,
376+
args: SerializedArgs,
377377
caller: FunctionCaller,
378378
mutation_identifier: Option<SessionRequestIdentifier>,
379379
mutation_queue_length: Option<usize>,
@@ -385,7 +385,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
385385
self.mutation_udf(
386386
request_id,
387387
PublicFunctionPath::Component(path),
388-
args,
388+
args.into_args()?,
389389
identity,
390390
mutation_identifier,
391391
caller,
@@ -400,7 +400,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
400400
request_id: RequestId,
401401
identity: Identity,
402402
path: ExportPath,
403-
args: Vec<JsonValue>,
403+
args: SerializedArgs,
404404
caller: FunctionCaller,
405405
) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>> {
406406
anyhow::ensure!(
@@ -410,7 +410,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
410410
self.action_udf(
411411
request_id,
412412
PublicFunctionPath::RootExport(path),
413-
args,
413+
args.into_args()?,
414414
identity,
415415
caller,
416416
)
@@ -423,7 +423,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
423423
request_id: RequestId,
424424
identity: Identity,
425425
path: CanonicalizedComponentFunctionPath,
426-
args: Vec<JsonValue>,
426+
args: SerializedArgs,
427427
caller: FunctionCaller,
428428
) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>> {
429429
anyhow::ensure!(
@@ -433,7 +433,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
433433
self.action_udf(
434434
request_id,
435435
PublicFunctionPath::Component(path),
436-
args,
436+
args.into_args()?,
437437
identity,
438438
caller,
439439
)
@@ -446,14 +446,15 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
446446
request_id: RequestId,
447447
identity: Identity,
448448
path: CanonicalizedComponentFunctionPath,
449-
args: Vec<JsonValue>,
449+
args: SerializedArgs,
450450
caller: FunctionCaller,
451451
) -> anyhow::Result<Result<FunctionReturn, FunctionError>> {
452452
anyhow::ensure!(
453453
path.component.is_root() || identity.is_admin() || identity.is_system(),
454454
"Only admin or system users can call functions on non-root components directly"
455455
);
456-
self.any_udf(request_id, path, args, identity, caller).await
456+
self.any_udf(request_id, path, args.into_args()?, identity, caller)
457+
.await
457458
}
458459

459460
async fn latest_timestamp(

crates/application/src/application_function_runner/mod.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,10 @@ use node_executor::{
156156
};
157157
use serde_json::Value as JsonValue;
158158
use storage::Storage;
159-
use sync_types::CanonicalizedModulePath;
159+
use sync_types::{
160+
types::SerializedArgs,
161+
CanonicalizedModulePath,
162+
};
160163
use tokio::sync::mpsc;
161164
use udf::{
162165
environment::system_env_vars,
@@ -1904,15 +1907,15 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
19041907
&self,
19051908
identity: Identity,
19061909
path: CanonicalizedComponentFunctionPath,
1907-
args: Vec<JsonValue>,
1910+
args: SerializedArgs,
19081911
context: ExecutionContext,
19091912
) -> anyhow::Result<FunctionResult> {
19101913
let ts = self.database.now_ts_for_reads();
19111914
let result = self
19121915
.run_query_at_ts(
19131916
context.request_id,
19141917
PublicFunctionPath::Component(path),
1915-
args,
1918+
args.into_args()?,
19161919
identity,
19171920
*ts,
19181921
None,
@@ -1931,14 +1934,14 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
19311934
&self,
19321935
identity: Identity,
19331936
path: CanonicalizedComponentFunctionPath,
1934-
args: Vec<JsonValue>,
1937+
args: SerializedArgs,
19351938
context: ExecutionContext,
19361939
) -> anyhow::Result<FunctionResult> {
19371940
let result = self
19381941
.retry_mutation(
19391942
context.request_id,
19401943
PublicFunctionPath::Component(path),
1941-
args,
1944+
args.into_args()?,
19421945
identity,
19431946
None,
19441947
FunctionCaller::Action {
@@ -1960,15 +1963,15 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
19601963
&self,
19611964
identity: Identity,
19621965
path: CanonicalizedComponentFunctionPath,
1963-
args: Vec<JsonValue>,
1966+
args: SerializedArgs,
19641967
context: ExecutionContext,
19651968
) -> anyhow::Result<FunctionResult> {
19661969
let _tx = self.database.begin(identity.clone()).await?;
19671970
let result = self
19681971
.run_action(
19691972
context.request_id,
19701973
PublicFunctionPath::Component(path),
1971-
args,
1974+
args.into_args()?,
19721975
identity,
19731976
FunctionCaller::Action {
19741977
parent_scheduled_job: context.parent_scheduled_job,
@@ -2079,7 +2082,7 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
20792082
identity: Identity,
20802083
scheduling_component: ComponentId,
20812084
scheduled_path: CanonicalizedComponentFunctionPath,
2082-
udf_args: Vec<JsonValue>,
2085+
udf_args: SerializedArgs,
20832086
scheduled_ts: UnixTimestamp,
20842087
context: ExecutionContext,
20852088
) -> anyhow::Result<DeveloperDocumentId> {
@@ -2096,7 +2099,7 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
20962099
async move {
20972100
let (path, udf_args) = validate_schedule_args(
20982101
path,
2099-
args,
2102+
args.into_args()?,
21002103
scheduled_ts,
21012104
// Scheduling from actions is not transaction and happens at latest
21022105
// timestamp.

crates/application/src/tests/http_action.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use keybroker::Identity;
2121
use must_let::must_let;
2222
use runtime::testing::TestRuntime;
2323
use serde_json::json;
24+
use sync_types::types::SerializedArgs;
2425
use tokio::{
2526
select,
2627
sync::mpsc,
@@ -413,7 +414,7 @@ async fn test_http_action_continues_after_client_disconnects(
413414
component: ComponentPath::root(),
414415
udf_path: "functions:didWrite".parse()?,
415416
},
416-
vec![json!({})],
417+
SerializedArgs::from_args(vec![json!({})])?,
417418
FunctionCaller::HttpEndpoint,
418419
ExecuteQueryTimestamp::Latest,
419420
None,

crates/application/src/tests/storage.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use model::{
2424
use must_let::must_let;
2525
use runtime::testing::TestRuntime;
2626
use serde_json::json;
27+
use sync_types::types::SerializedArgs;
2728
use value::ConvexValue;
2829

2930
use crate::{
@@ -105,7 +106,7 @@ async fn test_storage_get_url(rt: TestRuntime) -> anyhow::Result<()> {
105106
component: ComponentPath::root(),
106107
udf_path: "storage:getFileUrl".parse()?,
107108
},
108-
args.clone(),
109+
SerializedArgs::from_args(args.clone())?,
109110
caller.clone(),
110111
ts.clone(),
111112
None,
@@ -123,7 +124,7 @@ async fn test_storage_get_url(rt: TestRuntime) -> anyhow::Result<()> {
123124
component: ComponentPath::root(),
124125
udf_path: "storage:getFileUrlFromAction".parse()?,
125126
},
126-
args.clone(),
127+
SerializedArgs::from_args(args.clone())?,
127128
caller.clone(),
128129
)
129130
.await??;
@@ -151,7 +152,7 @@ async fn test_storage_get_url(rt: TestRuntime) -> anyhow::Result<()> {
151152
component: ComponentPath::root(),
152153
udf_path: "storage:getFileUrl".parse()?,
153154
},
154-
args.clone(),
155+
SerializedArgs::from_args(args.clone())?,
155156
caller.clone(),
156157
ts.clone(),
157158
None,
@@ -169,7 +170,7 @@ async fn test_storage_get_url(rt: TestRuntime) -> anyhow::Result<()> {
169170
component: ComponentPath::root(),
170171
udf_path: "storage:getFileUrlFromAction".parse()?,
171172
},
172-
args.clone(),
173+
SerializedArgs::from_args(args.clone())?,
173174
caller.clone(),
174175
)
175176
.await??;
@@ -205,7 +206,7 @@ async fn test_storage_generate_upload_url(rt: TestRuntime) -> anyhow::Result<()>
205206
component: ComponentPath::root(),
206207
udf_path: "storage:generateUploadUrl".parse()?,
207208
},
208-
args.clone(),
209+
SerializedArgs::from_args(args.clone())?,
209210
caller.clone(),
210211
None,
211212
None,
@@ -224,7 +225,7 @@ async fn test_storage_generate_upload_url(rt: TestRuntime) -> anyhow::Result<()>
224225
component: ComponentPath::root(),
225226
udf_path: "storage:generateUploadUrlFromAction".parse()?,
226227
},
227-
args.clone(),
228+
SerializedArgs::from_args(args.clone())?,
228229
caller.clone(),
229230
)
230231
.await??;
@@ -252,7 +253,7 @@ async fn test_storage_generate_upload_url(rt: TestRuntime) -> anyhow::Result<()>
252253
component: ComponentPath::root(),
253254
udf_path: "storage:generateUploadUrl".parse()?,
254255
},
255-
args.clone(),
256+
SerializedArgs::from_args(args.clone())?,
256257
caller.clone(),
257258
None,
258259
None,
@@ -271,7 +272,7 @@ async fn test_storage_generate_upload_url(rt: TestRuntime) -> anyhow::Result<()>
271272
component: ComponentPath::root(),
272273
udf_path: "storage:generateUploadUrlFromAction".parse()?,
273274
},
274-
args.clone(),
275+
SerializedArgs::from_args(args.clone())?,
275276
caller.clone(),
276277
)
277278
.await??;

0 commit comments

Comments
 (0)