Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,7 @@ impl AnalyzerContext<'_> {
existing_target_states,
)?;
let op_name = export_op.name.clone();
let export_target_factory = export_op_group.target_factory.clone();
Ok(async move {
trace!("Start building executor for export op `{op_name}`");
let executors = data_coll_output
Expand All @@ -1075,6 +1076,7 @@ impl AnalyzerContext<'_> {
name: op_name,
target_id,
input: data_fields_info.local_collector_ref,
export_target_factory,
export_context: executors.export_context,
query_target: executors.query_target,
primary_key_def: data_fields_info.primary_key_def,
Expand Down
1 change: 1 addition & 0 deletions src/builder/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ pub struct AnalyzedExportOp {
pub name: String,
pub target_id: i32,
pub input: AnalyzedLocalCollectorReference,
pub export_target_factory: Arc<dyn ExportTargetFactory + Send + Sync>,
pub export_context: Arc<dyn Any + Send + Sync>,
pub query_target: Option<Arc<dyn QueryTarget>>,
pub primary_key_def: AnalyzedPrimaryKeyDef,
Expand Down
25 changes: 16 additions & 9 deletions src/execution/row_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use futures::future::try_join_all;
use sqlx::PgPool;
use std::collections::{HashMap, HashSet};

use super::db_tracking::{self, read_source_tracking_info_for_processing, TrackedTargetKey};
use super::db_tracking::{self, TrackedTargetKey, read_source_tracking_info_for_processing};
use super::db_tracking_setup;
use super::evaluator::{
evaluate_source_entry, EvaluateSourceEntryOutput, SourceRowEvaluationContext,
EvaluateSourceEntryOutput, SourceRowEvaluationContext, evaluate_source_entry,
};
use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoizationInfo};
use super::stats;
Expand Down Expand Up @@ -254,10 +254,17 @@ async fn precommit_source_tracking_info(
.existing_staging_keys_info
.remove(&primary_key_json);

let upsert_entry = export_op.export_target_factory.prepare_upsert_entry(
ExportTargetUpsertEntry {
key: primary_key,
value: field_values,
},
export_op.export_context.as_ref(),
)?;
let curr_fp = if !export_op.value_stable {
Some(
Fingerprinter::default()
.with(&field_values)?
.with(&upsert_entry.value)?
.into_fingerprint(),
)
} else {
Expand All @@ -273,15 +280,15 @@ async fn precommit_source_tracking_info(
{
// Already exists, with exactly the same value fingerprint.
// Nothing need to be changed, except carrying over the existing target keys info.
let (existing_ordinal, existing_fp) =
existing_target_keys.unwrap().into_iter().next().unwrap();
let (existing_ordinal, existing_fp) = existing_target_keys
.ok_or_else(invariance_violation)?
.into_iter()
.next()
.ok_or_else(invariance_violation)?;
keys_info.push((primary_key_json, existing_ordinal, existing_fp));
} else {
// Entry with new value. Needs to be upserted.
target_info.mutation.upserts.push(ExportTargetUpsertEntry {
key: primary_key,
value: field_values,
});
target_info.mutation.upserts.push(upsert_entry);
target_info.new_staging_keys_info.push((
primary_key_json.clone(),
process_ordinal,
Expand Down
26 changes: 24 additions & 2 deletions src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,14 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {

fn describe_resource(&self, key: &Self::Key) -> Result<String>;

fn prepare_upsert_entry<'ctx>(
&self,
entry: ExportTargetUpsertEntry,
_export_context: &'ctx Self::ExportContext,
) -> Result<ExportTargetUpsertEntry> {
Ok(entry)
}

fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()>
where
Self: Sized,
Expand Down Expand Up @@ -451,6 +459,20 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
Ok(result)
}

fn prepare_upsert_entry<'ctx>(
&self,
entry: ExportTargetUpsertEntry,
export_context: &'ctx (dyn Any + Send + Sync),
) -> Result<ExportTargetUpsertEntry> {
StorageFactoryBase::prepare_upsert_entry(
self,
entry,
export_context
.downcast_ref::<T::ExportContext>()
.ok_or_else(invariance_violation)?,
)
}

async fn apply_mutation(
&self,
mutations: Vec<ExportTargetMutationWithContext<'async_trait, dyn Any + Send + Sync>>,
Expand All @@ -463,7 +485,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
export_context: m
.export_context
.downcast_ref::<T::ExportContext>()
.ok_or_else(|| anyhow!("Unexpected export context type"))?,
.ok_or_else(invariance_violation)?,
})
})
.collect::<Result<_>>()?;
Expand All @@ -486,7 +508,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
.setup_status
.as_any()
.downcast_ref::<T::SetupStatus>()
.ok_or_else(|| anyhow!("Unexpected setup status type"))?,
.ok_or_else(invariance_violation)?,
})
})
.collect::<Result<Vec<_>>>()?,
Expand Down
6 changes: 6 additions & 0 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ pub trait ExportTargetFactory: Send + Sync {

fn describe_resource(&self, key: &serde_json::Value) -> Result<String>;

fn prepare_upsert_entry<'ctx>(
&self,
entry: ExportTargetUpsertEntry,
export_context: &'ctx (dyn Any + Send + Sync),
) -> Result<ExportTargetUpsertEntry>;

async fn apply_mutation(
&self,
mutations: Vec<ExportTargetMutationWithContext<'async_trait, dyn Any + Send + Sync>>,
Expand Down