Skip to content

Commit e49816d

Browse files
committed
feat(target): add prepare_upsert_entry() to ExportTargetFactory
1 parent 65253ed commit e49816d

File tree

5 files changed

+49
-11
lines changed

5 files changed

+49
-11
lines changed

src/builder/analyzer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,6 +1064,7 @@ impl AnalyzerContext<'_> {
10641064
existing_target_states,
10651065
)?;
10661066
let op_name = export_op.name.clone();
1067+
let export_target_factory = export_op_group.target_factory.clone();
10671068
Ok(async move {
10681069
trace!("Start building executor for export op `{op_name}`");
10691070
let executors = data_coll_output
@@ -1075,6 +1076,7 @@ impl AnalyzerContext<'_> {
10751076
name: op_name,
10761077
target_id,
10771078
input: data_fields_info.local_collector_ref,
1079+
export_target_factory,
10781080
export_context: executors.export_context,
10791081
query_target: executors.query_target,
10801082
primary_key_def: data_fields_info.primary_key_def,

src/builder/plan.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ pub struct AnalyzedExportOp {
101101
pub name: String,
102102
pub target_id: i32,
103103
pub input: AnalyzedLocalCollectorReference,
104+
pub export_target_factory: Arc<dyn ExportTargetFactory + Send + Sync>,
104105
pub export_context: Arc<dyn Any + Send + Sync>,
105106
pub query_target: Option<Arc<dyn QueryTarget>>,
106107
pub primary_key_def: AnalyzedPrimaryKeyDef,

src/execution/row_indexer.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ use futures::future::try_join_all;
44
use sqlx::PgPool;
55
use std::collections::{HashMap, HashSet};
66

7-
use super::db_tracking::{self, read_source_tracking_info_for_processing, TrackedTargetKey};
7+
use super::db_tracking::{self, TrackedTargetKey, read_source_tracking_info_for_processing};
88
use super::db_tracking_setup;
99
use super::evaluator::{
10-
evaluate_source_entry, EvaluateSourceEntryOutput, SourceRowEvaluationContext,
10+
EvaluateSourceEntryOutput, SourceRowEvaluationContext, evaluate_source_entry,
1111
};
1212
use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoizationInfo};
1313
use super::stats;
@@ -254,10 +254,17 @@ async fn precommit_source_tracking_info(
254254
.existing_staging_keys_info
255255
.remove(&primary_key_json);
256256

257+
let upsert_entry = export_op.export_target_factory.prepare_upsert_entry(
258+
ExportTargetUpsertEntry {
259+
key: primary_key,
260+
value: field_values,
261+
},
262+
export_op.export_context.as_ref(),
263+
)?;
257264
let curr_fp = if !export_op.value_stable {
258265
Some(
259266
Fingerprinter::default()
260-
.with(&field_values)?
267+
.with(&upsert_entry.value)?
261268
.into_fingerprint(),
262269
)
263270
} else {
@@ -273,15 +280,15 @@ async fn precommit_source_tracking_info(
273280
{
274281
// Already exists, with exactly the same value fingerprint.
275282
// Nothing need to be changed, except carrying over the existing target keys info.
276-
let (existing_ordinal, existing_fp) =
277-
existing_target_keys.unwrap().into_iter().next().unwrap();
283+
let (existing_ordinal, existing_fp) = existing_target_keys
284+
.ok_or_else(invariance_violation)?
285+
.into_iter()
286+
.next()
287+
.ok_or_else(invariance_violation)?;
278288
keys_info.push((primary_key_json, existing_ordinal, existing_fp));
279289
} else {
280290
// Entry with new value. Needs to be upserted.
281-
target_info.mutation.upserts.push(ExportTargetUpsertEntry {
282-
key: primary_key,
283-
value: field_values,
284-
});
291+
target_info.mutation.upserts.push(upsert_entry);
285292
target_info.new_staging_keys_info.push((
286293
primary_key_json.clone(),
287294
process_ordinal,

src/ops/factory_bases.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,14 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
327327

328328
fn describe_resource(&self, key: &Self::Key) -> Result<String>;
329329

330+
fn prepare_upsert_entry<'ctx>(
331+
&self,
332+
entry: ExportTargetUpsertEntry,
333+
_export_context: &'ctx Self::ExportContext,
334+
) -> Result<ExportTargetUpsertEntry> {
335+
Ok(entry)
336+
}
337+
330338
fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()>
331339
where
332340
Self: Sized,
@@ -451,6 +459,20 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
451459
Ok(result)
452460
}
453461

462+
fn prepare_upsert_entry<'ctx>(
463+
&self,
464+
entry: ExportTargetUpsertEntry,
465+
export_context: &'ctx (dyn Any + Send + Sync),
466+
) -> Result<ExportTargetUpsertEntry> {
467+
StorageFactoryBase::prepare_upsert_entry(
468+
self,
469+
entry,
470+
export_context
471+
.downcast_ref::<T::ExportContext>()
472+
.ok_or_else(invariance_violation)?,
473+
)
474+
}
475+
454476
async fn apply_mutation(
455477
&self,
456478
mutations: Vec<ExportTargetMutationWithContext<'async_trait, dyn Any + Send + Sync>>,
@@ -463,7 +485,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
463485
export_context: m
464486
.export_context
465487
.downcast_ref::<T::ExportContext>()
466-
.ok_or_else(|| anyhow!("Unexpected export context type"))?,
488+
.ok_or_else(invariance_violation)?,
467489
})
468490
})
469491
.collect::<Result<_>>()?;
@@ -486,7 +508,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
486508
.setup_status
487509
.as_any()
488510
.downcast_ref::<T::SetupStatus>()
489-
.ok_or_else(|| anyhow!("Unexpected setup status type"))?,
511+
.ok_or_else(invariance_violation)?,
490512
})
491513
})
492514
.collect::<Result<Vec<_>>>()?,

src/ops/interface.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,12 @@ pub trait ExportTargetFactory: Send + Sync {
286286

287287
fn describe_resource(&self, key: &serde_json::Value) -> Result<String>;
288288

289+
fn prepare_upsert_entry<'ctx>(
290+
&self,
291+
entry: ExportTargetUpsertEntry,
292+
export_context: &'ctx (dyn Any + Send + Sync),
293+
) -> Result<ExportTargetUpsertEntry>;
294+
289295
async fn apply_mutation(
290296
&self,
291297
mutations: Vec<ExportTargetMutationWithContext<'async_trait, dyn Any + Send + Sync>>,

0 commit comments

Comments
 (0)