
Commit b246372

feat(target): support additional key for targets (#560)
1 parent 3dbde96 · commit b246372

7 files changed: +182, -71 lines

src/execution/db_tracking.rs
Lines changed: 76 additions & 7 deletions

@@ -3,12 +3,80 @@ use crate::prelude::*;
 use super::{db_tracking_setup::TrackingTableSetupState, memoization::StoredMemoizationInfo};
 use crate::utils::{db::WriteAction, fingerprint::Fingerprint};
 use futures::Stream;
+use serde::de::{self, Deserializer, SeqAccess, Visitor};
+use serde::ser::SerializeSeq;
 use sqlx::PgPool;
+use std::fmt;
+
+#[derive(Debug, Clone)]
+pub struct TrackedTargetKeyInfo {
+    pub key: serde_json::Value,
+    pub additional_key: serde_json::Value,
+    pub process_ordinal: i64,
+    // None means deletion.
+    pub fingerprint: Option<Fingerprint>,
+}
+
+impl Serialize for TrackedTargetKeyInfo {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let mut seq = serializer.serialize_seq(None)?;
+        seq.serialize_element(&self.key)?;
+        seq.serialize_element(&self.process_ordinal)?;
+        seq.serialize_element(&self.fingerprint)?;
+        if !self.additional_key.is_null() {
+            seq.serialize_element(&self.additional_key)?;
+        }
+        seq.end()
+    }
+}
+
+impl<'de> serde::Deserialize<'de> for TrackedTargetKeyInfo {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        struct TrackedTargetKeyVisitor;
+
+        impl<'de> Visitor<'de> for TrackedTargetKeyVisitor {
+            type Value = TrackedTargetKeyInfo;
+
+            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+                formatter.write_str("a sequence of 3 or 4 elements for TrackedTargetKey")
+            }
+
+            fn visit_seq<A>(self, mut seq: A) -> Result<TrackedTargetKeyInfo, A::Error>
+            where
+                A: SeqAccess<'de>,
+            {
+                let target_key: serde_json::Value = seq
+                    .next_element()?
+                    .ok_or_else(|| de::Error::invalid_length(0, &self))?;
+                let process_ordinal: i64 = seq
+                    .next_element()?
+                    .ok_or_else(|| de::Error::invalid_length(1, &self))?;
+                let fingerprint: Option<Fingerprint> = seq
+                    .next_element()?
+                    .ok_or_else(|| de::Error::invalid_length(2, &self))?;
+                let additional_key: Option<serde_json::Value> = seq.next_element()?;
+
+                Ok(TrackedTargetKeyInfo {
+                    key: target_key,
+                    process_ordinal,
+                    fingerprint,
+                    additional_key: additional_key.unwrap_or(serde_json::Value::Null),
+                })
+            }
+        }
+
+        deserializer.deserialize_seq(TrackedTargetKeyVisitor)
+    }
+}
 
-/// (target_key, process_ordinal, fingerprint)
-pub type TrackedTargetKey = (serde_json::Value, i64, Option<Fingerprint>);
 /// (source_id, target_key)
-pub type TrackedTargetKeyForSource = Vec<(i32, Vec<TrackedTargetKey>)>;
+pub type TrackedTargetKeyForSource = Vec<(i32, Vec<TrackedTargetKeyInfo>)>;
 
 #[derive(sqlx::FromRow, Debug)]
 pub struct SourceTrackingInfoForProcessing {

@@ -80,7 +148,8 @@ pub async fn precommit_source_tracking_info(
     let query_str = match action {
         WriteAction::Insert => format!(
             "INSERT INTO {} (source_id, source_key, max_process_ordinal, staging_target_keys, memoization_info) VALUES ($1, $2, $3, $4, $5)",
-            db_setup.table_name),
+            db_setup.table_name
+        ),
         WriteAction::Update => format!(
             "UPDATE {} SET max_process_ordinal = $3, staging_target_keys = $4, memoization_info = $5 WHERE source_id = $1 AND source_key = $2",
             db_setup.table_name

@@ -205,9 +274,9 @@ impl ListTrackedSourceKeyMetadataState {
         pool: &'a PgPool,
     ) -> impl Stream<Item = Result<TrackedSourceKeyMetadata, sqlx::Error>> + 'a {
         self.query_str = format!(
-                "SELECT source_key, processed_source_ordinal, process_logic_fingerprint FROM {} WHERE source_id = $1",
-                db_setup.table_name
-            );
+            "SELECT source_key, processed_source_ordinal, process_logic_fingerprint FROM {} WHERE source_id = $1",
+            db_setup.table_name
+        );
         sqlx::query_as(&self.query_str).bind(source_id).fetch(pool)
     }
 }
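
The custom Serialize/Deserialize impls above keep the wire format of the old `TrackedTargetKey` 3-tuple: the additional key is written only as an optional trailing element, and legacy 3-element rows still parse. A minimal round-trip sketch of that behavior, assuming it lives in a `#[cfg(test)]` module inside db_tracking.rs (so `TrackedTargetKeyInfo` is in scope) and that `anyhow` backs the crate's `Result`; the JSON literals are illustrative, not real tracking data:

#[test]
fn tracked_target_key_info_wire_format() -> anyhow::Result<()> {
    // A row written by the legacy 3-tuple still parses; `additional_key`
    // falls back to JSON null.
    let legacy: TrackedTargetKeyInfo = serde_json::from_str(r#"[{"id": 1}, 42, null]"#)?;
    assert!(legacy.additional_key.is_null());
    assert_eq!(legacy.process_ordinal, 42);

    // A new-style row carries the additional key as an optional 4th element.
    let with_extra: TrackedTargetKeyInfo =
        serde_json::from_str(r#"[{"id": 1}, 42, null, "collection_a"]"#)?;
    assert_eq!(with_extra.additional_key, serde_json::json!("collection_a"));

    // A null additional_key is skipped on write, so such rows keep the
    // legacy 3-element layout.
    assert_eq!(serde_json::to_string(&legacy)?, r#"[{"id":1},42,null]"#);
    Ok(())
}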

src/execution/row_indexer.rs
Lines changed: 73 additions & 42 deletions

@@ -4,7 +4,7 @@ use futures::future::try_join_all;
 use sqlx::PgPool;
 use std::collections::{HashMap, HashSet};
 
-use super::db_tracking::{self, TrackedTargetKey, read_source_tracking_info_for_processing};
+use super::db_tracking::{self, TrackedTargetKeyInfo, read_source_tracking_info_for_processing};
 use super::db_tracking_setup;
 use super::evaluator::{
     EvaluateSourceEntryOutput, SourceRowEvaluationContext, evaluate_source_entry,

@@ -119,18 +119,24 @@ pub enum SkippedOr<T> {
     Skipped(SourceVersion),
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+struct TargetKeyPair {
+    pub key: serde_json::Value,
+    pub additional_key: serde_json::Value,
+}
+
 #[derive(Default)]
 struct TrackingInfoForTarget<'a> {
     export_op: Option<&'a AnalyzedExportOp>,
 
     // Existing keys info. Keyed by target key.
     // Will be removed after new rows for the same key are added into `new_staging_keys_info` and `mutation.upserts`,
     // hence all remaining ones are to be deleted.
-    existing_staging_keys_info: HashMap<serde_json::Value, Vec<(i64, Option<Fingerprint>)>>,
-    existing_keys_info: HashMap<serde_json::Value, Vec<(i64, Option<Fingerprint>)>>,
+    existing_staging_keys_info: HashMap<TargetKeyPair, Vec<(i64, Option<Fingerprint>)>>,
+    existing_keys_info: HashMap<TargetKeyPair, Vec<(i64, Option<Fingerprint>)>>,
 
     // New keys info for staging.
-    new_staging_keys_info: Vec<TrackedTargetKey>,
+    new_staging_keys_info: Vec<TrackedTargetKeyInfo>,
 
     // Mutation to apply to the target storage.
     mutation: ExportTargetMutation,

@@ -208,9 +214,12 @@ async fn precommit_source_tracking_info(
             for key_info in keys_info.into_iter() {
                 target_info
                     .existing_staging_keys_info
-                    .entry(key_info.0)
+                    .entry(TargetKeyPair {
+                        key: key_info.key,
+                        additional_key: key_info.additional_key,
+                    })
                     .or_default()
-                    .push((key_info.1, key_info.2));
+                    .push((key_info.process_ordinal, key_info.fingerprint));
             }
         }
 

@@ -220,9 +229,12 @@ async fn precommit_source_tracking_info(
             for key_info in keys_info.into_iter() {
                 target_info
                     .existing_keys_info
-                    .entry(key_info.0)
+                    .entry(TargetKeyPair {
+                        key: key_info.key,
+                        additional_key: key_info.additional_key,
+                    })
                     .or_default()
-                    .push((key_info.1, key_info.2));
+                    .push((key_info.process_ordinal, key_info.fingerprint));
             }
         }
     }

@@ -249,22 +261,24 @@ async fn precommit_source_tracking_info(
                     .fields
                     .push(value.fields[*field as usize].clone());
             }
-            let existing_target_keys = target_info.existing_keys_info.remove(&primary_key_json);
+            let additional_key = export_op.export_target_factory.extract_additional_key(
+                &primary_key,
+                &field_values,
+                export_op.export_context.as_ref(),
+            )?;
+            let target_key_pair = TargetKeyPair {
+                key: primary_key_json,
+                additional_key,
+            };
+            let existing_target_keys = target_info.existing_keys_info.remove(&target_key_pair);
             let existing_staging_target_keys = target_info
                 .existing_staging_keys_info
-                .remove(&primary_key_json);
+                .remove(&target_key_pair);
 
-            let upsert_entry = export_op.export_target_factory.prepare_upsert_entry(
-                ExportTargetUpsertEntry {
-                    key: primary_key,
-                    value: field_values,
-                },
-                export_op.export_context.as_ref(),
-            )?;
             let curr_fp = if !export_op.value_stable {
                 Some(
                     Fingerprinter::default()
-                        .with(&upsert_entry.value)?
+                        .with(&field_values)?
                        .into_fingerprint(),
                 )
             } else {

@@ -285,16 +299,29 @@ async fn precommit_source_tracking_info(
                     .into_iter()
                     .next()
                     .ok_or_else(invariance_violation)?;
-                keys_info.push((primary_key_json, existing_ordinal, existing_fp));
+                keys_info.push(TrackedTargetKeyInfo {
+                    key: target_key_pair.key,
+                    additional_key: target_key_pair.additional_key,
+                    process_ordinal: existing_ordinal,
+                    fingerprint: existing_fp,
+                });
             } else {
                 // Entry with new value. Needs to be upserted.
-                target_info.mutation.upserts.push(upsert_entry);
-                target_info.new_staging_keys_info.push((
-                    primary_key_json.clone(),
+                let tracked_target_key = TrackedTargetKeyInfo {
+                    key: target_key_pair.key.clone(),
+                    additional_key: target_key_pair.additional_key.clone(),
                     process_ordinal,
-                    curr_fp,
-                ));
-                keys_info.push((primary_key_json, process_ordinal, curr_fp));
+                    fingerprint: curr_fp,
+                };
+                target_info.mutation.upserts.push(ExportTargetUpsertEntry {
+                    key: primary_key,
+                    additional_key: target_key_pair.additional_key,
+                    value: field_values,
+                });
+                target_info
+                    .new_staging_keys_info
+                    .push(tracked_target_key.clone());
+                keys_info.push(tracked_target_key);
             }
         }
         new_target_keys_info.push((export_op.target_id, keys_info));

@@ -304,32 +331,35 @@ async fn precommit_source_tracking_info(
     let mut new_staging_target_keys = db_tracking::TrackedTargetKeyForSource::default();
     let mut target_mutations = HashMap::with_capacity(export_ops.len());
     for (target_id, target_tracking_info) in tracking_info_for_targets.into_iter() {
-        let legacy_keys: HashSet<serde_json::Value> = target_tracking_info
+        let legacy_keys: HashSet<TargetKeyPair> = target_tracking_info
             .existing_keys_info
             .into_keys()
             .chain(target_tracking_info.existing_staging_keys_info.into_keys())
             .collect();
 
         let mut new_staging_keys_info = target_tracking_info.new_staging_keys_info;
         // Add tracking info for deletions.
-        new_staging_keys_info.extend(
-            legacy_keys
-                .iter()
-                .map(|key| ((*key).clone(), process_ordinal, None)),
-        );
+        new_staging_keys_info.extend(legacy_keys.iter().map(|key| TrackedTargetKeyInfo {
+            key: key.key.clone(),
+            additional_key: key.additional_key.clone(),
+            process_ordinal,
+            fingerprint: None,
+        }));
         new_staging_target_keys.push((target_id, new_staging_keys_info));
 
         if let Some(export_op) = target_tracking_info.export_op {
            let mut mutation = target_tracking_info.mutation;
-            mutation.delete_keys.reserve(legacy_keys.len());
+            mutation.deletes.reserve(legacy_keys.len());
             for legacy_key in legacy_keys.into_iter() {
-                mutation.delete_keys.push(
-                    value::Value::<value::ScopeValue>::from_json(
-                        legacy_key,
-                        &export_op.primary_key_type,
-                    )?
-                    .as_key()?,
-                );
+                let key = value::Value::<value::ScopeValue>::from_json(
+                    legacy_key.key,
+                    &export_op.primary_key_type,
+                )?
+                .as_key()?;
+                mutation.deletes.push(interface::ExportTargetDeleteEntry {
+                    key,
+                    additional_key: legacy_key.additional_key,
+                });
             }
             target_mutations.insert(target_id, mutation);
         }

@@ -398,9 +428,10 @@ async fn commit_source_tracking_info(
         .filter_map(|(target_id, target_keys)| {
             let cleaned_target_keys: Vec<_> = target_keys
                 .into_iter()
-                .filter(|(_, ordinal, _)| {
-                    Some(*ordinal) > precommit_metadata.existing_process_ordinal
-                        && *ordinal != precommit_metadata.process_ordinal
+                .filter(|key_info| {
+                    Some(key_info.process_ordinal)
+                        > precommit_metadata.existing_process_ordinal
+                        && key_info.process_ordinal != precommit_metadata.process_ordinal
                 })
                 .collect();
             if !cleaned_target_keys.is_empty() {
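
One subtlety in the rewritten cleanup filter in commit_source_tracking_info: `existing_process_ordinal` is an `Option<i64>` (it must be for `Some(key_info.process_ordinal) > ...` to type-check), and Rust orders `None` below every `Some`. A standalone sketch of just that predicate, using a hypothetical helper that is not part of this commit:

// Hypothetical helper mirroring the filter's condition, to show how Option
// ordering makes the bounds check behave when no ordinal was committed yet.
fn passes_cleanup_filter(
    staged_ordinal: i64,
    existing_process_ordinal: Option<i64>,
    current_process_ordinal: i64,
) -> bool {
    // `None < Some(x)` for every x, so with no previously committed ordinal
    // any staged ordinal other than the current one passes the filter.
    Some(staged_ordinal) > existing_process_ordinal
        && staged_ordinal != current_process_ordinal
}

fn main() {
    assert!(passes_cleanup_filter(3, None, 7));
    assert!(!passes_cleanup_filter(3, Some(5), 7)); // below the last committed ordinal
    assert!(!passes_cleanup_filter(7, Some(5), 7)); // staged by the current round
    assert!(passes_cleanup_filter(6, Some(5), 7));
    println!("filter predicate behaves as expected");
}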

src/ops/factory_bases.rs
Lines changed: 12 additions & 9 deletions

@@ -327,12 +327,13 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
 
     fn describe_resource(&self, key: &Self::Key) -> Result<String>;
 
-    fn prepare_upsert_entry<'ctx>(
+    fn extract_additional_key<'ctx>(
         &self,
-        entry: ExportTargetUpsertEntry,
+        _key: &value::KeyValue,
+        _value: &value::FieldValues,
         _export_context: &'ctx Self::ExportContext,
-    ) -> Result<ExportTargetUpsertEntry> {
-        Ok(entry)
+    ) -> Result<serde_json::Value> {
+        Ok(serde_json::Value::Null)
     }
 
     fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()>

@@ -459,14 +460,16 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
         Ok(result)
     }
 
-    fn prepare_upsert_entry<'ctx>(
+    fn extract_additional_key<'ctx>(
         &self,
-        entry: ExportTargetUpsertEntry,
+        key: &value::KeyValue,
+        value: &value::FieldValues,
         export_context: &'ctx (dyn Any + Send + Sync),
-    ) -> Result<ExportTargetUpsertEntry> {
-        StorageFactoryBase::prepare_upsert_entry(
+    ) -> Result<serde_json::Value> {
+        StorageFactoryBase::extract_additional_key(
            self,
-            entry,
+            key,
+            value,
            export_context
                .downcast_ref::<T::ExportContext>()
                .ok_or_else(invariance_violation)?,
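
`extract_additional_key` replaces `prepare_upsert_entry` on both the typed `StorageFactoryBase` trait and the type-erased `ExportTargetFactory`; the blanket impl recovers the factory's concrete `ExportContext` via `downcast_ref` before delegating, and the default returns `serde_json::Value::Null`, so existing targets need no changes. A standalone sketch of that downcast-then-derive-a-key pattern; `MyExportContext` and the `"documents"` table name are invented for illustration and are not part of the codebase:

use std::any::Any;

// Invented context type standing in for a factory's ExportContext.
struct MyExportContext {
    table_name: String,
}

// Recover the concrete context from the type-erased reference, then derive
// an additional key from it (here, the table the rows are routed to).
fn extract_additional_key(
    export_context: &(dyn Any + Send + Sync),
) -> Result<serde_json::Value, String> {
    let ctx = export_context
        .downcast_ref::<MyExportContext>()
        .ok_or_else(|| "export context has unexpected type".to_string())?;
    Ok(serde_json::Value::String(ctx.table_name.clone()))
}

fn main() {
    let ctx = MyExportContext {
        table_name: "documents".into(),
    };
    let erased: &(dyn Any + Send + Sync) = &ctx;
    assert_eq!(
        extract_additional_key(erased).unwrap(),
        serde_json::json!("documents")
    );
}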

src/ops/interface.rs
Lines changed: 13 additions & 5 deletions

@@ -195,18 +195,25 @@ pub trait SimpleFunctionFactory {
 #[derive(Debug)]
 pub struct ExportTargetUpsertEntry {
     pub key: KeyValue,
+    pub additional_key: serde_json::Value,
     pub value: FieldValues,
 }
 
+#[derive(Debug)]
+pub struct ExportTargetDeleteEntry {
+    pub key: KeyValue,
+    pub additional_key: serde_json::Value,
+}
+
 #[derive(Debug, Default)]
 pub struct ExportTargetMutation {
     pub upserts: Vec<ExportTargetUpsertEntry>,
-    pub delete_keys: Vec<KeyValue>,
+    pub deletes: Vec<ExportTargetDeleteEntry>,
 }
 
 impl ExportTargetMutation {
     pub fn is_empty(&self) -> bool {
-        self.upserts.is_empty() && self.delete_keys.is_empty()
+        self.upserts.is_empty() && self.deletes.is_empty()
     }
 }
 

@@ -286,11 +293,12 @@ pub trait ExportTargetFactory: Send + Sync {
 
     fn describe_resource(&self, key: &serde_json::Value) -> Result<String>;
 
-    fn prepare_upsert_entry<'ctx>(
+    fn extract_additional_key<'ctx>(
         &self,
-        entry: ExportTargetUpsertEntry,
+        key: &KeyValue,
+        value: &FieldValues,
         export_context: &'ctx (dyn Any + Send + Sync),
-    ) -> Result<ExportTargetUpsertEntry>;
+    ) -> Result<serde_json::Value>;
 
     async fn apply_mutation(
         &self,
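
`ExportTargetMutation::deletes` now pairs each key with the additional key it was originally exported under, so a backend can aim the deletion at the right place even when primary keys repeat across, say, collections. A toy consumer sketch; the string-keyed in-memory store and the `Upsert`/`Delete` structs are simplified stand-ins for the crate's `KeyValue`/`FieldValues`, not its real API:

use std::collections::HashMap;

// Simplified stand-ins for ExportTargetUpsertEntry / ExportTargetDeleteEntry.
struct Upsert {
    key: String,
    additional_key: serde_json::Value,
    value: String,
}
struct Delete {
    key: String,
    additional_key: serde_json::Value,
}

// Interpret the additional key as a collection name; Null means a default.
fn collection_of(additional_key: &serde_json::Value) -> String {
    additional_key.as_str().unwrap_or("default").to_string()
}

fn apply_mutation(
    store: &mut HashMap<(String, String), String>,
    upserts: Vec<Upsert>,
    deletes: Vec<Delete>,
) {
    for u in upserts {
        store.insert((collection_of(&u.additional_key), u.key), u.value);
    }
    for d in deletes {
        // The delete entry remembers which collection the row went to, so a
        // same-keyed row in another collection is left untouched.
        store.remove(&(collection_of(&d.additional_key), d.key));
    }
}

fn main() {
    let mut store = HashMap::new();
    apply_mutation(
        &mut store,
        vec![Upsert {
            key: "k1".into(),
            additional_key: serde_json::json!("coll_a"),
            value: "v1".into(),
        }],
        vec![],
    );
    apply_mutation(
        &mut store,
        vec![],
        vec![Delete {
            key: "k1".into(),
            additional_key: serde_json::json!("coll_b"),
        }],
    );
    assert_eq!(store.len(), 1); // the delete targeted a different collection
}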
