Skip to content

Commit b089189

Browse files
authored
feat(fast-fp): support fast collapse without reading source data (#895)
1 parent ba3c733 commit b089189

File tree

12 files changed

+622
-431
lines changed

12 files changed

+622
-431
lines changed

src/execution/db_tracking.rs

Lines changed: 14 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -118,6 +118,7 @@ pub struct SourceTrackingInfoForPrecommit {
118118
pub staging_target_keys: sqlx::types::Json<TrackedTargetKeyForSource>,
119119

120120
pub processed_source_ordinal: Option<i64>,
121+
pub processed_source_fp: Option<Vec<u8>>,
121122
pub process_logic_fingerprint: Option<Vec<u8>>,
122123
pub process_ordinal: Option<i64>,
123124
pub target_keys: Option<sqlx::types::Json<TrackedTargetKeyForSource>>,
@@ -130,7 +131,12 @@ pub async fn read_source_tracking_info_for_precommit(
130131
db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
131132
) -> Result<Option<SourceTrackingInfoForPrecommit>> {
132133
let query_str = format!(
133-
"SELECT max_process_ordinal, staging_target_keys, processed_source_ordinal, process_logic_fingerprint, process_ordinal, target_keys FROM {} WHERE source_id = $1 AND source_key = $2",
134+
"SELECT max_process_ordinal, staging_target_keys, processed_source_ordinal, {}, process_logic_fingerprint, process_ordinal, target_keys FROM {} WHERE source_id = $1 AND source_key = $2",
135+
if db_setup.has_fast_fingerprint_column {
136+
"processed_source_fp"
137+
} else {
138+
"NULL::bytea AS processed_source_fp"
139+
},
134140
db_setup.table_name
135141
);
136142
let precommit_tracking_info = sqlx::query_as(&query_str)
@@ -282,6 +288,7 @@ pub async fn delete_source_tracking_info(
282288
pub struct TrackedSourceKeyMetadata {
283289
pub source_key: serde_json::Value,
284290
pub processed_source_ordinal: Option<i64>,
291+
pub processed_source_fp: Option<Vec<u8>>,
285292
pub process_logic_fingerprint: Option<Vec<u8>>,
286293
}
287294

@@ -303,7 +310,12 @@ impl ListTrackedSourceKeyMetadataState {
303310
pool: &'a PgPool,
304311
) -> impl Stream<Item = Result<TrackedSourceKeyMetadata, sqlx::Error>> + 'a {
305312
self.query_str = format!(
306-
"SELECT source_key, processed_source_ordinal, process_logic_fingerprint FROM {} WHERE source_id = $1",
313+
"SELECT source_key, processed_source_ordinal, {}, process_logic_fingerprint FROM {} WHERE source_id = $1",
314+
if db_setup.has_fast_fingerprint_column {
315+
"processed_source_fp"
316+
} else {
317+
"NULL::bytea AS processed_source_fp"
318+
},
307319
db_setup.table_name
308320
);
309321
sqlx::query_as(&self.query_str).bind(source_id).fetch(pool)

src/execution/dumper.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -193,6 +193,7 @@ impl<'a> Dumper<'a> {
193193

194194
let mut rows_stream = import_op.executor.list(&SourceExecutorListOptions {
195195
include_ordinal: false,
196+
include_content_version_fp: false,
196197
});
197198
while let Some(rows) = rows_stream.next().await {
198199
for row in rows?.into_iter() {

src/execution/indexing_status.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@ pub async fn get_source_row_indexing_status(
4141
&interface::SourceExecutorGetOptions {
4242
include_value: false,
4343
include_ordinal: true,
44+
include_content_version_fp: false,
4445
},
4546
);
4647
let (last_processed, current) = try_join!(last_processed_fut, current_fut)?;

src/execution/live_updater.rs

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
use crate::{
2-
execution::{source_indexer::ProcessSourceKeyOptions, stats::UpdateStats},
2+
execution::{source_indexer::ProcessSourceKeyInput, stats::UpdateStats},
33
prelude::*,
44
};
55

@@ -200,10 +200,9 @@ impl SourceUpdateTask {
200200
SharedAckFn::ack(&shared_ack_fn).await
201201
}),
202202
pool.clone(),
203-
ProcessSourceKeyOptions {
203+
ProcessSourceKeyInput {
204204
key_aux_info: Some(change.key_aux_info),
205-
source_data: change.data,
206-
..Default::default()
205+
data: change.data,
207206
},
208207
));
209208
}

0 commit comments

Comments
 (0)