diff --git a/Cargo.toml b/Cargo.toml
index ed2690e1e..9a7bc252a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -89,4 +89,4 @@ rustls = { version = "0.23.25" }
 http-body-util = "0.1.3"
 yaml-rust2 = "0.10.0"
 urlencoding = "2.1.3"
-uuid = "1.16.0"
+uuid = { version = "1.16.0", features = ["serde", "v4", "v8"] }
diff --git a/src/base/schema.rs b/src/base/schema.rs
index de10e6ab8..2a4f45c08 100644
--- a/src/base/schema.rs
+++ b/src/base/schema.rs
@@ -125,7 +125,7 @@ pub struct CollectionSchema {
     pub row: StructSchema,
 
     #[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
-    pub collectors: Vec<NamedSpec<StructSchema>>,
+    pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
 }
 
 impl CollectionSchema {
@@ -157,7 +157,7 @@ impl CollectionSchema {
                 .iter()
                 .map(|c| NamedSpec {
                     name: c.name.clone(),
-                    spec: c.spec.without_attrs(),
+                    spec: Arc::from(c.spec.without_attrs()),
                 })
                 .collect(),
         }
@@ -339,13 +339,65 @@ impl std::fmt::Display for FieldSchema {
     }
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct CollectorSchema {
+    pub fields: Vec<FieldSchema>,
+    /// If specified, the collector will have an automatically generated UUID field with the given index.
+    pub auto_uuid_field_idx: Option<u32>,
+}
+
+impl std::fmt::Display for CollectorSchema {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "Collector(")?;
+        for (i, field) in self.fields.iter().enumerate() {
+            if i > 0 {
+                write!(f, ", ")?;
+            }
+            write!(f, "{}", field)?;
+        }
+        write!(f, ")")
+    }
+}
+
+impl CollectorSchema {
+    pub fn from_fields(fields: Vec<FieldSchema>, has_auto_uuid_field: bool) -> Self {
+        let mut fields = fields;
+        let auto_uuid_field_idx = if has_auto_uuid_field {
+            fields.insert(
+                0,
+                FieldSchema::new(
+                    "uuid".to_string(),
+                    EnrichedValueType {
+                        typ: ValueType::Basic(BasicValueType::Uuid),
+                        nullable: false,
+                        attrs: Default::default(),
+                    },
+                ),
+            );
+            Some(0)
+        } else {
+            None
+        };
+        Self {
+            fields,
+            auto_uuid_field_idx,
+        }
+    }
+
+    pub fn without_attrs(&self) -> Self {
+        Self {
+            fields: self.fields.iter().map(|f| f.without_attrs()).collect(),
+            auto_uuid_field_idx: self.auto_uuid_field_idx,
+        }
+    }
+}
+
 /// Top-level schema for a flow instance.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct DataSchema {
     pub schema: StructSchema,
 
     #[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
-    pub collectors: Vec<NamedSpec<StructSchema>>,
+    pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
 }
 
 impl Deref for DataSchema {
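A note for reviewers (illustration only, not part of the patch): the one subtlety in `CollectorSchema::from_fields` is that the generated UUID column is prepended, so every caller-supplied field shifts one slot to the right. A minimal standalone mirror of that index logic, with plain `String`s standing in for `FieldSchema`:

```rust
// Sketch: mirrors from_fields's index handling with simplified types.
fn from_fields(mut fields: Vec<String>, has_auto_uuid_field: bool) -> (Vec<String>, Option<u32>) {
    let auto_uuid_field_idx = if has_auto_uuid_field {
        fields.insert(0, "uuid".to_string()); // generated field always lands at index 0
        Some(0)
    } else {
        None
    };
    (fields, auto_uuid_field_idx)
}

fn main() {
    let (fields, idx) = from_fields(vec!["text".into(), "embedding".into()], true);
    assert_eq!(fields, ["uuid", "text", "embedding"]);
    assert_eq!(idx, Some(0));
}
```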
diff --git a/src/base/spec.rs b/src/base/spec.rs
index 634f36705..8666793dd 100644
--- a/src/base/spec.rs
+++ b/src/base/spec.rs
@@ -181,9 +181,15 @@ pub struct ForEachOpSpec {
 /// Emit data to a given collector at the given scope.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct CollectOpSpec {
+    /// Field values to be collected.
     pub input: StructMapping,
+    /// Scope for the collector.
     pub scope_name: ScopeName,
+    /// Name of the collector.
     pub collector_name: FieldName,
+    /// If specified, the collector will have an automatically generated UUID field with the given name.
+    /// The UUID will remain stable as long as the collected input values remain unchanged.
+    pub auto_uuid_field: Option<FieldName>,
 }
 
 #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
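The stability promise in the doc comment above is implemented in two stages later in this diff: the analyzer pre-hashes the collector's field schemas into a `Fingerprinter`, and the evaluator folds the runtime values into a clone of it, so the reuse key is effectively hash(schema, values). A rough conceptual sketch, with std's `DefaultHasher` standing in for the crate's `Fingerprinter`:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Sketch only: DefaultHasher stands in for the crate's Fingerprinter.
fn main() {
    // Analysis time: hash the collector's field schemas once.
    let mut schema_prefix = DefaultHasher::new();
    ["text: Str", "embedding: Vector"].hash(&mut schema_prefix);

    // Evaluation time: clone the prefix and fold in the collected values.
    let mut key = schema_prefix.clone();
    ["hello", "[0.1, 0.2]"].hash(&mut key);

    // Same schema + same values => same key => same reused UUID.
    println!("stable key: {:016x}", key.finish());
}
```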
diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs
index a180ec379..8095d1112 100644
--- a/src/builder/analyzer.rs
+++ b/src/builder/analyzer.rs
@@ -179,7 +179,7 @@ fn try_make_common_value_type(
             ValueType::Basic(basic_type1.clone())
         }
         (ValueType::Struct(struct_type1), ValueType::Struct(struct_type2)) => {
-            let common_schema = try_make_common_struct_schemas(struct_type1, struct_type2)?;
+            let common_schema = try_merge_struct_schemas(struct_type1, struct_type2)?;
             ValueType::Struct(common_schema)
         }
         (ValueType::Collection(collection_type1), ValueType::Collection(collection_type2)) => {
@@ -190,7 +190,7 @@ fn try_make_common_value_type(
                 collection_type2
             );
         }
-        let row = try_make_common_struct_schemas(&collection_type1.row, &collection_type2.row)?;
+        let row = try_merge_struct_schemas(&collection_type1.row, &collection_type2.row)?;
 
         if collection_type1.collectors.len() != collection_type2.collectors.len() {
             api_bail!(
@@ -213,7 +213,7 @@ fn try_make_common_value_type(
                 }
                 let collector = NamedSpec {
                     name: c1.name.clone(),
-                    spec: try_make_common_struct_schemas(&c1.spec, &c2.spec)?,
+                    spec: Arc::new(try_merge_collector_schemas(&c1.spec, &c2.spec)?),
                 };
                 Ok(collector)
             })
@@ -258,26 +258,32 @@ fn try_make_common_value_type(
-fn try_make_common_struct_schemas(
-    schema1: &StructSchema,
-    schema2: &StructSchema,
-) -> Result<StructSchema> {
-    if schema1.fields.len() != schema2.fields.len() {
+fn try_merge_fields_schemas(
+    schema1: &[FieldSchema],
+    schema2: &[FieldSchema],
+) -> Result<Vec<FieldSchema>> {
+    if schema1.len() != schema2.len() {
         api_bail!(
-            "Structs are not compatible as they have different fields count:\n  {}\n  {}\n",
-            schema1,
+            "Fields are not compatible as they have different field counts:\n  ({})\n  ({})\n",
+            schema1
+                .iter()
+                .map(|f| f.to_string())
+                .collect::<Vec<_>>()
+                .join(", "),
             schema2
+                .iter()
+                .map(|f| f.to_string())
+                .collect::<Vec<_>>()
+                .join(", ")
         );
     }
-    let mut result_fields = Vec::with_capacity(schema1.fields.len());
-    for (field1, field2) in schema1.fields.iter().zip(schema2.fields.iter()) {
+    let mut result_fields = Vec::with_capacity(schema1.len());
+    for (field1, field2) in schema1.iter().zip(schema2.iter()) {
         if field1.name != field2.name {
             api_bail!(
-                "Structs are not compatible as they have incompatible field names `{}` vs `{}`:\n  {}\n  {}\n",
+                "Structs are not compatible as they have incompatible field names `{}` vs `{}`",
                 field1.name,
-                field2.name,
-                schema1,
-                schema2
+                field2.name
             );
         }
         result_fields.push(FieldSchema {
@@ -285,8 +291,16 @@ fn try_make_common_struct_schemas(
             value_type: try_make_common_value_type(&field1.value_type, &field2.value_type)?,
         });
     }
+    Ok(result_fields)
+}
+
+fn try_merge_struct_schemas(
+    schema1: &StructSchema,
+    schema2: &StructSchema,
+) -> Result<StructSchema> {
+    let fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?;
     Ok(StructSchema {
-        fields: Arc::new(result_fields),
+        fields: Arc::new(fields),
         description: schema1
             .description
             .clone()
@@ -294,36 +308,45 @@ fn try_make_common_struct_schemas(
     })
 }
 
+fn try_merge_collector_schemas(
+    schema1: &CollectorSchema,
+    schema2: &CollectorSchema,
+) -> Result<CollectorSchema> {
+    let fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?;
+    Ok(CollectorSchema {
+        fields,
+        auto_uuid_field_idx: if schema1.auto_uuid_field_idx == schema2.auto_uuid_field_idx {
+            schema1.auto_uuid_field_idx
+        } else {
+            None
+        },
+    })
+}
+
 #[derive(Debug)]
 pub(super) struct CollectorBuilder {
-    pub schema: StructSchema,
+    pub schema: Arc<CollectorSchema>,
     pub is_used: bool,
 }
 
 impl CollectorBuilder {
-    pub fn new(schema: StructSchema) -> Self {
+    pub fn new(schema: Arc<CollectorSchema>) -> Self {
         Self {
             schema,
             is_used: false,
         }
     }
 
-    pub fn merge_schema(&mut self, schema: &StructSchema) -> Result<()> {
+    pub fn merge_schema(&mut self, schema: &CollectorSchema) -> Result<()> {
         if self.is_used {
             api_bail!("Collector is already used");
         }
-        let common_schema =
-            try_make_common_struct_schemas(&self.schema, schema).with_context(|| {
-                format!(
-                    "Collectors are sent with entries in incompatible schemas:\n  {}\n  {}\n",
-                    self.schema, schema
-                )
-            })?;
-        self.schema = common_schema;
+        let existing_schema = Arc::make_mut(&mut self.schema);
+        *existing_schema = try_merge_collector_schemas(existing_schema, schema)?;
         Ok(())
     }
 
-    pub fn use_schema(&mut self) -> StructSchema {
+    pub fn use_schema(&mut self) -> Arc<CollectorSchema> {
         self.is_used = true;
         self.schema.clone()
     }
@@ -401,7 +424,7 @@ impl DataScopeBuilder {
     pub fn consume_collector(
         &self,
         collector_name: &FieldName,
-    ) -> Result<(AnalyzedLocalCollectorReference, StructSchema)> {
+    ) -> Result<(AnalyzedLocalCollectorReference, Arc<CollectorSchema>)> {
         let mut collectors = self.collectors.lock().unwrap();
         let (collector_idx, _, collector) = collectors
             .get_full_mut(collector_name)
@@ -417,7 +440,7 @@ impl DataScopeBuilder {
     pub fn add_collector(
         &self,
         collector_name: FieldName,
-        schema: StructSchema,
+        schema: CollectorSchema,
     ) -> Result<AnalyzedLocalCollectorReference> {
         let mut collectors = self.collectors.lock().unwrap();
         let collector_idx = collectors.len() as u32;
@@ -426,7 +449,7 @@ impl DataScopeBuilder {
                 entry.get_mut().merge_schema(&schema)?;
             }
             indexmap::map::Entry::Vacant(entry) => {
-                entry.insert(CollectorBuilder::new(schema));
+                entry.insert(CollectorBuilder::new(Arc::new(schema)));
             }
         }
         Ok(AnalyzedLocalCollectorReference { collector_idx })
@@ -474,7 +497,7 @@ fn find_scope<'a>(
 fn analyze_struct_mapping(
     mapping: &StructMapping,
     scopes: RefList<'_, &'_ ExecutionScope<'_>>,
-) -> Result<(AnalyzedStructMapping, StructSchema)> {
+) -> Result<(AnalyzedStructMapping, Vec<FieldSchema>)> {
     let mut field_mappings = Vec::with_capacity(mapping.fields.len());
     let mut field_schemas = Vec::with_capacity(mapping.fields.len());
     for field in mapping.fields.iter() {
@@ -489,10 +512,7 @@ fn analyze_struct_mapping(
         AnalyzedStructMapping {
             fields: field_mappings,
         },
-        StructSchema {
-            fields: Arc::new(field_schemas),
-            description: None,
-        },
+        field_schemas,
     ))
 }
 
@@ -523,11 +543,14 @@ fn analyze_value_mapping(
         }
 
         ValueMapping::Struct(v) => {
-            let (struct_mapping, struct_schema) = analyze_struct_mapping(v, scopes)?;
+            let (struct_mapping, field_schemas) = analyze_struct_mapping(v, scopes)?;
             (
                 AnalyzedValueMapping::Struct(struct_mapping),
                 EnrichedValueType {
-                    typ: ValueType::Struct(struct_schema),
+                    typ: ValueType::Struct(StructSchema {
+                        fields: Arc::new(field_schemas),
+                        description: None,
+                    }),
                     nullable: false,
                     attrs: Default::default(),
                 },
@@ -556,7 +579,7 @@ fn analyze_input_fields(
 fn add_collector(
     scope_name: &ScopeName,
     collector_name: FieldName,
-    schema: StructSchema,
+    schema: CollectorSchema,
     scopes: RefList<'_, &'_ ExecutionScope<'_>>,
 ) -> Result<AnalyzedCollectorReference> {
     let (scope_up_level, scope) = find_scope(scope_name, scopes)?;
@@ -766,22 +789,22 @@ impl AnalyzerContext<'_> {
             ReactiveOpSpec::Collect(op) => {
                 let scopes = parent_scopes.prepend(scope);
-                let (struct_mapping, struct_schema) = analyze_struct_mapping(&op.input, scopes)?;
-                let collector_ref = add_collector(
-                    &op.scope_name,
-                    op.collector_name.clone(),
-                    struct_schema,
-                    scopes,
-                )?;
-                let op_name = reactive_op.name.clone();
-                async move {
-                    Ok(AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
-                        name: op_name,
-                        input: struct_mapping,
-                        collector_ref,
-                    }))
-                }
-                .boxed()
+                let (struct_mapping, fields_schema) = analyze_struct_mapping(&op.input, scopes)?;
+                let has_auto_uuid_field = op.auto_uuid_field.is_some();
+                let fingerprinter = Fingerprinter::default().with(&fields_schema)?;
+                let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
+                    name: reactive_op.name.clone(),
+                    has_auto_uuid_field,
+                    input: struct_mapping,
+                    collector_ref: add_collector(
+                        &op.scope_name,
+                        op.collector_name.clone(),
+                        CollectorSchema::from_fields(fields_schema, has_auto_uuid_field),
+                        scopes,
+                    )?,
+                    fingerprinter,
+                });
+                async move { Ok(collect_op) }.boxed()
             }
         };
         Ok(result_fut)
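`CollectorBuilder::merge_schema` above now updates the schema through `Arc::make_mut`, a copy-on-write operation: it mutates in place while the builder is the sole owner, and clones the inner value first if a reference has already been handed out (e.g. via `use_schema`). A self-contained illustration of that behavior:

```rust
use std::sync::Arc;

// Sketch of the copy-on-write update that merge_schema relies on.
fn main() {
    let mut schema = Arc::new(vec!["a".to_string()]);
    let handed_out = schema.clone(); // e.g. a consumer obtained via use_schema()
    Arc::make_mut(&mut schema).push("b".to_string()); // clones under the hood
    assert_eq!(*handed_out, ["a"]); // the earlier reference is unaffected
    assert_eq!(*schema, ["a", "b"]);
}
```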
diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs
index f984d7603..3e4361920 100644
--- a/src/builder/flow_builder.rs
+++ b/src/builder/flow_builder.rs
@@ -13,7 +13,7 @@ use super::analyzer::{
 use crate::{
     api_bail,
     base::{
-        schema::{self, FieldSchema, StructSchema},
+        schema::{self, CollectorSchema, FieldSchema},
         spec::{self, FieldName, NamedSpec},
     },
     get_lib_context,
@@ -514,13 +514,16 @@ impl FlowBuilder {
         .into_py_result()
     }
 
+    #[pyo3(signature = (collector, fields, auto_uuid_field=None))]
     pub fn collect(
         &mut self,
         collector: &DataCollector,
         fields: Vec<(FieldName, DataSlice)>,
+        auto_uuid_field: Option<FieldName>,
     ) -> PyResult<()> {
         let common_scope = Self::minimum_common_scope(fields.iter().map(|(_, ds)| &ds.scope), None)
             .into_py_result()?;
+        let has_auto_uuid_field = auto_uuid_field.is_some();
         let name = format!(".collect.{}", self.next_generated_op_id);
         self.next_generated_op_id += 1;
         self.do_in_scope(
@@ -540,6 +543,7 @@ impl FlowBuilder {
                 },
                 scope_name: collector.scope.scope_name.clone(),
                 collector_name: collector.name.clone(),
+                auto_uuid_field,
             }),
         };
 
@@ -553,25 +557,22 @@ impl FlowBuilder {
         )
         .into_py_result()?;
 
-        let struct_schema = StructSchema {
-            fields: Arc::new(
-                fields
-                    .into_iter()
-                    .map(|(name, ds)| FieldSchema {
-                        name,
-                        value_type: ds.data_type.schema,
-                    })
-                    .collect(),
-            ),
-            description: None,
-        };
-
+        let collector_schema = CollectorSchema::from_fields(
+            fields
+                .into_iter()
+                .map(|(name, ds)| FieldSchema {
+                    name,
+                    value_type: ds.data_type.schema,
+                })
+                .collect(),
+            has_auto_uuid_field,
+        );
         {
             let mut collector = collector.collector.lock().unwrap();
             if let Some(collector) = collector.as_mut() {
-                collector.merge_schema(&struct_schema).into_py_result()?;
+                collector.merge_schema(&collector_schema).into_py_result()?;
             } else {
-                *collector = Some(CollectorBuilder::new(struct_schema));
+                *collector = Some(CollectorBuilder::new(Arc::new(collector_schema)));
             }
         }
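The `#[pyo3(signature = ...)]` attribute gives the new trailing argument a Python-side default of `None`, so existing Python callers of `collect` keep working unchanged. A minimal sketch of the pattern (hypothetical `demo` function, not from this codebase):

```rust
use pyo3::prelude::*;

// Hypothetical function showing the defaulting pattern used by collect().
#[pyfunction]
#[pyo3(signature = (name, auto_uuid_field=None))]
fn demo(name: String, auto_uuid_field: Option<String>) -> String {
    // Python may call demo("x") or demo("x", auto_uuid_field="id").
    format!("{name}: {auto_uuid_field:?}")
}
```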
diff --git a/src/builder/plan.rs b/src/builder/plan.rs
index de45cfa78..8b7802210 100644
--- a/src/builder/plan.rs
+++ b/src/builder/plan.rs
@@ -88,8 +88,11 @@ pub struct AnalyzedForEachOp {
 
 pub struct AnalyzedCollectOp {
     pub name: String,
+    pub has_auto_uuid_field: bool,
     pub input: AnalyzedStructMapping,
     pub collector_ref: AnalyzedCollectorReference,
+    /// Fingerprinter of the collector's schema. Used to decide when to reuse auto-generated UUIDs.
+    pub fingerprinter: Fingerprinter,
 }
 
 pub enum AnalyzedPrimaryKeyDef {
diff --git a/src/execution/db_tracking.rs b/src/execution/db_tracking.rs
index c267b7fcc..00d14cc67 100644
--- a/src/execution/db_tracking.rs
+++ b/src/execution/db_tracking.rs
@@ -1,16 +1,18 @@
-use super::{db_tracking_setup::TrackingTableSetupState, memoization::MemoizationInfo};
+use super::{db_tracking_setup::TrackingTableSetupState, memoization::StoredMemoizationInfo};
 use crate::utils::{db::WriteAction, fingerprint::Fingerprint};
 use anyhow::Result;
 use sqlx::PgPool;
 
+/// (target_key, process_ordinal, fingerprint)
 pub type TrackedTargetKey = (serde_json::Value, i64, Option<Fingerprint>);
+/// (source_id, target_key)
 pub type TrackedTargetKeyForSource = Vec<(i32, Vec<TrackedTargetKey>)>;
 
 #[derive(sqlx::FromRow, Debug)]
 pub struct SourceTrackingInfo {
     pub max_process_ordinal: i64,
     pub staging_target_keys: sqlx::types::Json<TrackedTargetKeyForSource>,
-    pub memoization_info: Option<sqlx::types::Json<Option<MemoizationInfo>>>,
+    pub memoization_info: Option<sqlx::types::Json<Option<StoredMemoizationInfo>>>,
 
     pub processed_source_ordinal: Option<i64>,
     pub process_logic_fingerprint: Option<Vec<u8>>,
@@ -72,7 +74,7 @@ pub async fn precommit_source_tracking_info(
     source_key_json: &serde_json::Value,
     max_process_ordinal: i64,
     staging_target_keys: TrackedTargetKeyForSource,
-    memoization_info: Option<&MemoizationInfo>,
+    memoization_info: Option<&StoredMemoizationInfo>,
     db_setup: &TrackingTableSetupState,
     db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
     action: WriteAction,
diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs
index 10298b506..39c78322b 100644
--- a/src/execution/dumper.rs
+++ b/src/execution/dumper.rs
@@ -11,6 +11,7 @@ use std::path::{Path, PathBuf};
 use yaml_rust2::YamlEmitter;
 
 use super::indexer;
+use super::memoization::EvaluationMemoryOptions;
 use crate::base::{schema, value};
 use crate::builder::plan::{AnalyzedSourceOp, ExecutionPlan};
 use crate::utils::yaml_ser::YamlSerializer;
@@ -73,18 +74,16 @@ impl<'a> Dumper<'a> {
     where
         'a: 'b,
     {
-        let cache_option = if self.options.use_cache {
-            indexer::EvaluationCacheOption::UseCache(self.pool)
-        } else {
-            indexer::EvaluationCacheOption::NoCache
-        };
-
-        let data_builder = indexer::evaluate_source_entry_with_cache(
+        let data_builder = indexer::evaluate_source_entry_with_memory(
             self.plan,
             source_op,
             self.schema,
             key,
-            cache_option,
+            EvaluationMemoryOptions {
+                enable_cache: self.options.use_cache,
+                evaluation_only: true,
+            },
+            self.pool,
         )
         .await?;
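`EvaluationMemoryOptions` replaces the old two-variant `EvaluationCacheOption` with two orthogonal flags. For reference, the three combinations used across this diff (struct mirrored here for illustration; the real definition is in src/execution/memoization.rs below):

```rust
// Mirrored for illustration; field names match the real struct.
struct EvaluationMemoryOptions {
    enable_cache: bool,
    evaluation_only: bool,
}

fn main() {
    // Dumper / interactive evaluation: may read cached values, persists nothing.
    let _dump = EvaluationMemoryOptions { enable_cache: true, evaluation_only: true };
    // Transient flows: no cache, nothing persisted.
    let _transient = EvaluationMemoryOptions { enable_cache: false, evaluation_only: true };
    // Indexing: full memoization, written back via into_stored().
    let _index = EvaluationMemoryOptions { enable_cache: true, evaluation_only: false };
}
```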
diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs
index e17c208a7..d808ce178 100644
--- a/src/execution/evaluator.rs
+++ b/src/execution/evaluator.rs
@@ -11,7 +11,7 @@ use crate::{
     utils::immutable::RefList,
 };
 
-use super::memoization::{evaluate_with_cell, EvaluationCache};
+use super::memoization::{evaluate_with_cell, EvaluationMemory, EvaluationMemoryOptions};
 
 #[derive(Debug)]
 pub struct ScopeValueBuilder {
@@ -278,23 +278,22 @@ fn assemble_value(
     }
 }
 
-fn assemble_input_values(
-    value_mappings: &[AnalyzedValueMapping],
-    scoped_entries: RefList<'_, &ScopeEntry<'_>>,
-) -> Vec<value::Value> {
+fn assemble_input_values<'a>(
+    value_mappings: &'a [AnalyzedValueMapping],
+    scoped_entries: RefList<'a, &ScopeEntry<'a>>,
+) -> impl Iterator<Item = value::Value> + 'a {
     value_mappings
         .iter()
-        .map(|value_mapping| assemble_value(value_mapping, scoped_entries))
-        .collect()
+        .map(move |value_mapping| assemble_value(value_mapping, scoped_entries))
 }
 
 async fn evaluate_child_op_scope(
     op_scope: &AnalyzedOpScope,
     scoped_entries: RefList<'_, &ScopeEntry<'_>>,
     child_scope_entry: ScopeEntry<'_>,
-    cache: Option<&EvaluationCache>,
+    memory: &EvaluationMemory,
 ) -> Result<()> {
-    evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), cache)
+    evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), memory)
         .await
         .with_context(|| {
             format!(
@@ -310,30 +309,27 @@
 async fn evaluate_op_scope(
     op_scope: &AnalyzedOpScope,
     scoped_entries: RefList<'_, &ScopeEntry<'_>>,
-    cache: Option<&EvaluationCache>,
+    memory: &EvaluationMemory,
 ) -> Result<()> {
     let head_scope = *scoped_entries.head().unwrap();
     for reactive_op in op_scope.reactive_ops.iter() {
         match reactive_op {
             AnalyzedReactiveOp::Transform(op) => {
-                let input_values = assemble_input_values(&op.inputs, scoped_entries);
-
-                let output_value_cell = match (op.function_exec_info.enable_cache, cache) {
-                    (true, Some(cache)) => {
-                        let key = op
-                            .function_exec_info
-                            .fingerprinter
-                            .clone()
-                            .with(&input_values)?
-                            .into_fingerprint();
-                        Some(cache.get(
-                            key,
-                            &op.function_exec_info.output_type,
-                            /*ttl=*/ None,
-                        )?)
-                    }
-                    _ => None,
-                };
+                let mut input_values = Vec::with_capacity(op.inputs.len());
+                input_values
+                    .extend(assemble_input_values(&op.inputs, scoped_entries).collect::<Vec<_>>());
+                let output_value_cell = memory.get_cache_entry(
+                    || {
+                        Ok(op
+                            .function_exec_info
+                            .fingerprinter
+                            .clone()
+                            .with(&input_values)?
+                            .into_fingerprint())
+                    },
+                    &op.function_exec_info.output_type,
+                    /*ttl=*/ None,
+                )?;
                 let output_value = evaluate_with_cell(output_value_cell.as_ref(), move || {
                     op.executor.evaluate(input_values)
                 })
@@ -362,7 +358,7 @@ async fn evaluate_op_scope(
                                         value: item,
                                         schema: &collection_schema.row,
                                     },
-                                    cache,
+                                    memory,
                                 )
                             })
                             .collect::<Vec<_>>(),
@@ -377,7 +373,7 @@ async fn evaluate_op_scope(
                                         value: v,
                                         schema: &collection_schema.row,
                                     },
-                                    cache,
+                                    memory,
                                 )
                             })
                             .collect::<Vec<_>>(),
@@ -393,7 +389,7 @@ async fn evaluate_op_scope(
                                         value: item,
                                         schema: &collection_schema.row,
                                     },
-                                    cache,
+                                    memory,
                                 )
                             })
                             .collect::<Vec<_>>(),
@@ -407,7 +403,23 @@ async fn evaluate_op_scope(
             }
 
             AnalyzedReactiveOp::Collect(op) => {
-                let field_values = assemble_input_values(&op.input.fields, scoped_entries);
+                let mut field_values = Vec::with_capacity(
+                    op.input.fields.len() + if op.has_auto_uuid_field { 1 } else { 0 },
+                );
+                let field_values_iter = assemble_input_values(&op.input.fields, scoped_entries);
+                if op.has_auto_uuid_field {
+                    field_values.push(value::Value::Null);
+                    field_values.extend(field_values_iter);
+                    let uuid = memory.next_uuid(
+                        op.fingerprinter
+                            .clone()
+                            .with(&field_values[1..])?
+                            .into_fingerprint(),
+                    )?;
+                    field_values[0] = value::Value::Basic(value::BasicValue::Uuid(uuid));
+                } else {
+                    field_values.extend(field_values_iter);
+                };
                 let collector_entry = scoped_entries
                     .headn(op.collector_ref.scope_up_level as usize)
                     .unwrap();
@@ -431,7 +443,7 @@ pub async fn evaluate_source_entry(
     plan: &ExecutionPlan,
     source_op: &AnalyzedSourceOp,
     schema: &schema::DataSchema,
     key: &value::KeyValue,
-    cache: Option<&EvaluationCache>,
+    memory: &EvaluationMemory,
 ) -> Result<Option<ScopeValueBuilder>> {
     let root_schema = &schema.schema;
     let root_scope_value =
@@ -464,7 +476,7 @@ pub async fn evaluate_source_entry(
     evaluate_op_scope(
         &plan.op_scope,
         RefList::Nil.prepend(&root_scope_entry),
-        cache,
+        memory,
     )
     .await?;
     Some(root_scope_value)
@@ -497,10 +509,18 @@ pub async fn evaluate_transient_flow(
     for (field, value) in flow.execution_plan.input_fields.iter().zip(input_values) {
         root_scope_entry.define_field(field, value)?;
     }
+    let eval_memory = EvaluationMemory::new(
+        chrono::Utc::now(),
+        None,
+        EvaluationMemoryOptions {
+            enable_cache: false,
+            evaluation_only: true,
+        },
+    );
     evaluate_op_scope(
         &flow.execution_plan.op_scope,
         RefList::Nil.prepend(&root_scope_entry),
-        None,
+        &eval_memory,
     )
     .await?;
     let output_value = assemble_value(
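The `Collect` arm above reserves slot 0 with `Value::Null`, fingerprints only the real payload (`&field_values[1..]`), and back-fills the UUID afterwards, so the generated field never feeds its own fingerprint. The same dance with plain strings:

```rust
// Standalone mirror of the placeholder trick in the Collect arm.
fn main() {
    let collected = vec!["text".to_string(), "embedding".to_string()];
    let mut field_values = Vec::with_capacity(collected.len() + 1);
    field_values.push(String::new()); // placeholder where the UUID will go
    field_values.extend(collected);
    // Key derived from the payload only -- stands in for next_uuid(fingerprint).
    let uuid = format!("uuid-for-{:?}", &field_values[1..]);
    field_values[0] = uuid;
    assert_eq!(field_values[0], r#"uuid-for-["text", "embedding"]"#);
}
```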
diff --git a/src/execution/indexer.rs b/src/execution/indexer.rs
index 525a42eda..ef42bc400 100644
--- a/src/execution/indexer.rs
+++ b/src/execution/indexer.rs
@@ -9,7 +9,7 @@ use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
 
 use super::db_tracking::{self, read_source_tracking_info, TrackedTargetKey};
 use super::db_tracking_setup;
-use super::memoization::{EvaluationCache, MemoizationInfo};
+use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoizationInfo};
 use crate::base::schema;
 use crate::base::value::{self, FieldValues, KeyValue};
 use crate::builder::plan::*;
@@ -118,7 +118,7 @@ struct TrackingInfoForTarget<'a> {
 #[derive(Debug)]
 struct PrecommitData<'a> {
     scope_value: &'a ScopeValueBuilder,
-    memoization_info: &'a MemoizationInfo,
+    memoization_info: &'a StoredMemoizationInfo,
 }
 struct PrecommitMetadata {
     source_entry_exists: bool,
@@ -417,40 +417,31 @@ async fn commit_source_tracking_info(
     Ok(WithApplyStatus::Normal(()))
 }
 
-pub enum EvaluationCacheOption<'a> {
-    NoCache,
-    UseCache(&'a PgPool),
-}
-
-pub async fn evaluate_source_entry_with_cache(
+pub async fn evaluate_source_entry_with_memory(
     plan: &ExecutionPlan,
     source_op: &AnalyzedSourceOp,
     schema: &schema::DataSchema,
     key: &value::KeyValue,
-    cache_option: EvaluationCacheOption<'_>,
+    options: EvaluationMemoryOptions,
+    pool: &PgPool,
 ) -> Result<Option<ScopeValueBuilder>> {
-    let cache = match cache_option {
-        EvaluationCacheOption::NoCache => None,
-        EvaluationCacheOption::UseCache(pool) => {
-            let source_key_json = serde_json::to_value(key)?;
-            let existing_tracking_info = read_source_tracking_info(
-                source_op.source_id,
-                &source_key_json,
-                &plan.tracking_table_setup,
-                pool,
-            )
-            .await?;
-            let process_timestamp = chrono::Utc::now();
-            let memoization_info = existing_tracking_info
-                .and_then(|info| info.memoization_info.map(|info| info.0))
-                .flatten();
-            Some(EvaluationCache::new(
-                process_timestamp,
-                memoization_info.map(|info| info.cache),
-            ))
-        }
+    let stored_info = if options.enable_cache || !options.evaluation_only {
+        let source_key_json = serde_json::to_value(key)?;
+        let existing_tracking_info = read_source_tracking_info(
+            source_op.source_id,
+            &source_key_json,
+            &plan.tracking_table_setup,
+            pool,
+        )
+        .await?;
+        existing_tracking_info
+            .and_then(|info| info.memoization_info.map(|info| info.0))
+            .flatten()
+    } else {
+        None
     };
-    let data_builder = evaluate_source_entry(plan, source_op, schema, key, cache.as_ref()).await?;
+    let memory = EvaluationMemory::new(chrono::Utc::now(), stored_info, options);
+    let data_builder = evaluate_source_entry(plan, source_op, schema, key, &memory).await?;
     Ok(data_builder)
 }
 
@@ -480,10 +471,16 @@ pub async fn update_source_entry(
     let memoization_info = existing_tracking_info
         .and_then(|info| info.memoization_info.map(|info| info.0))
         .flatten();
-    let evaluation_cache =
-        EvaluationCache::new(process_timestamp, memoization_info.map(|info| info.cache));
+    let evaluation_memory = EvaluationMemory::new(
+        process_timestamp,
+        memoization_info,
+        EvaluationMemoryOptions {
+            enable_cache: true,
+            evaluation_only: false,
+        },
+    );
     let value_builder = if !only_for_deletion {
-        evaluate_source_entry(plan, source_op, schema, key, Some(&evaluation_cache)).await?
+        evaluate_source_entry(plan, source_op, schema, key, &evaluation_memory).await?
     } else {
         None
     };
@@ -501,9 +498,7 @@ pub async fn update_source_entry(
         return Ok(());
     }
 
-    let memoization_info = MemoizationInfo {
-        cache: evaluation_cache.into_stored()?,
-    };
+    let memoization_info = evaluation_memory.into_stored()?;
     let (source_ordinal, precommit_data) = match &value_builder {
         Some(scope_value) => {
             (
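One subtlety in `evaluate_source_entry_with_memory`: stored memoization state is loaded not only when caching is enabled but whenever the run may persist, because an indexing pass with caching disabled still has to replay previously minted UUIDs. The guard, mirrored as a plain function:

```rust
// Mirror of the condition `options.enable_cache || !options.evaluation_only`.
fn needs_stored_info(enable_cache: bool, evaluation_only: bool) -> bool {
    enable_cache || !evaluation_only
}

fn main() {
    assert!(needs_stored_info(true, true));   // cached, read-only evaluation
    assert!(needs_stored_info(false, false)); // indexing without cache: still replay UUIDs
    assert!(!needs_stored_info(false, true)); // transient evaluation: nothing to reuse
}
```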
diff --git a/src/execution/memoization.rs b/src/execution/memoization.rs
index b32b6f593..b51d75124 100644
--- a/src/execution/memoization.rs
+++ b/src/execution/memoization.rs
@@ -1,4 +1,4 @@
-use anyhow::Result;
+use anyhow::{bail, Result};
 use serde::{Deserialize, Serialize};
 use std::{
     borrow::Cow,
@@ -10,126 +10,218 @@ use std::{
 
 use crate::{
     base::{schema, value},
     service::error::{SharedError, SharedResultExtRef},
-    utils::fingerprint::Fingerprint,
+    utils::fingerprint::{Fingerprint, Fingerprinter},
 };
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct CacheEntry {
+pub struct StoredCacheEntry {
     time_sec: i64,
     value: serde_json::Value,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize, Default)]
-pub struct MemoizationInfo {
-    pub cache: HashMap<Fingerprint, CacheEntry>,
-}
+pub struct StoredMemoizationInfo {
+    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
+    pub cache: HashMap<Fingerprint, StoredCacheEntry>,
 
-struct EvaluationCacheEntry {
-    time: chrono::DateTime<chrono::Utc>,
-    data: EvaluationCacheData,
+    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
+    pub uuids: HashMap<Fingerprint, Vec<uuid::Uuid>>,
 }
 
 pub type CacheEntryCell = Arc<async_lock::OnceCell<Result<value::Value, SharedError>>>;
 
-enum EvaluationCacheData {
+enum CacheData {
     /// Existing entry in previous runs, but not in current run yet.
     Previous(serde_json::Value),
     /// Value appeared in current run.
     Current(CacheEntryCell),
 }
 
-pub struct EvaluationCache {
+struct CacheEntry {
+    time: chrono::DateTime<chrono::Utc>,
+    data: CacheData,
+}
+
+#[derive(Default)]
+struct UuidEntry {
+    uuids: Vec<uuid::Uuid>,
+    num_current: usize,
+}
+
+impl UuidEntry {
+    fn new(uuids: Vec<uuid::Uuid>) -> Self {
+        Self {
+            uuids,
+            num_current: 0,
+        }
+    }
+
+    fn into_stored(self) -> Option<Vec<uuid::Uuid>> {
+        if self.num_current == 0 {
+            return None;
+        }
+        let mut uuids = self.uuids;
+        if self.num_current < uuids.len() {
+            uuids.truncate(self.num_current);
+        }
+        Some(uuids)
+    }
+}
+
+pub struct EvaluationMemoryOptions {
+    pub enable_cache: bool,
+
+    /// If true, this run is for evaluation only.
+    /// In this mode, nothing is memoized.
+    pub evaluation_only: bool,
+}
+
+pub struct EvaluationMemory {
     current_time: chrono::DateTime<chrono::Utc>,
-    cache: Mutex<HashMap<Fingerprint, EvaluationCacheEntry>>,
+    cache: Option<Mutex<HashMap<Fingerprint, CacheEntry>>>,
+    uuids: Mutex<HashMap<Fingerprint, UuidEntry>>,
+    evaluation_only: bool,
 }
 
-impl EvaluationCache {
+impl EvaluationMemory {
     pub fn new(
         current_time: chrono::DateTime<chrono::Utc>,
-        existing_cache: Option<HashMap<Fingerprint, CacheEntry>>,
+        stored_info: Option<StoredMemoizationInfo>,
+        options: EvaluationMemoryOptions,
     ) -> Self {
+        let (stored_cache, stored_uuids) = stored_info
+            .map(|stored_info| (stored_info.cache, stored_info.uuids))
+            .unzip();
         Self {
             current_time,
-            cache: Mutex::new(
-                existing_cache
+            cache: options.enable_cache.then(|| {
+                Mutex::new(
+                    stored_cache
+                        .into_iter()
+                        .flat_map(|iter| iter.into_iter())
+                        .map(|(k, e)| {
+                            (
+                                k,
+                                CacheEntry {
+                                    time: chrono::DateTime::from_timestamp(e.time_sec, 0)
+                                        .unwrap_or(chrono::DateTime::<chrono::Utc>::MIN_UTC),
+                                    data: CacheData::Previous(e.value),
+                                },
+                            )
+                        })
+                        .collect(),
+                )
+            }),
+            uuids: Mutex::new(
+                (!options.evaluation_only)
+                    .then(|| stored_uuids)
+                    .flatten()
                     .into_iter()
-                    .flat_map(|e| e.into_iter())
-                    .map(|(k, e)| {
-                        (
-                            k,
-                            EvaluationCacheEntry {
-                                time: chrono::DateTime::from_timestamp(e.time_sec, 0)
-                                    .unwrap_or(chrono::DateTime::<chrono::Utc>::MIN_UTC),
-                                data: EvaluationCacheData::Previous(e.value),
-                            },
-                        )
-                    })
+                    .flat_map(|iter| iter.into_iter())
+                    .map(|(k, v)| (k, UuidEntry::new(v)))
                     .collect(),
             ),
+            evaluation_only: options.evaluation_only,
         }
     }
 
-    pub fn into_stored(self) -> Result<HashMap<Fingerprint, CacheEntry>> {
-        Ok(self
-            .cache
+    pub fn into_stored(self) -> Result<StoredMemoizationInfo> {
+        if self.evaluation_only {
+            bail!("For evaluation only, cannot convert to stored MemoizationInfo");
+        }
+        let cache = if let Some(cache) = self.cache {
+            cache
+                .into_inner()?
+                .into_iter()
+                .filter_map(|(k, e)| match e.data {
+                    CacheData::Previous(_) => None,
+                    CacheData::Current(entry) => match entry.get() {
+                        Some(Ok(v)) => Some(serde_json::to_value(v).map(|value| {
+                            (
+                                k,
+                                StoredCacheEntry {
+                                    time_sec: e.time.timestamp(),
+                                    value,
+                                },
+                            )
+                        })),
+                        _ => None,
+                    },
+                })
+                .collect::<Result<_, _>>()?
+        } else {
+            bail!("Cache is disabled, cannot convert to stored MemoizationInfo");
+        };
+        let uuids = self
+            .uuids
             .into_inner()?
             .into_iter()
-            .filter_map(|(k, e)| match e.data {
-                EvaluationCacheData::Previous(_) => None,
-                EvaluationCacheData::Current(entry) => match entry.get() {
-                    Some(Ok(v)) => Some(serde_json::to_value(v).map(|value| {
-                        (
-                            k,
-                            CacheEntry {
-                                time_sec: e.time.timestamp(),
-                                value,
-                            },
-                        )
-                    })),
-                    _ => None,
-                },
-            })
-            .collect::<Result<_, _>>()?)
+            .filter_map(|(k, v)| v.into_stored().map(|uuids| (k, uuids)))
+            .collect();
+        Ok(StoredMemoizationInfo { cache, uuids })
     }
 
-    pub fn get(
+    pub fn get_cache_entry(
         &self,
-        key: Fingerprint,
+        key: impl FnOnce() -> Result<Fingerprint>,
         typ: &schema::ValueType,
         ttl: Option<chrono::Duration>,
-    ) -> Result<CacheEntryCell> {
-        let mut cache = self.cache.lock().unwrap();
-        let result = {
-            match cache.entry(key) {
-                std::collections::hash_map::Entry::Occupied(mut entry)
-                    if !ttl
-                        .map(|ttl| entry.get().time + ttl < self.current_time)
-                        .unwrap_or(false) =>
-                {
-                    let entry_mut = &mut entry.get_mut();
-                    match &mut entry_mut.data {
-                        EvaluationCacheData::Previous(value) => {
-                            let value = value::Value::from_json(std::mem::take(value), typ)?;
-                            let cell = Arc::new(async_lock::OnceCell::from(Ok(value)));
-                            let time = entry_mut.time;
-                            entry.insert(EvaluationCacheEntry {
-                                time,
-                                data: EvaluationCacheData::Current(cell.clone()),
-                            });
-                            cell
-                        }
-                        EvaluationCacheData::Current(cell) => cell.clone(),
+    ) -> Result<Option<CacheEntryCell>> {
+        let mut cache = if let Some(cache) = &self.cache {
+            cache.lock().unwrap()
+        } else {
+            return Ok(None);
+        };
+        let result = match cache.entry(key()?) {
+            std::collections::hash_map::Entry::Occupied(mut entry)
+                if !ttl
+                    .map(|ttl| entry.get().time + ttl < self.current_time)
+                    .unwrap_or(false) =>
+            {
+                let entry_mut = &mut entry.get_mut();
+                match &mut entry_mut.data {
+                    CacheData::Previous(value) => {
+                        let value = value::Value::from_json(std::mem::take(value), typ)?;
+                        let cell = Arc::new(async_lock::OnceCell::from(Ok(value)));
+                        let time = entry_mut.time;
+                        entry.insert(CacheEntry {
+                            time,
+                            data: CacheData::Current(cell.clone()),
+                        });
+                        cell
                     }
-                }
-                entry => {
-                    let cell = Arc::new(async_lock::OnceCell::new());
-                    entry.insert_entry(EvaluationCacheEntry {
-                        time: self.current_time,
-                        data: EvaluationCacheData::Current(cell.clone()),
-                    });
-                    cell
+                    CacheData::Current(cell) => cell.clone(),
                 }
             }
+            entry => {
+                let cell = Arc::new(async_lock::OnceCell::new());
+                entry.insert_entry(CacheEntry {
+                    time: self.current_time,
+                    data: CacheData::Current(cell.clone()),
+                });
+                cell
+            }
         };
-        Ok(result)
+        Ok(Some(result))
+    }
+
+    pub fn next_uuid(&self, key: Fingerprint) -> Result<uuid::Uuid> {
+        let mut uuids = self.uuids.lock().unwrap();
+
+        let entry = uuids.entry(key).or_default();
+        let uuid = if self.evaluation_only {
+            let fp = Fingerprinter::default()
+                .with(&key)?
+                .with(&entry.num_current)?
+                .into_fingerprint();
+            uuid::Uuid::new_v8(fp.0)
+        } else if entry.num_current < entry.uuids.len() {
+            entry.uuids[entry.num_current]
+        } else {
+            let uuid = uuid::Uuid::new_v4();
+            entry.uuids.push(uuid);
+            uuid
+        };
+        entry.num_current += 1;
+        Ok(uuid)
     }
 }
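`next_uuid` has two regimes, which is why Cargo.toml now enables both the `v4` and `v8` features: persisting runs mint a random v4 UUID once and replay it from `StoredMemoizationInfo::uuids` afterwards, while evaluation-only runs derive a deterministic v8 UUID from hash bytes so repeated previews agree without writing anything. A quick demonstration of the two constructors (the byte array stands in for a real `Fingerprint`):

```rust
// Sketch: requires the uuid crate with the "v4" and "v8" features.
fn main() {
    let fp: [u8; 16] = *b"0123456789abcdef";
    assert_eq!(uuid::Uuid::new_v8(fp), uuid::Uuid::new_v8(fp)); // deterministic
    assert_ne!(uuid::Uuid::new_v4(), uuid::Uuid::new_v4());     // random each call
}
```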
diff --git a/src/execution/mod.rs b/src/execution/mod.rs
index 463cc43e5..5de6b5e74 100644
--- a/src/execution/mod.rs
+++ b/src/execution/mod.rs
@@ -1,9 +1,9 @@
-pub mod dumper;
-pub mod evaluator;
-pub mod indexer;
-pub mod query;
+pub(crate) mod dumper;
+pub(crate) mod evaluator;
+pub(crate) mod indexer;
+pub(crate) mod query;
 
 mod db_tracking;
 pub mod db_tracking_setup;
-mod memoization;
+pub(crate) mod memoization;
diff --git a/src/service/flows.rs b/src/service/flows.rs
index fc055b785..7a35814d1 100644
--- a/src/service/flows.rs
+++ b/src/service/flows.rs
@@ -16,6 +16,7 @@ use crate::{
     api_bail, api_error,
     base::{schema, spec},
     execution::indexer,
+    execution::memoization,
 };
 
 pub async fn list_flows(
@@ -144,12 +145,16 @@ pub async fn evaluate_data(
         .ok_or_else(|| api_error!("field {} does not have a key", query.field))?;
     let key = value::KeyValue::from_strs(query.key, &key_field.value_type.typ)?;
 
-    let value_builder = indexer::evaluate_source_entry_with_cache(
+    let value_builder = indexer::evaluate_source_entry_with_memory(
         &plan,
         source_op,
         schema,
         &key,
-        indexer::EvaluationCacheOption::UseCache(&lib_context.pool),
+        memoization::EvaluationMemoryOptions {
+            enable_cache: true,
+            evaluation_only: true,
+        },
+        &lib_context.pool,
     )
     .await?
    .ok_or_else(|| api_error!("value not found for source at the specified key: {key:?}"))?;
diff --git a/src/utils/fingerprint.rs b/src/utils/fingerprint.rs
index ba5b42d7c..c6542872b 100644
--- a/src/utils/fingerprint.rs
+++ b/src/utils/fingerprint.rs
@@ -31,7 +31,7 @@ impl serde::ser::Error for FingerprinterError {
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub struct Fingerprint([u8; 16]);
+pub struct Fingerprint(pub [u8; 16]);
 
 impl Fingerprint {
     pub fn to_base64(self) -> String {
@@ -81,13 +81,13 @@ impl Fingerprinter {
         Fingerprint(self.hasher.finalize().into())
     }
 
-    pub fn with<S: Serialize>(self, value: &S) -> Result<Self, FingerprinterError> {
+    pub fn with<S: Serialize + ?Sized>(self, value: &S) -> Result<Self, FingerprinterError> {
         let mut fingerprinter = self;
         value.serialize(&mut fingerprinter)?;
         Ok(fingerprinter)
     }
 
-    pub fn write<S: Serialize>(&mut self, value: &S) -> Result<(), FingerprinterError> {
+    pub fn write<S: Serialize + ?Sized>(&mut self, value: &S) -> Result<(), FingerprinterError> {
         value.serialize(self)
     }
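The `?Sized` relaxation on `with`/`write` is what allows fingerprinting slices directly, e.g. `.with(&field_values[1..])` in the evaluator above. A minimal illustration:

```rust
use serde::Serialize;

// `&values[1..]` is a `&[i32]`, and `[i32]` is unsized, so `S = [i32]`
// only type-checks once the bound allows `?Sized`.
fn with<S: Serialize + ?Sized>(_value: &S) {}

fn main() {
    let values = vec![1, 2, 3];
    with(&values[1..]);
}
```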