From 850351bbe9c260cc42b2edf2af52d60f56573e18 Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Fri, 22 Aug 2025 11:10:39 -0700 Subject: [PATCH] refactor(indexer): extract the method to handle row stream for reuse --- src/execution/source_indexer.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index c5955cd8c..d9e4f6b95 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -409,12 +409,23 @@ impl SourceIndexingContext { ) -> Result<()> { let plan = self.flow.get_execution_plan().await?; let import_op = &plan.import_ops[self.source_idx]; - let mut rows_stream = import_op + let rows_stream = import_op .executor .list(&interface::SourceExecutorListOptions { include_ordinal: true, include_content_version_fp: true, }); + self.update_with_stream(import_op, rows_stream, pool, update_stats) + .await + } + + async fn update_with_stream( + self: &Arc, + import_op: &plan::AnalyzedImportOp, + mut rows_stream: BoxStream<'_, Result>>, + pool: &PgPool, + update_stats: &Arc, + ) -> Result<()> { let mut join_set = JoinSet::new(); let scan_generation = { let mut state = self.state.lock().unwrap();