
Commit 81317f5

Refactor: split indexer into row_indexer, source_indexer and stats (#230)
1 parent: fc740a3

File tree: 9 files changed (+194, -178 lines)


src/execution/dumper.rs

Lines changed: 6 additions & 4 deletions

@@ -11,8 +11,8 @@ use std::collections::BTreeMap;
 use std::path::{Path, PathBuf};
 use yaml_rust2::YamlEmitter;
 
-use super::indexer;
 use super::memoization::EvaluationMemoryOptions;
+use super::row_indexer;
 use crate::base::{schema, value};
 use crate::builder::plan::{AnalyzedSourceOp, ExecutionPlan};
 use crate::ops::interface::SourceExecutorListOptions;
@@ -76,7 +76,7 @@ impl<'a> Dumper<'a> {
     where
         'a: 'b,
     {
-        let data_builder = indexer::evaluate_source_entry_with_memory(
+        let data_builder = row_indexer::evaluate_source_entry_with_memory(
             self.plan,
             source_op,
             self.schema,
@@ -113,8 +113,10 @@ impl<'a> Dumper<'a> {
             data: collected_values_buffer[collector_idx]
                 .iter()
                 .map(|v| -> Result<_> {
-                    let key =
-                        indexer::extract_primary_key(&export_op.primary_key_def, v)?;
+                    let key = row_indexer::extract_primary_key(
+                        &export_op.primary_key_def,
+                        v,
+                    )?;
                     Ok((key, v))
                 })
                 .collect::<Result<_>>()?,
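
One detail worth keeping from this hunk: the map closure returns Result<_> and the iterator is collected into Result<_>, so the first failing extract_primary_key call aborts the whole data collection. A standalone sketch of that collect-into-Result idiom, with a hypothetical extract_key standing in for the real function:

```rust
// Collecting Iterator<Item = Result<T, E>> into Result<Vec<T>, E>:
// the first Err short-circuits, mirroring how dumper.rs propagates
// key-extraction failures with `?` inside the map closure.
fn extract_key(record: &str) -> Result<u64, String> {
    record
        .split(',')
        .next()
        .and_then(|s| s.parse().ok())
        .ok_or_else(|| format!("no numeric key in {record:?}"))
}

fn main() {
    let rows = ["1,a", "2,b"];
    let keyed: Result<Vec<(u64, &str)>, String> = rows
        .iter()
        .map(|r| -> Result<_, String> {
            let key = extract_key(r)?; // `?` works inside the closure
            Ok((key, *r))
        })
        .collect();
    assert_eq!(keyed.unwrap(), vec![(1, "1,a"), (2, "2,b")]);
}
```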

src/execution/mod.rs

Lines changed: 5 additions & 4 deletions

@@ -1,9 +1,10 @@
+pub(crate) mod db_tracking_setup;
 pub(crate) mod dumper;
 pub(crate) mod evaluator;
-pub(crate) mod indexer;
+pub(crate) mod memoization;
 pub(crate) mod query;
+pub(crate) mod row_indexer;
+pub(crate) mod source_indexer;
+pub(crate) mod stats;
 
 mod db_tracking;
-pub mod db_tracking_setup;
-
-pub(crate) mod memoization;
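
Read off the hunk above, the module list after this commit is a single alphabetized pub(crate) block, with the private db_tracking module kept separate (note the commit also narrows db_tracking_setup from pub to pub(crate)):

```rust
// src/execution/mod.rs after this commit
pub(crate) mod db_tracking_setup;
pub(crate) mod dumper;
pub(crate) mod evaluator;
pub(crate) mod memoization;
pub(crate) mod query;
pub(crate) mod row_indexer;
pub(crate) mod source_indexer;
pub(crate) mod stats;

mod db_tracking;
```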

src/execution/indexer.rs renamed to src/execution/row_indexer.rs

Lines changed: 11 additions & 146 deletions

@@ -1,92 +1,24 @@
 use crate::prelude::*;
 
-use futures::future::{join, join_all, try_join_all};
-use itertools::Itertools;
+use futures::future::try_join_all;
 use log::error;
-use serde::Serialize;
 use sqlx::PgPool;
 use std::collections::{HashMap, HashSet};
-use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
+use std::sync::atomic::Ordering::Relaxed;
 
 use super::db_tracking::{self, read_source_tracking_info_for_processing, TrackedTargetKey};
 use super::db_tracking_setup;
+use super::evaluator::{evaluate_source_entry, ScopeValueBuilder};
 use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoizationInfo};
+use super::stats;
+
 use crate::base::schema;
 use crate::base::value::{self, FieldValues, KeyValue};
 use crate::builder::plan::*;
-use crate::ops::interface::{
-    ExportTargetMutation, ExportTargetUpsertEntry, Ordinal, SourceExecutorListOptions,
-};
+use crate::ops::interface::{ExportTargetMutation, ExportTargetUpsertEntry, Ordinal};
 use crate::utils::db::WriteAction;
 use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
 
-use super::evaluator::{evaluate_source_entry, ScopeValueBuilder};
-
-#[derive(Debug, Serialize, Default)]
-pub struct UpdateStats {
-    pub num_skipped: AtomicUsize,
-    pub num_insertions: AtomicUsize,
-    pub num_deletions: AtomicUsize,
-    pub num_repreocesses: AtomicUsize,
-    pub num_errors: AtomicUsize,
-}
-
-impl std::fmt::Display for UpdateStats {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let num_skipped = self.num_skipped.load(Relaxed);
-        if num_skipped > 0 {
-            write!(f, "{} rows skipped", num_skipped)?;
-        }
-
-        let num_insertions = self.num_insertions.load(Relaxed);
-        let num_deletions = self.num_deletions.load(Relaxed);
-        let num_reprocesses = self.num_repreocesses.load(Relaxed);
-        let num_source_rows = num_insertions + num_deletions + num_reprocesses;
-        if num_source_rows > 0 {
-            if num_skipped > 0 {
-                write!(f, ", ")?;
-            }
-            write!(f, "{num_source_rows} source rows processed",)?;
-
-            let num_errors = self.num_errors.load(Relaxed);
-            if num_errors > 0 {
-                write!(f, " with {num_errors} ERRORS",)?;
-            }
-            write!(
-                f,
-                ": {num_insertions} added, {num_deletions} removed, {num_reprocesses} repocessed",
-            )?;
-        }
-        Ok(())
-    }
-}
-
-#[derive(Debug, Serialize)]
-pub struct SourceUpdateInfo {
-    pub source_name: String,
-    pub stats: UpdateStats,
-}
-
-impl std::fmt::Display for SourceUpdateInfo {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{}: {}", self.source_name, self.stats)
-    }
-}
-
-#[derive(Debug, Serialize)]
-pub struct IndexUpdateInfo {
-    pub sources: Vec<SourceUpdateInfo>,
-}
-
-impl std::fmt::Display for IndexUpdateInfo {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        for source in self.sources.iter() {
-            writeln!(f, "{}", source)?;
-        }
-        Ok(())
-    }
-}
-
 pub fn extract_primary_key(
     primary_key_def: &AnalyzedPrimaryKeyDef,
     record: &FieldValues,
@@ -470,14 +402,14 @@ pub async fn evaluate_source_entry_with_memory(
     Ok(Some(output))
 }
 
-pub async fn update_source_entry(
+pub async fn update_source_row(
     plan: &ExecutionPlan,
     source_op: &AnalyzedSourceOp,
     schema: &schema::DataSchema,
     key: &value::KeyValue,
     only_for_deletion: bool,
     pool: &PgPool,
-    stats: &UpdateStats,
+    stats: &stats::UpdateStats,
 ) -> Result<()> {
     let source_key_json = serde_json::to_value(key)?;
     let process_timestamp = chrono::Utc::now();
@@ -617,85 +549,18 @@ pub async fn update_source_entry(
     Ok(())
 }
 
-async fn update_source_entry_with_err_handling(
+pub(super) async fn update_source_row_with_err_handling(
     plan: &ExecutionPlan,
     source_op: &AnalyzedSourceOp,
     schema: &schema::DataSchema,
     key: &value::KeyValue,
     only_for_deletion: bool,
     pool: &PgPool,
-    stats: &UpdateStats,
+    stats: &stats::UpdateStats,
 ) {
-    let r = update_source_entry(plan, source_op, schema, key, only_for_deletion, pool, stats).await;
+    let r = update_source_row(plan, source_op, schema, key, only_for_deletion, pool, stats).await;
     if let Err(e) = r {
         stats.num_errors.fetch_add(1, Relaxed);
         error!("{:?}", e.context("Error in indexing a source row"));
     }
 }
-
-async fn update_source(
-    source_name: &str,
-    plan: &ExecutionPlan,
-    source_op: &AnalyzedSourceOp,
-    schema: &schema::DataSchema,
-    pool: &PgPool,
-) -> Result<SourceUpdateInfo> {
-    let existing_keys_json = db_tracking::list_source_tracking_keys(
-        source_op.source_id,
-        &plan.tracking_table_setup,
-        pool,
-    )
-    .await?;
-
-    let mut keys = Vec::new();
-    let mut rows_stream = source_op.executor.list(SourceExecutorListOptions {
-        include_ordinal: false,
-    });
-    while let Some(rows) = rows_stream.next().await {
-        keys.extend(rows?.into_iter().map(|row| row.key));
-    }
-
-    let stats = UpdateStats::default();
-    let upsert_futs = join_all(keys.iter().map(|key| {
-        update_source_entry_with_err_handling(plan, source_op, schema, key, false, pool, &stats)
-    }));
-    let deleted_keys = existing_keys_json
-        .into_iter()
-        .map(|existing_key_json| {
-            value::Value::<value::ScopeValue>::from_json(
-                existing_key_json.source_key,
-                &source_op.primary_key_type,
-            )?
-            .as_key()
-        })
-        .filter_ok(|existing_key| !keys.contains(existing_key))
-        .collect::<Result<Vec<_>>>()?;
-    let delete_futs = join_all(deleted_keys.iter().map(|key| {
-        update_source_entry_with_err_handling(plan, source_op, schema, key, true, pool, &stats)
-    }));
-    join(upsert_futs, delete_futs).await;
-
-    Ok(SourceUpdateInfo {
-        source_name: source_name.to_string(),
-        stats,
-    })
-}
-
-pub async fn update(
-    plan: &ExecutionPlan,
-    schema: &schema::DataSchema,
-    pool: &PgPool,
-) -> Result<IndexUpdateInfo> {
-    let source_update_stats = try_join_all(
-        plan.source_ops
-            .iter()
-            .map(|source_op| async move {
-                update_source(source_op.name.as_str(), plan, source_op, schema, pool).await
-            })
-            .collect::<Vec<_>>(),
-    )
-    .await?;
-    Ok(IndexUpdateInfo {
-        sources: source_update_stats,
-    })
-}
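
Beyond the renames (update_source_entry becomes update_source_row) and the stats types moving out, the error-handling wrapper keeps its shape: a failed row is counted and logged, never propagated, so one bad row cannot sink a batch. A self-contained sketch of that pattern, using a hypothetical process_row in place of the real update_source_row, eprintln! in place of log::error!, and the futures crate's block_on as the executor:

```rust
use futures::executor::block_on;
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

// Hypothetical stand-in for the real per-row indexing work.
async fn process_row(key: &str) -> Result<(), String> {
    if key.is_empty() {
        Err("empty key".to_string())
    } else {
        Ok(())
    }
}

// Same shape as update_source_row_with_err_handling: count and log the
// failure and return (), so the batch keeps going past a bad row.
async fn process_row_with_err_handling(key: &str, num_errors: &AtomicUsize) {
    if let Err(e) = process_row(key).await {
        num_errors.fetch_add(1, Relaxed);
        eprintln!("error indexing source row {key:?}: {e}");
    }
}

fn main() {
    let num_errors = AtomicUsize::new(0);
    block_on(async {
        process_row_with_err_handling("k1", &num_errors).await;
        process_row_with_err_handling("", &num_errors).await;
    });
    assert_eq!(num_errors.load(Relaxed), 1);
}
```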

src/execution/source_indexer.rs

Lines changed: 78 additions & 0 deletions

@@ -0,0 +1,78 @@
+use crate::prelude::*;
+
+use super::{db_tracking, row_indexer, stats};
+use futures::future::{join, join_all, try_join_all};
+use sqlx::PgPool;
+
+async fn update_source(
+    source_name: &str,
+    plan: &plan::ExecutionPlan,
+    source_op: &plan::AnalyzedSourceOp,
+    schema: &schema::DataSchema,
+    pool: &PgPool,
+) -> Result<stats::SourceUpdateInfo> {
+    let existing_keys_json = db_tracking::list_source_tracking_keys(
+        source_op.source_id,
+        &plan.tracking_table_setup,
+        pool,
+    )
+    .await?;
+
+    let mut keys = Vec::new();
+    let mut rows_stream = source_op
+        .executor
+        .list(interface::SourceExecutorListOptions {
+            include_ordinal: false,
+        });
+    while let Some(rows) = rows_stream.next().await {
+        keys.extend(rows?.into_iter().map(|row| row.key));
+    }
+
+    let stats = stats::UpdateStats::default();
+    let upsert_futs = join_all(keys.iter().map(|key| {
+        row_indexer::update_source_row_with_err_handling(
+            plan, source_op, schema, key, false, pool, &stats,
+        )
+    }));
+    let deleted_keys = existing_keys_json
+        .into_iter()
+        .map(|existing_key_json| {
+            value::Value::<value::ScopeValue>::from_json(
+                existing_key_json.source_key,
+                &source_op.primary_key_type,
+            )?
+            .as_key()
+        })
+        .filter_ok(|existing_key| !keys.contains(existing_key))
+        .collect::<Result<Vec<_>>>()?;
+    let delete_futs = join_all(deleted_keys.iter().map(|key| {
+        row_indexer::update_source_row_with_err_handling(
+            plan, source_op, schema, key, true, pool, &stats,
+        )
+    }));
+    join(upsert_futs, delete_futs).await;
+
+    Ok(stats::SourceUpdateInfo {
+        source_name: source_name.to_string(),
+        stats,
+    })
+}
+
+pub async fn update(
+    plan: &plan::ExecutionPlan,
+    schema: &schema::DataSchema,
+    pool: &PgPool,
+) -> Result<stats::IndexUpdateInfo> {
+    let source_update_stats = try_join_all(
+        plan.source_ops
+            .iter()
+            .map(|source_op| async move {
+                update_source(source_op.name.as_str(), plan, source_op, schema, pool).await
+            })
+            .collect::<Vec<_>>(),
+    )
+    .await?;
+    Ok(stats::IndexUpdateInfo {
+        sources: source_update_stats,
+    })
+}
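
update_source is the old indexer.rs logic moved verbatim, now module-qualified: list the keys the source currently reports, upsert them all concurrently, diff against previously tracked keys to find rows to delete, and drive both future sets together with join. A stripped-down sketch of that fan-out, with a hypothetical touch_key in place of row_indexer::update_source_row_with_err_handling and plain string keys in place of the tracked key types:

```rust
use futures::future::{join, join_all};

// Hypothetical stand-in for the real per-row upsert/delete work.
async fn touch_key(key: &str, deleting: bool) {
    println!("{} {key}", if deleting { "delete" } else { "upsert" });
}

// Same fan-out as update_source: upsert everything the source reports,
// delete tracked keys it no longer reports, run both sets concurrently.
async fn sync_keys(current: &[&str], tracked: &[&str]) {
    let upserts = join_all(current.iter().copied().map(|k| touch_key(k, false)));
    let deletes = join_all(
        tracked
            .iter()
            .copied()
            .filter(|k| !current.contains(k))
            .map(|k| touch_key(k, true)),
    );
    join(upserts, deletes).await;
}

fn main() {
    // "b" stays, "a" is new, "c" disappeared from the source.
    futures::executor::block_on(sync_keys(&["a", "b"], &["b", "c"]));
}
```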

src/execution/stats.rs

Lines changed: 68 additions & 0 deletions

@@ -0,0 +1,68 @@
+use crate::prelude::*;
+
+use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
+
+#[derive(Debug, Serialize, Default)]
+pub struct UpdateStats {
+    pub num_skipped: AtomicUsize,
+    pub num_insertions: AtomicUsize,
+    pub num_deletions: AtomicUsize,
+    pub num_repreocesses: AtomicUsize,
+    pub num_errors: AtomicUsize,
+}
+
+impl std::fmt::Display for UpdateStats {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let num_skipped = self.num_skipped.load(Relaxed);
+        if num_skipped > 0 {
+            write!(f, "{} rows skipped", num_skipped)?;
+        }
+
+        let num_insertions = self.num_insertions.load(Relaxed);
+        let num_deletions = self.num_deletions.load(Relaxed);
+        let num_reprocesses = self.num_repreocesses.load(Relaxed);
+        let num_source_rows = num_insertions + num_deletions + num_reprocesses;
+        if num_source_rows > 0 {
+            if num_skipped > 0 {
+                write!(f, ", ")?;
+            }
+            write!(f, "{num_source_rows} source rows processed",)?;
+
+            let num_errors = self.num_errors.load(Relaxed);
+            if num_errors > 0 {
+                write!(f, " with {num_errors} ERRORS",)?;
+            }
+            write!(
+                f,
+                ": {num_insertions} added, {num_deletions} removed, {num_reprocesses} repocessed",
+            )?;
+        }
+        Ok(())
+    }
+}
+
+#[derive(Debug, Serialize)]
+pub struct SourceUpdateInfo {
+    pub source_name: String,
+    pub stats: UpdateStats,
+}
+
+impl std::fmt::Display for SourceUpdateInfo {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}: {}", self.source_name, self.stats)
+    }
+}
+
+#[derive(Debug, Serialize)]
+pub struct IndexUpdateInfo {
+    pub sources: Vec<SourceUpdateInfo>,
+}
+
+impl std::fmt::Display for IndexUpdateInfo {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        for source in self.sources.iter() {
+            writeln!(f, "{}", source)?;
+        }
+        Ok(())
+    }
+}
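
(The spellings num_repreocesses and "repocessed" are carried over verbatim from the original indexer.rs.) Because every counter is an AtomicUsize, a single UpdateStats can be shared by plain reference across all concurrent row updates with no locking, and Relaxed ordering suffices since each counter is an independent tally rather than a synchronization point. A minimal standalone illustration of that sharing pattern, with the serde derive omitted and counter names shortened:

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

#[derive(Default)]
struct Stats {
    insertions: AtomicUsize,
    errors: AtomicUsize,
}

fn main() {
    let stats = Stats::default();
    // Workers share &stats without locks; fetch_add on an
    // independent tally needs only Relaxed ordering.
    std::thread::scope(|s| {
        for _ in 0..4 {
            s.spawn(|| {
                stats.insertions.fetch_add(1, Relaxed);
            });
        }
    });
    assert_eq!(stats.insertions.load(Relaxed), 4);
    assert_eq!(stats.errors.load(Relaxed), 0);
}
```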
