Skip to content

Commit b9a3d2c

Browse files
authored
refactor: make SourceExecutor::list() async with Result return (#903)
1 parent 9e6277c commit b9a3d2c

File tree

8 files changed

+48
-40
lines changed

8 files changed

+48
-40
lines changed

src/execution/dumper.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,10 +191,13 @@ impl<'a> Dumper<'a> {
191191
Vec<(value::KeyValue, serde_json::Value)>,
192192
> = IndexMap::new();
193193

194-
let mut rows_stream = import_op.executor.list(&SourceExecutorListOptions {
195-
include_ordinal: false,
196-
include_content_version_fp: false,
197-
});
194+
let mut rows_stream = import_op
195+
.executor
196+
.list(&SourceExecutorListOptions {
197+
include_ordinal: false,
198+
include_content_version_fp: false,
199+
})
200+
.await?;
198201
while let Some(rows) = rows_stream.next().await {
199202
for row in rows?.into_iter() {
200203
let mut s = row

src/execution/source_indexer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,8 @@ impl SourceIndexingContext {
414414
.list(&interface::SourceExecutorListOptions {
415415
include_ordinal: true,
416416
include_content_version_fp: true,
417-
});
417+
})
418+
.await?;
418419
self.update_with_stream(import_op, rows_stream, pool, update_stats)
419420
.await
420421
}

src/ops/interface.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,10 @@ pub struct PartialSourceRowData {
130130
#[async_trait]
131131
pub trait SourceExecutor: Send + Sync {
132132
/// Get the list of keys for the source.
133-
fn list<'a>(
134-
&'a self,
135-
options: &'a SourceExecutorListOptions,
136-
) -> BoxStream<'a, Result<Vec<PartialSourceRowMetadata>>>;
133+
async fn list(
134+
&self,
135+
options: &SourceExecutorListOptions,
136+
) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>>;
137137

138138
// Get the value for the given key.
139139
async fn get_value(

src/ops/sources/amazon_s3.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,11 @@ fn datetime_to_ordinal(dt: &aws_sdk_s3::primitives::DateTime) -> Ordinal {
6161

6262
#[async_trait]
6363
impl SourceExecutor for Executor {
64-
fn list<'a>(
65-
&'a self,
66-
_options: &'a SourceExecutorListOptions,
67-
) -> BoxStream<'a, Result<Vec<PartialSourceRowMetadata>>> {
68-
try_stream! {
64+
async fn list(
65+
&self,
66+
_options: &SourceExecutorListOptions,
67+
) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>> {
68+
let stream = try_stream! {
6969
let mut continuation_token = None;
7070
loop {
7171
let mut req = self.client
@@ -104,7 +104,8 @@ impl SourceExecutor for Executor {
104104
break;
105105
}
106106
}
107-
}.boxed()
107+
};
108+
Ok(stream.boxed())
108109
}
109110

110111
async fn get_value(

src/ops/sources/azure_blob.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ fn datetime_to_ordinal(dt: &time::OffsetDateTime) -> Ordinal {
4040

4141
#[async_trait]
4242
impl SourceExecutor for Executor {
43-
fn list<'a>(
44-
&'a self,
45-
_options: &'a SourceExecutorListOptions,
46-
) -> BoxStream<'a, Result<Vec<PartialSourceRowMetadata>>> {
47-
try_stream! {
43+
async fn list(
44+
&self,
45+
_options: &SourceExecutorListOptions,
46+
) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>> {
47+
let stream = try_stream! {
4848
let mut continuation_token: Option<NextMarker> = None;
4949
loop {
5050
let mut list_builder = self.client
@@ -93,8 +93,8 @@ impl SourceExecutor for Executor {
9393
break;
9494
}
9595
}
96-
}
97-
.boxed()
96+
};
97+
Ok(stream.boxed())
9898
}
9999

100100
async fn get_value(

src/ops/sources/google_drive.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -288,18 +288,18 @@ fn optional_modified_time(include_ordinal: bool) -> &'static str {
288288

289289
#[async_trait]
290290
impl SourceExecutor for Executor {
291-
fn list<'a>(
292-
&'a self,
293-
options: &'a SourceExecutorListOptions,
294-
) -> BoxStream<'a, Result<Vec<PartialSourceRowMetadata>>> {
291+
async fn list(
292+
&self,
293+
options: &SourceExecutorListOptions,
294+
) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>> {
295295
let mut seen_ids = HashSet::new();
296296
let mut folder_ids = self.root_folder_ids.clone();
297297
let fields = format!(
298298
"files(id,name,mimeType,trashed{})",
299299
optional_modified_time(options.include_ordinal)
300300
);
301301
let mut new_folder_ids = Vec::new();
302-
try_stream! {
302+
let stream = try_stream! {
303303
while let Some(folder_id) = folder_ids.pop() {
304304
let mut next_page_token = None;
305305
loop {
@@ -319,8 +319,8 @@ impl SourceExecutor for Executor {
319319
}
320320
folder_ids.extend(new_folder_ids.drain(..).rev());
321321
}
322-
}
323-
.boxed()
322+
};
323+
Ok(stream.boxed())
324324
}
325325

326326
async fn get_value(

src/ops/sources/local_file.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,15 @@ struct Executor {
2323

2424
#[async_trait]
2525
impl SourceExecutor for Executor {
26-
fn list<'a>(
27-
&'a self,
28-
options: &'a SourceExecutorListOptions,
29-
) -> BoxStream<'a, Result<Vec<PartialSourceRowMetadata>>> {
26+
async fn list(
27+
&self,
28+
options: &SourceExecutorListOptions,
29+
) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>> {
3030
let root_component_size = self.root_path.components().count();
3131
let mut dirs = Vec::new();
3232
dirs.push(Cow::Borrowed(&self.root_path));
3333
let mut new_dirs = Vec::new();
34-
try_stream! {
34+
let stream = try_stream! {
3535
while let Some(dir) = dirs.pop() {
3636
let mut entries = tokio::fs::read_dir(dir.as_ref()).await?;
3737
while let Some(entry) = entries.next_entry().await? {
@@ -64,8 +64,8 @@ impl SourceExecutor for Executor {
6464
}
6565
dirs.extend(new_dirs.drain(..).rev());
6666
}
67-
}
68-
.boxed()
67+
};
68+
Ok(stream.boxed())
6969
}
7070

7171
async fn get_value(

src/service/flows.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,13 @@ pub async fn get_keys(
105105
)
106106
})?;
107107

108-
let mut rows_stream = import_op.executor.list(&SourceExecutorListOptions {
109-
include_ordinal: false,
110-
include_content_version_fp: false,
111-
});
108+
let mut rows_stream = import_op
109+
.executor
110+
.list(&SourceExecutorListOptions {
111+
include_ordinal: false,
112+
include_content_version_fp: false,
113+
})
114+
.await?;
112115
let mut keys = Vec::new();
113116
while let Some(rows) = rows_stream.next().await {
114117
keys.extend(rows?.into_iter().map(|row| (row.key, row.key_aux_info)));

0 commit comments

Comments
 (0)