Skip to content

Commit 801ae8f

Browse files
authored
Count source rows with ERROR during indexing. (#174)
1 parent c3315b4 commit 801ae8f

File tree

1 file changed

+21
-9
lines changed

1 file changed

+21
-9
lines changed

src/execution/indexer.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
use std::collections::{HashMap, HashSet};
2-
31
use anyhow::Result;
42
use futures::future::{join_all, try_join, try_join_all};
53
use log::{debug, error};
64
use serde::Serialize;
75
use sqlx::PgPool;
6+
use std::collections::{HashMap, HashSet};
87

98
use super::db_tracking::{self, read_source_tracking_info, TrackedTargetKey};
109
use super::db_tracking_setup;
@@ -23,16 +22,23 @@ use super::evaluator::{evaluate_source_entry, ScopeValueBuilder};
2322
pub struct UpdateStats {
2423
pub num_insertions: usize,
2524
pub num_deletions: usize,
26-
pub num_updates: usize,
25+
pub num_already_exists: usize,
26+
pub num_errors: usize,
2727
}
2828

2929
impl std::fmt::Display for UpdateStats {
3030
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31+
let num_source_rows = self.num_insertions + self.num_deletions + self.num_already_exists;
32+
write!(f, "{num_source_rows} source rows processed",)?;
33+
if self.num_errors > 0 {
34+
write!(f, " with {} ERRORS", self.num_errors)?;
35+
}
3136
write!(
3237
f,
33-
"{} added, {} removed, {} updated",
34-
self.num_insertions, self.num_deletions, self.num_updates
35-
)
38+
": {} added, {} removed, {} already exists",
39+
self.num_insertions, self.num_deletions, self.num_already_exists
40+
)?;
41+
Ok(())
3642
}
3743
}
3844

@@ -588,21 +594,27 @@ async fn update_source(
588594
}
589595
}
590596

591-
join_all(all_keys_set.into_iter().map(|key| async move {
597+
let num_errors = join_all(all_keys_set.into_iter().map(|key| async move {
592598
let result = update_source_entry(plan, source_op_idx, schema, &key, pool).await;
593599
if let Err(e) = result {
594600
error!("Error indexing source row: {}", e);
595601
debug!("Detailed error: {:?}", e);
602+
1
603+
} else {
604+
0
596605
}
597606
}))
598-
.await;
607+
.await
608+
.iter()
609+
.sum();
599610

600611
Ok(SourceUpdateInfo {
601612
source_name: source_name.to_string(),
602613
stats: UpdateStats {
603614
num_insertions: num_new_keys - num_updates,
604615
num_deletions,
605-
num_updates,
616+
num_already_exists: num_updates,
617+
num_errors,
606618
},
607619
})
608620
}

0 commit comments

Comments
 (0)