diff --git a/src/execution/db_tracking.rs b/src/execution/db_tracking.rs
index 141e09178..871a9201c 100644
--- a/src/execution/db_tracking.rs
+++ b/src/execution/db_tracking.rs
@@ -211,3 +211,28 @@ impl ListTrackedSourceKeyMetadataState {
         sqlx::query_as(&self.query_str).bind(source_id).fetch(pool)
     }
 }
+
+#[derive(sqlx::FromRow, Debug)]
+pub struct SourceLastProcessedInfo {
+    pub processed_source_ordinal: Option<i64>,
+    pub process_logic_fingerprint: Option<Vec<u8>>,
+    pub process_time_micros: Option<i64>,
+}
+
+pub async fn read_source_last_processed_info(
+    source_id: i32,
+    source_key_json: &serde_json::Value,
+    db_setup: &TrackingTableSetupState,
+    pool: &PgPool,
+) -> Result<Option<SourceLastProcessedInfo>> {
+    let query_str = format!(
+        "SELECT processed_source_ordinal, process_logic_fingerprint, process_time_micros FROM {} WHERE source_id = $1 AND source_key = $2",
+        db_setup.table_name
+    );
+    let last_processed_info = sqlx::query_as(&query_str)
+        .bind(source_id)
+        .bind(source_key_json)
+        .fetch_optional(pool)
+        .await?;
+    Ok(last_processed_info)
+}
diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs
index 592eb38b3..0120d5eb5 100644
--- a/src/execution/dumper.rs
+++ b/src/execution/dumper.rs
@@ -11,6 +11,7 @@ use std::collections::BTreeMap;
 use std::path::{Path, PathBuf};
 use yaml_rust2::YamlEmitter;
 
+use super::evaluator::SourceRowEvaluationContext;
 use super::memoization::EvaluationMemoryOptions;
 use super::row_indexer;
 use crate::base::{schema, value};
@@ -77,10 +78,12 @@ impl<'a> Dumper<'a> {
         'a: 'b,
     {
         let data_builder = row_indexer::evaluate_source_entry_with_memory(
-            self.plan,
-            import_op,
-            self.schema,
-            key,
+            &SourceRowEvaluationContext {
+                plan: self.plan,
+                import_op,
+                schema: self.schema,
+                key,
+            },
             EvaluationMemoryOptions {
                 enable_cache: self.options.use_cache,
                 evaluation_only: true,
diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs
index dfba75713..9da0b8cdf 100644
--- a/src/execution/evaluator.rs
+++ b/src/execution/evaluator.rs
@@ -446,6 +446,13 @@ async fn evaluate_op_scope(
     Ok(())
 }
 
+pub struct SourceRowEvaluationContext<'a> {
+    pub plan: &'a ExecutionPlan,
+    pub import_op: &'a AnalyzedImportOp,
+    pub schema: &'a schema::FlowSchema,
+    pub key: &'a value::KeyValue,
+}
+
 #[derive(Debug)]
 pub struct EvaluateSourceEntryOutput {
     pub data_scope: ScopeValueBuilder,
@@ -453,23 +460,20 @@ pub struct EvaluateSourceEntryOutput {
 }
 
 pub async fn evaluate_source_entry(
-    plan: &ExecutionPlan,
-    import_op: &AnalyzedImportOp,
-    schema: &schema::FlowSchema,
-    key: &value::KeyValue,
+    src_eval_ctx: &SourceRowEvaluationContext<'_>,
     source_value: value::FieldValues,
     memory: &EvaluationMemory,
 ) -> Result<EvaluateSourceEntryOutput> {
-    let root_schema = &schema.schema;
+    let root_schema = &src_eval_ctx.schema.schema;
     let root_scope_value = ScopeValueBuilder::new(root_schema.fields.len());
     let root_scope_entry = ScopeEntry::new(
         ScopeKey::None,
         &root_scope_value,
         root_schema,
-        &plan.op_scope,
+        &src_eval_ctx.plan.op_scope,
     );
-    let table_schema = match &root_schema.fields[import_op.output.field_idx as usize]
+    let table_schema = match &root_schema.fields[src_eval_ctx.import_op.output.field_idx as usize]
         .value_type
         .typ
     {
@@ -482,12 +486,12 @@ pub async fn evaluate_source_entry(
     let scope_value =
         ScopeValueBuilder::augmented_from(&value::ScopeValue(source_value), table_schema)?;
     root_scope_entry.define_field_w_builder(
-        &import_op.output,
-        value::Value::KTable(BTreeMap::from([(key.clone(), scope_value)])),
+        &src_eval_ctx.import_op.output,
+        value::Value::KTable(BTreeMap::from([(src_eval_ctx.key.clone(), scope_value)])),
     );
 
     evaluate_op_scope(
-        &plan.op_scope,
+        &src_eval_ctx.plan.op_scope,
         RefList::Nil.prepend(&root_scope_entry),
         memory,
     )
diff --git a/src/execution/indexing_status.rs b/src/execution/indexing_status.rs
new file mode 100644
index 000000000..f6db8c21f
--- /dev/null
+++ b/src/execution/indexing_status.rs
@@ -0,0 +1,59 @@
+use crate::prelude::*;
+
+use super::db_tracking;
+use super::evaluator;
+use futures::try_join;
+
+#[derive(Debug, Serialize)]
+pub struct SourceRowLastProcessedInfo {
+    pub source_ordinal: Option<interface::Ordinal>,
+    pub processing_time: Option<chrono::DateTime<chrono::Utc>>,
+    pub is_logic_current: bool,
+}
+
+#[derive(Debug, Serialize)]
+pub struct SourceRowInfo {
+    pub ordinal: Option<interface::Ordinal>,
+}
+
+#[derive(Debug, Serialize)]
+pub struct SourceRowIndexingStatus {
+    pub last_processed: Option<SourceRowLastProcessedInfo>,
+    pub current: Option<SourceRowInfo>,
+}
+
+pub async fn get_source_row_indexing_status(
+    src_eval_ctx: &evaluator::SourceRowEvaluationContext<'_>,
+    pool: &sqlx::PgPool,
+) -> Result<SourceRowIndexingStatus> {
+    let source_key_json = serde_json::to_value(src_eval_ctx.key)?;
+    let last_processed_fut = db_tracking::read_source_last_processed_info(
+        src_eval_ctx.import_op.source_id,
+        &source_key_json,
+        &src_eval_ctx.plan.tracking_table_setup,
+        pool,
+    );
+    let current_fut = src_eval_ctx.import_op.executor.get_value(
+        &src_eval_ctx.key,
+        &interface::SourceExecutorGetOptions {
+            include_value: false,
+            include_ordinal: true,
+        },
+    );
+    let (last_processed, current) = try_join!(last_processed_fut, current_fut)?;
+
+    let last_processed = last_processed.map(|l| SourceRowLastProcessedInfo {
+        source_ordinal: l.processed_source_ordinal.map(interface::Ordinal),
+        processing_time: l
+            .process_time_micros
+            .map(chrono::DateTime::<chrono::Utc>::from_timestamp_micros)
+            .flatten(),
+        is_logic_current: Some(src_eval_ctx.plan.logic_fingerprint.0.as_slice())
+            == l.process_logic_fingerprint.as_ref().map(|b| b.as_slice()),
+    });
+    let current = current.map(|c| SourceRowInfo { ordinal: c.ordinal });
+    Ok(SourceRowIndexingStatus {
+        last_processed,
+        current,
+    })
+}
diff --git a/src/execution/mod.rs b/src/execution/mod.rs
index a4703e4cf..162945b56 100644
--- a/src/execution/mod.rs
+++ b/src/execution/mod.rs
@@ -1,6 +1,7 @@
 pub(crate) mod db_tracking_setup;
 pub(crate) mod dumper;
 pub(crate) mod evaluator;
+pub(crate) mod indexing_status;
 pub(crate) mod memoization;
 pub(crate) mod query;
 pub(crate) mod row_indexer;
diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs
index 7254fc099..afb258bfe 100644
--- a/src/execution/row_indexer.rs
+++ b/src/execution/row_indexer.rs
@@ -6,11 +6,12 @@ use std::collections::{HashMap, HashSet};
 
 use super::db_tracking::{self, read_source_tracking_info_for_processing, TrackedTargetKey};
 use super::db_tracking_setup;
-use super::evaluator::{evaluate_source_entry, EvaluateSourceEntryOutput};
+use super::evaluator::{
+    evaluate_source_entry, EvaluateSourceEntryOutput, SourceRowEvaluationContext,
+};
 use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoizationInfo};
 use super::stats;
 
-use crate::base::schema;
 use crate::base::value::{self, FieldValues, KeyValue};
 use crate::builder::plan::*;
 use crate::ops::interface::{
@@ -439,19 +440,16 @@ async fn commit_source_tracking_info(
 }
 
 pub async fn evaluate_source_entry_with_memory(
-    plan: &ExecutionPlan,
-    import_op: &AnalyzedImportOp,
-    schema: &schema::FlowSchema,
-    key: &value::KeyValue,
+    src_eval_ctx: &SourceRowEvaluationContext<'_>,
     options: EvaluationMemoryOptions,
     pool: &PgPool,
 ) -> Result<Option<EvaluateSourceEntryOutput>> {
     let stored_info = if options.enable_cache || !options.evaluation_only {
-        let source_key_json = serde_json::to_value(key)?;
+        let source_key_json = serde_json::to_value(src_eval_ctx.key)?;
         let existing_tracking_info = read_source_tracking_info_for_processing(
-            import_op.source_id,
+            src_eval_ctx.import_op.source_id,
             &source_key_json,
-            &plan.tracking_table_setup,
+            &src_eval_ctx.plan.tracking_table_setup,
             pool,
         )
         .await?;
@@ -462,43 +460,42 @@ pub async fn evaluate_source_entry_with_memory(
         None
     };
     let memory = EvaluationMemory::new(chrono::Utc::now(), stored_info, options);
 
-    let source_value = match import_op
+    let source_value = match src_eval_ctx
+        .import_op
         .executor
         .get_value(
-            key,
+            src_eval_ctx.key,
             &SourceExecutorGetOptions {
                 include_value: true,
                 include_ordinal: false,
             },
         )
         .await?
-        .value
     {
-        Some(d) => d,
+        Some(d) => d
+            .value
+            .ok_or_else(|| anyhow::anyhow!("value not returned"))?,
         None => return Ok(None),
     };
-    let output = evaluate_source_entry(plan, import_op, schema, key, source_value, &memory).await?;
+    let output = evaluate_source_entry(src_eval_ctx, source_value, &memory).await?;
     Ok(Some(output))
 }
 
 pub async fn update_source_row(
-    plan: &ExecutionPlan,
-    import_op: &AnalyzedImportOp,
-    schema: &schema::FlowSchema,
-    key: &value::KeyValue,
+    src_eval_ctx: &SourceRowEvaluationContext<'_>,
     source_value: Option<FieldValues>,
     source_version: &SourceVersion,
     pool: &PgPool,
     update_stats: &stats::UpdateStats,
 ) -> Result<SkippedOr<()>> {
-    let source_key_json = serde_json::to_value(key)?;
-    let process_timestamp = chrono::Utc::now();
+    let source_key_json = serde_json::to_value(src_eval_ctx.key)?;
+    let process_time = chrono::Utc::now();
 
     // Phase 1: Evaluate with memoization info.
     let existing_tracking_info = read_source_tracking_info_for_processing(
-        import_op.source_id,
+        src_eval_ctx.import_op.source_id,
         &source_key_json,
-        &plan.tracking_table_setup,
+        &src_eval_ctx.plan.tracking_table_setup,
         pool,
     )
    .await?;
@@ -507,7 +504,7 @@ pub async fn update_source_row(
         let existing_version = SourceVersion::from_stored(
             info.processed_source_ordinal,
             &info.process_logic_fingerprint,
-            plan.logic_fingerprint,
+            src_eval_ctx.plan.logic_fingerprint,
         );
         if existing_version.should_skip(source_version, Some(update_stats)) {
             return Ok(SkippedOr::Skipped(existing_version));
@@ -522,22 +519,15 @@ pub async fn update_source_row(
     let (output, stored_mem_info) = match source_value {
         Some(source_value) => {
             let evaluation_memory = EvaluationMemory::new(
-                process_timestamp,
+                process_time,
                 memoization_info,
                 EvaluationMemoryOptions {
                     enable_cache: true,
                     evaluation_only: false,
                 },
             );
-            let output = evaluate_source_entry(
-                plan,
-                import_op,
-                schema,
-                key,
-                source_value,
-                &evaluation_memory,
-            )
-            .await?;
+            let output =
+                evaluate_source_entry(src_eval_ctx, source_value, &evaluation_memory).await?;
             (Some(output), evaluation_memory.into_stored()?)
         }
         None => Default::default(),
     };
 
     // Phase 2 (precommit): Update with the memoization info and stage target keys.
     let precommit_output = precommit_source_tracking_info(
-        import_op.source_id,
+        src_eval_ctx.import_op.source_id,
         &source_key_json,
         source_version,
-        plan.logic_fingerprint,
+        src_eval_ctx.plan.logic_fingerprint,
         output.as_ref().map(|scope_value| PrecommitData {
             evaluate_output: scope_value,
             memoization_info: &stored_mem_info,
         }),
-        &process_timestamp,
-        &plan.tracking_table_setup,
-        &plan.export_ops,
+        &process_time,
+        &src_eval_ctx.plan.tracking_table_setup,
+        &src_eval_ctx.plan.export_ops,
         update_stats,
         pool,
     )
@@ -567,40 +557,44 @@ pub async fn update_source_row(
 
     // Phase 3: Apply changes to the target storage, including upserting new target records and removing existing ones.
     let mut target_mutations = precommit_output.target_mutations;
-    let apply_futs = plan.export_op_groups.iter().filter_map(|export_op_group| {
-        let mutations_w_ctx: Vec<_> = export_op_group
-            .op_idx
-            .iter()
-            .filter_map(|export_op_idx| {
-                let export_op = &plan.export_ops[*export_op_idx];
-                target_mutations
-                    .remove(&export_op.target_id)
-                    .filter(|m| !m.is_empty())
-                    .map(|mutation| interface::ExportTargetMutationWithContext {
-                        mutation,
-                        export_context: export_op.export_context.as_ref(),
-                    })
-            })
-            .collect();
-        (!mutations_w_ctx.is_empty()).then(|| {
-            export_op_group
-                .target_factory
-                .apply_mutation(mutations_w_ctx)
-        })
-    });
+    let apply_futs = src_eval_ctx
+        .plan
+        .export_op_groups
+        .iter()
+        .filter_map(|export_op_group| {
+            let mutations_w_ctx: Vec<_> = export_op_group
+                .op_idx
+                .iter()
+                .filter_map(|export_op_idx| {
+                    let export_op = &src_eval_ctx.plan.export_ops[*export_op_idx];
+                    target_mutations
+                        .remove(&export_op.target_id)
+                        .filter(|m| !m.is_empty())
+                        .map(|mutation| interface::ExportTargetMutationWithContext {
+                            mutation,
+                            export_context: export_op.export_context.as_ref(),
+                        })
+                })
+                .collect();
+            (!mutations_w_ctx.is_empty()).then(|| {
+                export_op_group
+                    .target_factory
+                    .apply_mutation(mutations_w_ctx)
+            })
+        });
 
     // TODO: Handle errors.
     try_join_all(apply_futs).await?;
 
     // Phase 4: Update the tracking record.
     commit_source_tracking_info(
-        import_op.source_id,
+        src_eval_ctx.import_op.source_id,
         &source_key_json,
         source_version,
-        &plan.logic_fingerprint.0,
+        &src_eval_ctx.plan.logic_fingerprint.0,
         precommit_output.metadata,
-        &process_timestamp,
-        &plan.tracking_table_setup,
+        &process_time,
+        &src_eval_ctx.plan.tracking_table_setup,
         pool,
     )
     .await?;
diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs
index f8ca472fa..671e0756e 100644
--- a/src/execution/source_indexer.rs
+++ b/src/execution/source_indexer.rs
@@ -6,6 +6,7 @@ use tokio::{sync::Semaphore, task::JoinSet};
 
 use super::{
     db_tracking,
+    evaluator::SourceRowEvaluationContext,
     row_indexer::{self, SkippedOr, SourceVersion, SourceVersionKind},
     stats,
 };
@@ -111,14 +112,17 @@ impl SourceIndexingContext {
                     },
                 )
                 .await?
-                .value
+                .map(|v| v.value)
+                .flatten()
             };
             let schema = &self.flow.data_schema;
             let result = row_indexer::update_source_row(
-                &plan,
-                import_op,
-                schema,
-                &key,
+                &SourceRowEvaluationContext {
+                    plan: &plan,
+                    import_op,
+                    schema,
+                    key: &key,
+                },
                 source_value,
                 &source_version,
                 &pool,
diff --git a/src/ops/interface.rs b/src/ops/interface.rs
index fc19e2753..a4102e5af 100644
--- a/src/ops/interface.rs
+++ b/src/ops/interface.rs
@@ -16,7 +16,7 @@ pub struct FlowInstanceContext {
     pub py_exec_ctx: Option<Arc<crate::py::PythonExecutionContext>>,
 }
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
 pub struct Ordinal(pub i64);
 
 impl From<Ordinal> for i64 {
@@ -72,9 +72,9 @@ pub struct SourceExecutorGetOptions {
     pub include_value: bool,
 }
 
-#[derive(Debug, Default)]
+#[derive(Debug)]
 pub struct SourceValue {
-    // None if not exists, or not included in the option.
+    // None if not included in the option.
     pub value: Option<FieldValues>,
     // None if unavailable, or not included in the option.
     pub ordinal: Option<Ordinal>,
@@ -93,7 +93,7 @@ pub trait SourceExecutor: Send + Sync {
         &self,
         key: &KeyValue,
         options: &SourceExecutorGetOptions,
-    ) -> Result<SourceValue>;
+    ) -> Result<Option<SourceValue>>;
 
     async fn change_stream(&self) -> Result<Option<BoxStream<'async_trait, SourceChange>>> {
         Ok(None)
diff --git a/src/ops/sources/google_drive.rs b/src/ops/sources/google_drive.rs
index 979cdf849..ce13e3577 100644
--- a/src/ops/sources/google_drive.rs
+++ b/src/ops/sources/google_drive.rs
@@ -341,7 +341,7 @@ impl SourceExecutor for Executor {
         &self,
         key: &KeyValue,
         options: &SourceExecutorGetOptions,
-    ) -> Result<SourceValue> {
+    ) -> Result<Option<SourceValue>> {
         let file_id = key.str_value()?;
         let fields = format!(
             "id,name,mimeType,trashed{}",
@@ -359,7 +359,7 @@ impl SourceExecutor for Executor {
         let file = match resp {
             Some((_, file)) if file.trashed != Some(true) => file,
             _ => {
-                return Ok(SourceValue::default());
+                return Ok(None);
             }
         };
         let ordinal = if options.include_ordinal {
@@ -415,7 +415,7 @@ impl SourceExecutor for Executor {
             }
             None => None,
         };
-        Ok(SourceValue { value, ordinal })
+        Ok(Some(SourceValue { value, ordinal }))
     }
 
     async fn change_stream(&self) -> Result<Option<BoxStream<'async_trait, SourceChange>>> {
diff --git a/src/ops/sources/local_file.rs b/src/ops/sources/local_file.rs
index 9c6875732..acdc61dd2 100644
--- a/src/ops/sources/local_file.rs
+++ b/src/ops/sources/local_file.rs
@@ -88,12 +88,9 @@ impl SourceExecutor for Executor {
         &self,
         key: &KeyValue,
         options: &SourceExecutorGetOptions,
-    ) -> Result<SourceValue> {
+    ) -> Result<Option<SourceValue>> {
         if !self.is_file_included(key.str_value()?.as_ref()) {
-            return Ok(SourceValue {
-                value: None,
-                ordinal: None,
-            });
+            return Ok(None);
         }
         let path = self.root_path.join(key.str_value()?.as_ref());
         let ordinal = if options.include_ordinal {
@@ -117,7 +114,7 @@ impl SourceExecutor for Executor {
         } else {
             None
         };
-        Ok(SourceValue { value, ordinal })
+        Ok(Some(SourceValue { value, ordinal }))
     }
 }
diff --git a/src/server.rs b/src/server.rs
index ef2c18530..f48df77e2 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -61,6 +61,10 @@ pub async fn init_server(
             "/flows/:flowInstName/data",
             routing::get(service::flows::evaluate_data),
         )
+        .route(
+            "/flows/:flowInstName/rowStatus",
+            routing::get(service::flows::get_row_index_status),
+        )
         .route(
             "/flows/:flowInstName/update",
             routing::post(service::flows::update),
diff --git a/src/service/flows.rs b/src/service/flows.rs
index 510084741..07c43dfec 100644
--- a/src/service/flows.rs
+++ b/src/service/flows.rs
@@ -1,11 +1,8 @@
 use crate::prelude::*;
 
+use crate::execution::{evaluator, indexing_status, memoization, row_indexer, stats};
 use crate::lib_context::LibContext;
 use crate::{base::schema::FlowSchema, ops::interface::SourceExecutorListOptions};
-use crate::{
-    execution::memoization,
-    execution::{row_indexer, stats},
-};
 use axum::{
     extract::{Path, State},
     http::StatusCode,
@@ -103,7 +100,7 @@ pub async fn get_keys(
 }
 
 #[derive(Deserialize)]
-pub struct EvaluateDataParams {
+pub struct SourceRowKeyParams {
     field: String,
     key: Vec<String>,
 }
@@ -114,43 +111,66 @@ pub struct EvaluateDataResponse {
     data: value::ScopeValue,
 }
 
+struct SourceRowKeyContextHolder<'a> {
+    plan: Arc<ExecutionPlan>,
+    import_op_idx: usize,
+    schema: &'a FlowSchema,
+    key: value::KeyValue,
+}
+
+impl<'a> SourceRowKeyContextHolder<'a> {
+    async fn create(flow_ctx: &'a FlowContext, source_row_key: SourceRowKeyParams) -> Result<Self> {
+        let schema = &flow_ctx.flow.data_schema;
+        let import_op_idx = flow_ctx
+            .flow
+            .flow_instance
+            .import_ops
+            .iter()
+            .position(|op| op.name == source_row_key.field)
+            .ok_or_else(|| {
+                ApiError::new(
+                    &format!("source field not found: {}", source_row_key.field),
+                    StatusCode::BAD_REQUEST,
+                )
+            })?;
+        let plan = flow_ctx.flow.get_execution_plan().await?;
+        let import_op = &plan.import_ops[import_op_idx];
+        let field_schema = &schema.fields[import_op.output.field_idx as usize];
+        let table_schema = match &field_schema.value_type.typ {
+            schema::ValueType::Table(table) => table,
+            _ => api_bail!("field is not a table: {}", source_row_key.field),
+        };
+        let key_field = table_schema
+            .key_field()
+            .ok_or_else(|| api_error!("field {} does not have a key", source_row_key.field))?;
+        let key = value::KeyValue::from_strs(source_row_key.key, &key_field.value_type.typ)?;
+        Ok(Self {
+            plan,
+            import_op_idx,
+            schema,
+            key,
+        })
+    }
+
+    fn as_context<'b>(&'b self) -> evaluator::SourceRowEvaluationContext<'b> {
+        evaluator::SourceRowEvaluationContext {
+            plan: &self.plan,
+            import_op: &self.plan.import_ops[self.import_op_idx],
+            schema: self.schema,
+            key: &self.key,
+        }
+    }
+}
+
 pub async fn evaluate_data(
     Path(flow_name): Path<String>,
-    Query(query): Query<EvaluateDataParams>,
+    Query(query): Query<SourceRowKeyParams>,
     State(lib_context): State<Arc<LibContext>>,
 ) -> Result<Json<EvaluateDataResponse>, ApiError> {
     let flow_ctx = lib_context.get_flow_context(&flow_name)?;
-    let schema = &flow_ctx.flow.data_schema;
-
-    let import_op_idx = flow_ctx
-        .flow
-        .flow_instance
-        .import_ops
-        .iter()
-        .position(|op| op.name == query.field)
-        .ok_or_else(|| {
-            ApiError::new(
-                &format!("source field not found: {}", query.field),
-                StatusCode::BAD_REQUEST,
-            )
-        })?;
-    let plan = flow_ctx.flow.get_execution_plan().await?;
-    let import_op = &plan.import_ops[import_op_idx];
-    let field_schema = &schema.fields[import_op.output.field_idx as usize];
-    let table_schema = match &field_schema.value_type.typ {
-        schema::ValueType::Table(table) => table,
-        _ => api_bail!("field is not a table: {}", query.field),
-    };
-    let key_field = table_schema
-        .key_field()
-        .ok_or_else(|| api_error!("field {} does not have a key", query.field))?;
-    let key = value::KeyValue::from_strs(query.key, &key_field.value_type.typ)?;
-
+    let source_row_key_ctx = SourceRowKeyContextHolder::create(&flow_ctx, query).await?;
     let evaluate_output = row_indexer::evaluate_source_entry_with_memory(
-        &plan,
-        import_op,
-        schema,
-        &key,
+        &source_row_key_ctx.as_context(),
         memoization::EvaluationMemoryOptions {
             enable_cache: true,
             evaluation_only: true,
         },
         &lib_context.builtin_db_pool,
     )
     .await?
-    .ok_or_else(|| api_error!("value not found for source at the specified key: {key:?}"))?;
+    .ok_or_else(|| {
+        api_error!(
+            "value not found for source at the specified key: {key:?}",
+            key = source_row_key_ctx.key
+        )
+    })?;
 
     Ok(Json(EvaluateDataResponse {
-        schema: schema.clone(),
+        schema: flow_ctx.flow.data_schema.clone(),
         data: evaluate_output.data_scope.into(),
     }))
 }
@@ -183,3 +208,18 @@ pub async fn update(
     live_updater.wait().await?;
     Ok(Json(live_updater.index_update_info()))
 }
+
+pub async fn get_row_index_status(
+    Path(flow_name): Path<String>,
+    Query(query): Query<SourceRowKeyParams>,
+    State(lib_context): State<Arc<LibContext>>,
+) -> Result<Json<indexing_status::SourceRowIndexingStatus>, ApiError> {
+    let flow_ctx = lib_context.get_flow_context(&flow_name)?;
+    let source_row_key_ctx = SourceRowKeyContextHolder::create(&flow_ctx, query).await?;
+    let index_status = indexing_status::get_source_row_indexing_status(
+        &source_row_key_ctx.as_context(),
+        &lib_context.builtin_db_pool,
+    )
+    .await?;
+    Ok(Json(index_status))
+}
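
Reviewer note on the `SourceExecutor::get_value` contract change (src/ops/interface.rs): a missing row used to be reported in-band as `Ok(SourceValue::default())`, with every field `None`; it is now `Ok(None)` at the outer level, so the inner `value`/`ordinal` fields only encode what was requested via `SourceExecutorGetOptions`. A minimal sketch of an implementor under the new contract — the `rows` map and its `data`/`version` fields are hypothetical, only the trait types come from this diff:

    // Tri-state result under the new contract:
    // - Err(_)            => the lookup itself failed.
    // - Ok(None)          => the row does not exist in the source.
    // - Ok(Some(v))       => the row exists; v.value / v.ordinal are None
    //                        only when not requested via `options`.
    async fn get_value(
        &self,
        key: &KeyValue,
        options: &SourceExecutorGetOptions,
    ) -> Result<Option<SourceValue>> {
        let row = match self.rows.get(key) {
            Some(row) => row,
            // Previously: `return Ok(SourceValue::default());`
            None => return Ok(None),
        };
        Ok(Some(SourceValue {
            value: options.include_value.then(|| row.data.clone()),
            ordinal: options.include_ordinal.then(|| Ordinal(row.version)),
        }))
    }

This is what lets `evaluate_source_entry_with_memory` distinguish a deleted row (propagated as `Ok(None)`) from an executor that ignored `include_value: true` (the new `anyhow!("value not returned")` error).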
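With the route registered in src/server.rs, a row's indexing status can then be inspected via, e.g. (the flow, field, and key values here are made up for illustration):

    GET /flows/demo/rowStatus?field=documents&key=a.md

The response is the serialized `SourceRowIndexingStatus`; given the `Serialize` derives, roughly:

    {
      "last_processed": {
        "source_ordinal": <i64>,
        "processing_time": "<RFC 3339 timestamp>",
        "is_logic_current": true
      },
      "current": { "ordinal": <i64> }
    }

`last_processed` is `null` for a row that was never indexed, and `current` is `null` when the row no longer exists in the source (the `Ok(None)` case above). Comparing `current.ordinal` against `last_processed.source_ordinal`, together with `is_logic_current`, tells whether a row is stale because the source changed or because the flow logic changed since it was last processed.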