Skip to content

Commit a45141f

Browse files
committed
perf: tune the condition about when to list with values
1 parent 0f3486c commit a45141f

File tree

8 files changed

+40
-11
lines changed

8 files changed

+40
-11
lines changed

src/execution/live_updater.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,9 @@ impl SourceUpdateTask {
242242
let live_mode = self.options.live_mode;
243243
async move {
244244
let update_stats = Arc::new(stats::UpdateStats::default());
245-
source_context.update(&pool, &update_stats).await?;
245+
source_context
246+
.update(&pool, &update_stats, /*expect_little_diff=*/ false)
247+
.await?;
246248
if update_stats.has_any_change() {
247249
status_tx.send_modify(|update| {
248250
update.source_updates_num[source_idx] += 1;
@@ -260,7 +262,9 @@ impl SourceUpdateTask {
260262
interval.tick().await;
261263

262264
let update_stats = Arc::new(stats::UpdateStats::default());
263-
source_context.update(&pool, &update_stats).await?;
265+
source_context
266+
.update(&pool, &update_stats, /*expect_little_diff=*/ true)
267+
.await?;
264268
if update_stats.has_any_change() {
265269
status_tx.send_modify(|update| {
266270
update.source_updates_num[source_idx] += 1;

src/execution/source_indexer.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ impl SourceIndexingContext {
367367
self: &Arc<Self>,
368368
pool: &PgPool,
369369
update_stats: &Arc<stats::UpdateStats>,
370+
expect_little_diff: bool,
370371
) -> Result<()> {
371372
let pending_update_fut = {
372373
let mut pending_update = self.pending_update.lock().unwrap();
@@ -383,7 +384,8 @@ impl SourceIndexingContext {
383384
let mut pending_update = slf.pending_update.lock().unwrap();
384385
*pending_update = None;
385386
}
386-
slf.update_once(&pool, &update_stats).await?;
387+
slf.update_once(&pool, &update_stats, expect_little_diff)
388+
.await?;
387389
}
388390
anyhow::Ok(())
389391
});
@@ -406,17 +408,18 @@ impl SourceIndexingContext {
406408
self: &Arc<Self>,
407409
pool: &PgPool,
408410
update_stats: &Arc<stats::UpdateStats>,
411+
expect_little_diff: bool,
409412
) -> Result<()> {
410413
let plan = self.flow.get_execution_plan().await?;
411414
let import_op = &plan.import_ops[self.source_idx];
412-
let rows_stream = import_op
413-
.executor
414-
.list(&interface::SourceExecutorReadOptions {
415-
include_ordinal: true,
416-
include_content_version_fp: true,
417-
include_value: true,
418-
})
419-
.await?;
415+
let read_options = interface::SourceExecutorReadOptions {
416+
include_ordinal: true,
417+
include_content_version_fp: true,
418+
// When only a small diff is expected and the source provides ordinals, we don't fetch values during `list()` by default,
419+
// as there's a high chance that we don't need the values at all
420+
include_value: !(expect_little_diff && import_op.executor.provides_ordinal()),
421+
};
422+
let rows_stream = import_op.executor.list(&read_options).await?;
420423
self.update_with_stream(import_op, rows_stream, pool, update_stats)
421424
.await
422425
}

src/ops/interface.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ pub trait SourceExecutor: Send + Sync {
154154
) -> Result<Option<BoxStream<'async_trait, Result<SourceChangeMessage>>>> {
155155
Ok(None)
156156
}
157+
158+
fn provides_ordinal(&self) -> bool;
157159
}
158160

159161
#[async_trait]

src/ops/sources/amazon_s3.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,10 @@ impl SourceExecutor for Executor {
188188
};
189189
Ok(Some(stream.boxed()))
190190
}
191+
192+
fn provides_ordinal(&self) -> bool {
193+
true
194+
}
191195
}
192196

193197
#[derive(Debug, Deserialize)]

src/ops/sources/azure_blob.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,10 @@ impl SourceExecutor for Executor {
166166
// Azure Blob Storage doesn't have built-in change notifications like S3+SQS
167167
Ok(None)
168168
}
169+
170+
fn provides_ordinal(&self) -> bool {
171+
true
172+
}
169173
}
170174

171175
pub struct Factory;

src/ops/sources/google_drive.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,10 @@ impl SourceExecutor for Executor {
435435
};
436436
Ok(Some(stream.boxed()))
437437
}
438+
439+
fn provides_ordinal(&self) -> bool {
440+
true
441+
}
438442
}
439443

440444
pub struct Factory;

src/ops/sources/local_file.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ impl SourceExecutor for Executor {
115115
content_version_fp: None,
116116
})
117117
}
118+
119+
fn provides_ordinal(&self) -> bool {
120+
true
121+
}
118122
}
119123

120124
pub struct Factory;

src/ops/sources/postgres.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,10 @@ impl SourceExecutor for Executor {
479479

480480
Ok(data)
481481
}
482+
483+
fn provides_ordinal(&self) -> bool {
484+
self.table_schema.ordinal_field_schema.is_some()
485+
}
482486
}
483487

484488
pub struct Factory;

0 commit comments

Comments
 (0)