Skip to content

Commit e85ffbd

Browse files
committed
Support auto UUID in StoredMemoizationInfo and EvaluationMemory.
1 parent 7644d1f commit e85ffbd

File tree

9 files changed

+252
-158
lines changed

9 files changed

+252
-158
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -89,4 +89,4 @@ rustls = { version = "0.23.25" }
8989
http-body-util = "0.1.3"
9090
yaml-rust2 = "0.10.0"
9191
urlencoding = "2.1.3"
92-
uuid = "1.16.0"
92+
uuid = { version = "1.16.0", features = ["serde", "v4", "v8"] }

src/execution/db_tracking.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
use super::{db_tracking_setup::TrackingTableSetupState, memoization::MemoizationInfo};
1+
use super::{db_tracking_setup::TrackingTableSetupState, memoization::StoredMemoizationInfo};
22
use crate::utils::{db::WriteAction, fingerprint::Fingerprint};
33
use anyhow::Result;
44
use sqlx::PgPool;
@@ -12,7 +12,7 @@ pub type TrackedTargetKeyForSource = Vec<(i32, Vec<TrackedTargetKey>)>;
1212
pub struct SourceTrackingInfo {
1313
pub max_process_ordinal: i64,
1414
pub staging_target_keys: sqlx::types::Json<TrackedTargetKeyForSource>,
15-
pub memoization_info: Option<sqlx::types::Json<Option<MemoizationInfo>>>,
15+
pub memoization_info: Option<sqlx::types::Json<Option<StoredMemoizationInfo>>>,
1616

1717
pub processed_source_ordinal: Option<i64>,
1818
pub process_logic_fingerprint: Option<Vec<u8>>,
@@ -74,7 +74,7 @@ pub async fn precommit_source_tracking_info(
7474
source_key_json: &serde_json::Value,
7575
max_process_ordinal: i64,
7676
staging_target_keys: TrackedTargetKeyForSource,
77-
memoization_info: Option<&MemoizationInfo>,
77+
memoization_info: Option<&StoredMemoizationInfo>,
7878
db_setup: &TrackingTableSetupState,
7979
db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
8080
action: WriteAction,

src/execution/dumper.rs

Lines changed: 7 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,7 @@ use std::path::{Path, PathBuf};
1111
use yaml_rust2::YamlEmitter;
1212

1313
use super::indexer;
14+
use super::memoization::EvaluationMemoryOptions;
1415
use crate::base::{schema, value};
1516
use crate::builder::plan::{AnalyzedSourceOp, ExecutionPlan};
1617
use crate::utils::yaml_ser::YamlSerializer;
@@ -73,18 +74,16 @@ impl<'a> Dumper<'a> {
7374
where
7475
'a: 'b,
7576
{
76-
let cache_option = if self.options.use_cache {
77-
indexer::EvaluationCacheOption::UseCache(self.pool)
78-
} else {
79-
indexer::EvaluationCacheOption::NoCache
80-
};
81-
82-
let data_builder = indexer::evaluate_source_entry_with_cache(
77+
let data_builder = indexer::evaluate_source_entry_with_memory(
8378
self.plan,
8479
source_op,
8580
self.schema,
8681
key,
87-
cache_option,
82+
EvaluationMemoryOptions {
83+
enable_cache: self.options.use_cache,
84+
evaluation_only: true,
85+
},
86+
self.pool,
8887
)
8988
.await?;
9089

src/execution/evaluator.rs

Lines changed: 26 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@ use crate::{
1111
utils::immutable::RefList,
1212
};
1313

14-
use super::memoization::{evaluate_with_cell, EvaluationCache};
14+
use super::memoization::{evaluate_with_cell, EvaluationMemory, EvaluationMemoryOptions};
1515

1616
#[derive(Debug)]
1717
pub struct ScopeValueBuilder {
@@ -292,9 +292,9 @@ async fn evaluate_child_op_scope(
292292
op_scope: &AnalyzedOpScope,
293293
scoped_entries: RefList<'_, &ScopeEntry<'_>>,
294294
child_scope_entry: ScopeEntry<'_>,
295-
cache: Option<&EvaluationCache>,
295+
memory: &EvaluationMemory,
296296
) -> Result<()> {
297-
evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), cache)
297+
evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), memory)
298298
.await
299299
.with_context(|| {
300300
format!(
@@ -310,30 +310,25 @@ async fn evaluate_child_op_scope(
310310
async fn evaluate_op_scope(
311311
op_scope: &AnalyzedOpScope,
312312
scoped_entries: RefList<'_, &ScopeEntry<'_>>,
313-
cache: Option<&EvaluationCache>,
313+
memory: &EvaluationMemory,
314314
) -> Result<()> {
315315
let head_scope = *scoped_entries.head().unwrap();
316316
for reactive_op in op_scope.reactive_ops.iter() {
317317
match reactive_op {
318318
AnalyzedReactiveOp::Transform(op) => {
319319
let input_values = assemble_input_values(&op.inputs, scoped_entries);
320-
321-
let output_value_cell = match (op.function_exec_info.enable_cache, cache) {
322-
(true, Some(cache)) => {
323-
let key = op
320+
let output_value_cell = memory.get_cache_entry(
321+
|| {
322+
Ok(op
324323
.function_exec_info
325324
.fingerprinter
326325
.clone()
327326
.with(&input_values)?
328-
.into_fingerprint();
329-
Some(cache.get(
330-
key,
331-
&op.function_exec_info.output_type,
332-
/*ttl=*/ None,
333-
)?)
334-
}
335-
_ => None,
336-
};
327+
.into_fingerprint())
328+
},
329+
&op.function_exec_info.output_type,
330+
/*ttl=*/ None,
331+
)?;
337332
let output_value = evaluate_with_cell(output_value_cell.as_ref(), move || {
338333
op.executor.evaluate(input_values)
339334
})
@@ -362,7 +357,7 @@ async fn evaluate_op_scope(
362357
value: item,
363358
schema: &collection_schema.row,
364359
},
365-
cache,
360+
memory,
366361
)
367362
})
368363
.collect::<Vec<_>>(),
@@ -377,7 +372,7 @@ async fn evaluate_op_scope(
377372
value: v,
378373
schema: &collection_schema.row,
379374
},
380-
cache,
375+
memory,
381376
)
382377
})
383378
.collect::<Vec<_>>(),
@@ -393,7 +388,7 @@ async fn evaluate_op_scope(
393388
value: item,
394389
schema: &collection_schema.row,
395390
},
396-
cache,
391+
memory,
397392
)
398393
})
399394
.collect::<Vec<_>>(),
@@ -431,7 +426,7 @@ pub async fn evaluate_source_entry(
431426
source_op: &AnalyzedSourceOp,
432427
schema: &schema::DataSchema,
433428
key: &value::KeyValue,
434-
cache: Option<&EvaluationCache>,
429+
memory: &EvaluationMemory,
435430
) -> Result<Option<ScopeValueBuilder>> {
436431
let root_schema = &schema.schema;
437432
let root_scope_value =
@@ -464,7 +459,7 @@ pub async fn evaluate_source_entry(
464459
evaluate_op_scope(
465460
&plan.op_scope,
466461
RefList::Nil.prepend(&root_scope_entry),
467-
cache,
462+
memory,
468463
)
469464
.await?;
470465
Some(root_scope_value)
@@ -497,10 +492,18 @@ pub async fn evaluate_transient_flow(
497492
for (field, value) in flow.execution_plan.input_fields.iter().zip(input_values) {
498493
root_scope_entry.define_field(field, value)?;
499494
}
495+
let eval_memory = EvaluationMemory::new(
496+
chrono::Utc::now(),
497+
None,
498+
EvaluationMemoryOptions {
499+
enable_cache: false,
500+
evaluation_only: true,
501+
},
502+
);
500503
evaluate_op_scope(
501504
&flow.execution_plan.op_scope,
502505
RefList::Nil.prepend(&root_scope_entry),
503-
None,
506+
&eval_memory,
504507
)
505508
.await?;
506509
let output_value = assemble_value(

src/execution/indexer.rs

Lines changed: 31 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@ use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
99

1010
use super::db_tracking::{self, read_source_tracking_info, TrackedTargetKey};
1111
use super::db_tracking_setup;
12-
use super::memoization::{EvaluationCache, MemoizationInfo};
12+
use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoizationInfo};
1313
use crate::base::schema;
1414
use crate::base::value::{self, FieldValues, KeyValue};
1515
use crate::builder::plan::*;
@@ -118,7 +118,7 @@ struct TrackingInfoForTarget<'a> {
118118
#[derive(Debug)]
119119
struct PrecommitData<'a> {
120120
scope_value: &'a ScopeValueBuilder,
121-
memoization_info: &'a MemoizationInfo,
121+
memoization_info: &'a StoredMemoizationInfo,
122122
}
123123
struct PrecommitMetadata {
124124
source_entry_exists: bool,
@@ -417,40 +417,31 @@ async fn commit_source_tracking_info(
417417
Ok(WithApplyStatus::Normal(()))
418418
}
419419

420-
pub enum EvaluationCacheOption<'a> {
421-
NoCache,
422-
UseCache(&'a PgPool),
423-
}
424-
425-
pub async fn evaluate_source_entry_with_cache(
420+
pub async fn evaluate_source_entry_with_memory(
426421
plan: &ExecutionPlan,
427422
source_op: &AnalyzedSourceOp,
428423
schema: &schema::DataSchema,
429424
key: &value::KeyValue,
430-
cache_option: EvaluationCacheOption<'_>,
425+
options: EvaluationMemoryOptions,
426+
pool: &PgPool,
431427
) -> Result<Option<ScopeValueBuilder>> {
432-
let cache = match cache_option {
433-
EvaluationCacheOption::NoCache => None,
434-
EvaluationCacheOption::UseCache(pool) => {
435-
let source_key_json = serde_json::to_value(key)?;
436-
let existing_tracking_info = read_source_tracking_info(
437-
source_op.source_id,
438-
&source_key_json,
439-
&plan.tracking_table_setup,
440-
pool,
441-
)
442-
.await?;
443-
let process_timestamp = chrono::Utc::now();
444-
let memoization_info = existing_tracking_info
445-
.and_then(|info| info.memoization_info.map(|info| info.0))
446-
.flatten();
447-
Some(EvaluationCache::new(
448-
process_timestamp,
449-
memoization_info.map(|info| info.cache),
450-
))
451-
}
428+
let stored_info = if options.enable_cache || !options.evaluation_only {
429+
let source_key_json = serde_json::to_value(key)?;
430+
let existing_tracking_info = read_source_tracking_info(
431+
source_op.source_id,
432+
&source_key_json,
433+
&plan.tracking_table_setup,
434+
pool,
435+
)
436+
.await?;
437+
existing_tracking_info
438+
.and_then(|info| info.memoization_info.map(|info| info.0))
439+
.flatten()
440+
} else {
441+
None
452442
};
453-
let data_builder = evaluate_source_entry(plan, source_op, schema, key, cache.as_ref()).await?;
443+
let memory = EvaluationMemory::new(chrono::Utc::now(), stored_info, options);
444+
let data_builder = evaluate_source_entry(plan, source_op, schema, key, &memory).await?;
454445
Ok(data_builder)
455446
}
456447

@@ -480,10 +471,16 @@ pub async fn update_source_entry(
480471
let memoization_info = existing_tracking_info
481472
.and_then(|info| info.memoization_info.map(|info| info.0))
482473
.flatten();
483-
let evaluation_cache =
484-
EvaluationCache::new(process_timestamp, memoization_info.map(|info| info.cache));
474+
let evaluation_memory = EvaluationMemory::new(
475+
process_timestamp,
476+
memoization_info,
477+
EvaluationMemoryOptions {
478+
enable_cache: true,
479+
evaluation_only: false,
480+
},
481+
);
485482
let value_builder = if !only_for_deletion {
486-
evaluate_source_entry(plan, source_op, schema, key, Some(&evaluation_cache)).await?
483+
evaluate_source_entry(plan, source_op, schema, key, &evaluation_memory).await?
487484
} else {
488485
None
489486
};
@@ -501,9 +498,7 @@ pub async fn update_source_entry(
501498
return Ok(());
502499
}
503500

504-
let memoization_info = MemoizationInfo {
505-
cache: evaluation_cache.into_stored()?,
506-
};
501+
let memoization_info = evaluation_memory.into_stored()?;
507502
let (source_ordinal, precommit_data) = match &value_builder {
508503
Some(scope_value) => {
509504
(

0 commit comments

Comments
 (0)