Skip to content

Commit 4d18b85

Browse files
authored
Add cache utilities and plumbing through the evaluator. (#4)
* Add caching options to execution plan with err message enhancement * Create struct for `MemoizationInfo`. * Add `EvaluationCache` to access cache. * Plumbing `EvaluationCache` through the evaluator.
1 parent 20564d4 commit 4d18b85

File tree

9 files changed

+222
-24
lines changed

9 files changed

+222
-24
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,4 @@ blocking = "1.6.1"
3535
indenter = "0.3.3"
3636
itertools = "0.14.0"
3737
derivative = "2.2.0"
38+
async-lock = "3.4.0"

src/builder/analyzer.rs

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,7 @@ impl<'a> AnalyzerContext<'a> {
645645
source_id
646646
});
647647

648+
let op_name = source_op.name.clone();
648649
let output = scope.add_field(source_op.name, &output_type)?;
649650
let result_fut = async move {
650651
Ok(AnalyzedSourceOp {
@@ -654,9 +655,10 @@ impl<'a> AnalyzerContext<'a> {
654655
primary_key_type: output_type
655656
.typ
656657
.key_type()
657-
.ok_or_else(|| api_error!("Source must produce a type with key"))?
658+
.ok_or_else(|| api_error!("Source must produce a type with key: {op_name}"))?
658659
.typ
659660
.clone(),
661+
name: op_name,
660662
})
661663
};
662664
Ok(result_fut)
@@ -696,10 +698,27 @@ impl<'a> AnalyzerContext<'a> {
696698
let output = scope
697699
.data
698700
.add_field(reactive_op.name.clone(), &output_type)?;
701+
let op_name = reactive_op.name.clone();
699702
async move {
703+
let executor = executor.await.with_context(|| {
704+
format!("Failed to build executor for transform op: {op_name}")
705+
})?;
706+
let function_exec_info = AnalyzedFunctionExecInfo {
707+
enable_caching: executor.enable_caching(),
708+
behavior_version: executor.behavior_version(),
709+
};
710+
if function_exec_info.enable_caching
711+
&& function_exec_info.behavior_version.is_some()
712+
{
713+
api_bail!(
714+
"When caching is enabled, behavior version must be specified for transform op: {op_name}",
715+
);
716+
}
700717
Ok(AnalyzedReactiveOp::Transform(AnalyzedTransformOp {
718+
name: op_name,
701719
inputs: input_value_mappings,
702-
executor: executor.await?,
720+
function_exec_info,
721+
executor,
703722
output,
704723
}))
705724
}
@@ -735,10 +754,14 @@ impl<'a> AnalyzerContext<'a> {
735754
parent_scopes.prepend(&scope),
736755
)?
737756
};
757+
let op_name = reactive_op.name.clone();
738758
async move {
739759
Ok(AnalyzedReactiveOp::ForEach(AnalyzedForEachOp {
740760
local_field_ref,
741-
op_scope: op_scope_fut.await?,
761+
op_scope: op_scope_fut
762+
.await
763+
.with_context(|| format!("Analyzing foreach op: {op_name}"))?,
764+
name: op_name,
742765
}))
743766
}
744767
.boxed()
@@ -753,8 +776,10 @@ impl<'a> AnalyzerContext<'a> {
753776
struct_schema,
754777
scopes,
755778
)?;
779+
let op_name = reactive_op.name.clone();
756780
async move {
757781
Ok(AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
782+
name: op_name,
758783
input: struct_mapping,
759784
collector_ref,
760785
}))
@@ -933,8 +958,12 @@ impl<'a> AnalyzerContext<'a> {
933958
.transpose()?;
934959

935960
Ok(async move {
936-
let (executor, query_target) = executor_futs.await?;
961+
let (executor, query_target) = executor_futs
962+
.await
963+
.with_context(|| format!("Analyzing export op: {}", export_op.name))?;
964+
let name = export_op.name;
937965
Ok(AnalyzedExportOp {
966+
name,
938967
target_id: target_id.unwrap_or_default(),
939968
input: local_collector_ref,
940969
executor,

src/builder/plan.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,24 +55,34 @@ pub struct AnalyzedOpOutput {
5555
}
5656

5757
pub struct AnalyzedSourceOp {
58+
pub name: String,
5859
pub source_id: i32,
5960
pub executor: Box<dyn SourceExecutor>,
6061
pub output: AnalyzedOpOutput,
6162
pub primary_key_type: ValueType,
6263
}
6364

65+
pub struct AnalyzedFunctionExecInfo {
66+
pub enable_caching: bool,
67+
pub behavior_version: Option<u32>,
68+
}
69+
6470
pub struct AnalyzedTransformOp {
71+
pub name: String,
6572
pub inputs: Vec<AnalyzedValueMapping>,
73+
pub function_exec_info: AnalyzedFunctionExecInfo,
6674
pub executor: Box<dyn SimpleFunctionExecutor>,
6775
pub output: AnalyzedOpOutput,
6876
}
6977

7078
pub struct AnalyzedForEachOp {
79+
pub name: String,
7180
pub local_field_ref: AnalyzedLocalFieldReference,
7281
pub op_scope: AnalyzedOpScope,
7382
}
7483

7584
pub struct AnalyzedCollectOp {
85+
pub name: String,
7686
pub input: AnalyzedStructMapping,
7787
pub collector_ref: AnalyzedCollectorReference,
7888
}
@@ -82,6 +92,7 @@ pub enum AnalyzedPrimaryKeyDef {
8292
}
8393

8494
pub struct AnalyzedExportOp {
95+
pub name: String,
8596
pub target_id: i32,
8697
pub input: AnalyzedLocalCollectorReference,
8798
pub executor: Arc<dyn ExportTargetExecutor>,

src/execution/db_tracking.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::db_tracking_setup::TrackingTableSetupState;
1+
use super::{db_tracking_setup::TrackingTableSetupState, memoization::MemoizationInfo};
22
use crate::utils::db::WriteAction;
33
use anyhow::Result;
44
use sqlx::PgPool;
@@ -11,7 +11,7 @@ pub type TrackedTargetKeyForSource = Vec<(i32, Vec<TrackedTargetKey>)>;
1111
pub struct SourceTrackingInfo {
1212
pub max_process_ordinal: i64,
1313
pub staging_target_keys: sqlx::types::Json<TrackedTargetKeyForSource>,
14-
pub memoization_info: Option<serde_json::Value>,
14+
pub memoization_info: Option<sqlx::types::Json<MemoizationInfo>>,
1515

1616
pub processed_source_ordinal: Option<i64>,
1717
pub process_logic_fingerprint: Option<Vec<u8>>,
@@ -73,7 +73,7 @@ pub async fn precommit_source_tracking_info(
7373
source_key_json: &serde_json::Value,
7474
max_process_ordinal: i64,
7575
staging_target_keys: TrackedTargetKeyForSource,
76-
memoization_info: serde_json::Value,
76+
memoization_info: Option<&MemoizationInfo>,
7777
db_setup: &TrackingTableSetupState,
7878
db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
7979
action: WriteAction,
@@ -92,7 +92,7 @@ pub async fn precommit_source_tracking_info(
9292
.bind(source_key_json) // $2
9393
.bind(max_process_ordinal) // $3
9494
.bind(sqlx::types::Json(staging_target_keys)) // $4
95-
.bind(memoization_info) // $5
95+
.bind(memoization_info.map(|m| sqlx::types::Json(m))) // $5
9696
.execute(db_executor)
9797
.await?;
9898
Ok(())

src/execution/evaluator.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use crate::{
1111
utils::immutable::RefList,
1212
};
1313

14+
use super::memoization::EvaluationCache;
15+
1416
#[derive(Debug)]
1517
pub struct ScopeValueBuilder {
1618
// TODO: Share the same lock for values produced in the same execution scope, for stricter atomicity.
@@ -287,13 +289,15 @@ async fn evaluate_child_op_scope(
287289
op_scope: &AnalyzedOpScope,
288290
scoped_entries: RefList<'_, &ScopeEntry<'_>>,
289291
child_scope_entry: ScopeEntry<'_>,
292+
cache: Option<&EvaluationCache>,
290293
) -> Result<()> {
291-
evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry)).await
294+
evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), cache).await
292295
}
293296

294297
async fn evaluate_op_scope(
295298
op_scope: &AnalyzedOpScope,
296299
scoped_entries: RefList<'_, &ScopeEntry<'_>>,
300+
cache: Option<&EvaluationCache>,
297301
) -> Result<()> {
298302
let head_scope = *scoped_entries.head().unwrap();
299303
for reactive_op in op_scope.reactive_ops.iter() {
@@ -324,6 +328,7 @@ async fn evaluate_op_scope(
324328
value: &item,
325329
schema: &collection_schema.row,
326330
},
331+
cache,
327332
)
328333
})
329334
.collect::<Vec<_>>(),
@@ -338,6 +343,7 @@ async fn evaluate_op_scope(
338343
value: v,
339344
schema: &collection_schema.row,
340345
},
346+
cache,
341347
)
342348
})
343349
.collect::<Vec<_>>(),
@@ -353,6 +359,7 @@ async fn evaluate_op_scope(
353359
value: item,
354360
schema: &collection_schema.row,
355361
},
362+
cache,
356363
)
357364
})
358365
.collect::<Vec<_>>(),
@@ -388,6 +395,7 @@ pub async fn evaluate_source_entry<'a>(
388395
source_op_idx: u32,
389396
schema: &schema::DataSchema,
390397
key: &value::KeyValue,
398+
cache: Option<&EvaluationCache>,
391399
) -> Result<Option<ScopeValueBuilder>> {
392400
let root_schema = &schema.schema;
393401
let root_scope_value =
@@ -418,7 +426,12 @@ pub async fn evaluate_source_entry<'a>(
418426
value::Value::Table(BTreeMap::from([(key.clone(), scope_value)])),
419427
);
420428

421-
evaluate_op_scope(&plan.op_scope, RefList::Nil.prepend(&root_scope_entry)).await?;
429+
evaluate_op_scope(
430+
&plan.op_scope,
431+
RefList::Nil.prepend(&root_scope_entry),
432+
cache,
433+
)
434+
.await?;
422435
Some(root_scope_value)
423436
}
424437
None => None,
@@ -452,6 +465,7 @@ pub async fn evaluate_transient_flow(
452465
evaluate_op_scope(
453466
&flow.execution_plan.op_scope,
454467
RefList::Nil.prepend(&root_scope_entry),
468+
None,
455469
)
456470
.await?;
457471
let output_value = assemble_value(

src/execution/indexer.rs

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use sqlx::PgPool;
1010

1111
use super::db_tracking::{self, read_source_tracking_info};
1212
use super::db_tracking_setup;
13+
use super::memoization::{EvaluationCache, MemoizationInfo};
1314
use crate::base::schema;
1415
use crate::base::spec::FlowInstanceSpec;
1516
use crate::base::value::{self, FieldValues, KeyValue};
@@ -115,6 +116,10 @@ struct TrackingInfoForTarget<'a> {
115116
mutation: ExportTargetMutation,
116117
}
117118

119+
struct PrecommitData<'a> {
120+
scope_value: &'a ScopeValueBuilder,
121+
memoization_info: &'a MemoizationInfo,
122+
}
118123
struct PrecommitMetadata {
119124
source_entry_exists: bool,
120125
process_ordinal: i64,
@@ -130,10 +135,9 @@ async fn precommit_source_tracking_info(
130135
source_id: i32,
131136
source_key_json: &serde_json::Value,
132137
source_ordinal: Option<i64>,
133-
memoization_info: serde_json::Value,
138+
data: Option<PrecommitData<'_>>,
134139
process_timestamp: &chrono::DateTime<chrono::Utc>,
135140
db_setup: &db_tracking_setup::TrackingTableSetupState,
136-
scope_value: &Option<ScopeValueBuilder>,
137141
export_ops: &[AnalyzedExportOp],
138142
pool: &PgPool,
139143
) -> Result<WithApplyStatus<PrecommitOutput>> {
@@ -199,9 +203,9 @@ async fn precommit_source_tracking_info(
199203
}
200204

201205
let mut new_target_keys_info = db_tracking::TrackedTargetKeyForSource::default();
202-
if let Some(scope_value) = scope_value {
206+
if let Some(data) = &data {
203207
for export_op in export_ops.iter() {
204-
let collected_values = scope_value.collected_values
208+
let collected_values = data.scope_value.collected_values
205209
[export_op.input.collector_idx as usize]
206210
.lock()
207211
.unwrap();
@@ -304,7 +308,7 @@ async fn precommit_source_tracking_info(
304308
source_key_json,
305309
process_ordinal,
306310
new_staging_target_keys,
307-
memoization_info,
311+
data.as_ref().map(|data| data.memoization_info),
308312
db_setup,
309313
&mut *txn,
310314
if tracking_info_exists {
@@ -319,7 +323,7 @@ async fn precommit_source_tracking_info(
319323

320324
Ok(WithApplyStatus::Normal(PrecommitOutput {
321325
metadata: PrecommitMetadata {
322-
source_entry_exists: scope_value.is_some(),
326+
source_entry_exists: data.is_some(),
323327
process_ordinal,
324328
existing_process_ordinal,
325329
new_target_keys: new_target_keys_info,
@@ -438,26 +442,51 @@ pub async fn update_source_entry<'a>(
438442
pool,
439443
)
440444
.await?;
441-
let scope_value = evaluate_source_entry(plan, source_op_idx, schema, key).await?;
445+
let already_exists = existing_tracking_info.is_some();
446+
let memoization_info = existing_tracking_info
447+
.map(|info| info.memoization_info.map(|info| info.0))
448+
.flatten();
449+
let evaluation_cache = memoization_info
450+
.map(|info| EvaluationCache::from_stored(info.cache))
451+
.unwrap_or_default();
452+
let value_builder =
453+
evaluate_source_entry(plan, source_op_idx, schema, key, Some(&evaluation_cache)).await?;
442454

443455
// Didn't exist and still doesn't exist. No need to apply any changes.
444-
if existing_tracking_info.is_none() && scope_value.is_none() {
456+
if !already_exists && value_builder.is_none() {
445457
return Ok(());
446458
}
447459

448-
// TODO: Generate the actual source ordinal and memoization info.
449-
let source_ordinal: Option<i64> = if scope_value.is_some() { Some(1) } else { None };
450-
let memoization_info = serde_json::Value::Null;
460+
let memoization_info = MemoizationInfo {
461+
cache: evaluation_cache.into_stored()?,
462+
};
463+
let (source_ordinal, precommit_data) = match &value_builder {
464+
Some(scope_value) => {
465+
(
466+
// TODO: Generate the actual source ordinal.
467+
Some(1),
468+
Some(PrecommitData {
469+
scope_value,
470+
memoization_info: &memoization_info,
471+
}),
472+
)
473+
}
474+
None => (None, None),
475+
};
476+
if value_builder.is_some() {
477+
Some(1)
478+
} else {
479+
None
480+
};
451481

452482
// Phase 2 (precommit): Update with the memoization info and stage target keys.
453483
let precommit_output = precommit_source_tracking_info(
454484
source_id,
455485
&source_key_json,
456486
source_ordinal,
457-
memoization_info,
487+
precommit_data,
458488
&process_timestamp,
459489
&plan.tracking_table_setup,
460-
&scope_value,
461490
&plan.export_ops,
462491
pool,
463492
)

0 commit comments

Comments (0)