Skip to content

Commit 9e6277c

Browse files
authored
refactor(indexer): extract the method to handle row stream for reuse (#902)
1 parent c394925 commit 9e6277c

File tree

1 file changed

+12
-1
lines changed

1 file changed

+12
-1
lines changed

src/execution/source_indexer.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,12 +409,23 @@ impl SourceIndexingContext {
409409
) -> Result<()> {
410410
let plan = self.flow.get_execution_plan().await?;
411411
let import_op = &plan.import_ops[self.source_idx];
412-
let mut rows_stream = import_op
412+
let rows_stream = import_op
413413
.executor
414414
.list(&interface::SourceExecutorListOptions {
415415
include_ordinal: true,
416416
include_content_version_fp: true,
417417
});
418+
self.update_with_stream(import_op, rows_stream, pool, update_stats)
419+
.await
420+
}
421+
422+
async fn update_with_stream(
423+
self: &Arc<Self>,
424+
import_op: &plan::AnalyzedImportOp,
425+
mut rows_stream: BoxStream<'_, Result<Vec<interface::PartialSourceRowMetadata>>>,
426+
pool: &PgPool,
427+
update_stats: &Arc<stats::UpdateStats>,
428+
) -> Result<()> {
418429
let mut join_set = JoinSet::new();
419430
let scan_generation = {
420431
let mut state = self.state.lock().unwrap();

0 commit comments

Comments
 (0)