diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs index 5bd52822a..410458c95 100644 --- a/src/execution/dumper.rs +++ b/src/execution/dumper.rs @@ -191,10 +191,13 @@ impl<'a> Dumper<'a> { Vec<(value::KeyValue, serde_json::Value)>, > = IndexMap::new(); - let mut rows_stream = import_op.executor.list(&SourceExecutorListOptions { - include_ordinal: false, - include_content_version_fp: false, - }); + let mut rows_stream = import_op + .executor + .list(&SourceExecutorListOptions { + include_ordinal: false, + include_content_version_fp: false, + }) + .await?; while let Some(rows) = rows_stream.next().await { for row in rows?.into_iter() { let mut s = row diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index d9e4f6b95..2f08b79bf 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -414,7 +414,8 @@ impl SourceIndexingContext { .list(&interface::SourceExecutorListOptions { include_ordinal: true, include_content_version_fp: true, - }); + }) + .await?; self.update_with_stream(import_op, rows_stream, pool, update_stats) .await } diff --git a/src/ops/interface.rs b/src/ops/interface.rs index d22d42b36..3dfe88848 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -130,10 +130,10 @@ pub struct PartialSourceRowData { #[async_trait] pub trait SourceExecutor: Send + Sync { /// Get the list of keys for the source. - fn list<'a>( - &'a self, - options: &'a SourceExecutorListOptions, - ) -> BoxStream<'a, Result>>; + async fn list( + &self, + options: &SourceExecutorListOptions, + ) -> Result>>>; // Get the value for the given key. async fn get_value( diff --git a/src/ops/sources/amazon_s3.rs b/src/ops/sources/amazon_s3.rs index 765a277c5..6ff11b98e 100644 --- a/src/ops/sources/amazon_s3.rs +++ b/src/ops/sources/amazon_s3.rs @@ -61,11 +61,11 @@ fn datetime_to_ordinal(dt: &aws_sdk_s3::primitives::DateTime) -> Ordinal { #[async_trait] impl SourceExecutor for Executor { - fn list<'a>( - &'a self, - _options: &'a SourceExecutorListOptions, - ) -> BoxStream<'a, Result>> { - try_stream! { + async fn list( + &self, + _options: &SourceExecutorListOptions, + ) -> Result>>> { + let stream = try_stream! { let mut continuation_token = None; loop { let mut req = self.client @@ -104,7 +104,8 @@ impl SourceExecutor for Executor { break; } } - }.boxed() + }; + Ok(stream.boxed()) } async fn get_value( diff --git a/src/ops/sources/azure_blob.rs b/src/ops/sources/azure_blob.rs index b54e6e644..583ca3b9e 100644 --- a/src/ops/sources/azure_blob.rs +++ b/src/ops/sources/azure_blob.rs @@ -40,11 +40,11 @@ fn datetime_to_ordinal(dt: &time::OffsetDateTime) -> Ordinal { #[async_trait] impl SourceExecutor for Executor { - fn list<'a>( - &'a self, - _options: &'a SourceExecutorListOptions, - ) -> BoxStream<'a, Result>> { - try_stream! { + async fn list( + &self, + _options: &SourceExecutorListOptions, + ) -> Result>>> { + let stream = try_stream! { let mut continuation_token: Option = None; loop { let mut list_builder = self.client @@ -93,8 +93,8 @@ impl SourceExecutor for Executor { break; } } - } - .boxed() + }; + Ok(stream.boxed()) } async fn get_value( diff --git a/src/ops/sources/google_drive.rs b/src/ops/sources/google_drive.rs index 1fe0f2dd3..6f0cc8029 100644 --- a/src/ops/sources/google_drive.rs +++ b/src/ops/sources/google_drive.rs @@ -288,10 +288,10 @@ fn optional_modified_time(include_ordinal: bool) -> &'static str { #[async_trait] impl SourceExecutor for Executor { - fn list<'a>( - &'a self, - options: &'a SourceExecutorListOptions, - ) -> BoxStream<'a, Result>> { + async fn list( + &self, + options: &SourceExecutorListOptions, + ) -> Result>>> { let mut seen_ids = HashSet::new(); let mut folder_ids = self.root_folder_ids.clone(); let fields = format!( @@ -299,7 +299,7 @@ impl SourceExecutor for Executor { optional_modified_time(options.include_ordinal) ); let mut new_folder_ids = Vec::new(); - try_stream! { + let stream = try_stream! { while let Some(folder_id) = folder_ids.pop() { let mut next_page_token = None; loop { @@ -319,8 +319,8 @@ impl SourceExecutor for Executor { } folder_ids.extend(new_folder_ids.drain(..).rev()); } - } - .boxed() + }; + Ok(stream.boxed()) } async fn get_value( diff --git a/src/ops/sources/local_file.rs b/src/ops/sources/local_file.rs index 84f8ed18c..72b1ad38b 100644 --- a/src/ops/sources/local_file.rs +++ b/src/ops/sources/local_file.rs @@ -23,15 +23,15 @@ struct Executor { #[async_trait] impl SourceExecutor for Executor { - fn list<'a>( - &'a self, - options: &'a SourceExecutorListOptions, - ) -> BoxStream<'a, Result>> { + async fn list( + &self, + options: &SourceExecutorListOptions, + ) -> Result>>> { let root_component_size = self.root_path.components().count(); let mut dirs = Vec::new(); dirs.push(Cow::Borrowed(&self.root_path)); let mut new_dirs = Vec::new(); - try_stream! { + let stream = try_stream! { while let Some(dir) = dirs.pop() { let mut entries = tokio::fs::read_dir(dir.as_ref()).await?; while let Some(entry) = entries.next_entry().await? { @@ -64,8 +64,8 @@ impl SourceExecutor for Executor { } dirs.extend(new_dirs.drain(..).rev()); } - } - .boxed() + }; + Ok(stream.boxed()) } async fn get_value( diff --git a/src/service/flows.rs b/src/service/flows.rs index 52839ab1d..03971a0f9 100644 --- a/src/service/flows.rs +++ b/src/service/flows.rs @@ -105,10 +105,13 @@ pub async fn get_keys( ) })?; - let mut rows_stream = import_op.executor.list(&SourceExecutorListOptions { - include_ordinal: false, - include_content_version_fp: false, - }); + let mut rows_stream = import_op + .executor + .list(&SourceExecutorListOptions { + include_ordinal: false, + include_content_version_fp: false, + }) + .await?; let mut keys = Vec::new(); while let Some(rows) = rows_stream.next().await { keys.extend(rows?.into_iter().map(|row| (row.key, row.key_aux_info)));