Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/execution/dumper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,13 @@ impl<'a> Dumper<'a> {
Vec<(value::KeyValue, serde_json::Value)>,
> = IndexMap::new();

let mut rows_stream = import_op.executor.list(&SourceExecutorListOptions {
include_ordinal: false,
include_content_version_fp: false,
});
let mut rows_stream = import_op
.executor
.list(&SourceExecutorListOptions {
include_ordinal: false,
include_content_version_fp: false,
})
.await?;
while let Some(rows) = rows_stream.next().await {
for row in rows?.into_iter() {
let mut s = row
Expand Down
3 changes: 2 additions & 1 deletion src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,8 @@ impl SourceIndexingContext {
.list(&interface::SourceExecutorListOptions {
include_ordinal: true,
include_content_version_fp: true,
});
})
.await?;
self.update_with_stream(import_op, rows_stream, pool, update_stats)
.await
}
Expand Down
8 changes: 4 additions & 4 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ pub struct PartialSourceRowData {
#[async_trait]
pub trait SourceExecutor: Send + Sync {
/// Get the list of keys for the source.
fn list<'a>(
&'a self,
options: &'a SourceExecutorListOptions,
) -> BoxStream<'a, Result<Vec<PartialSourceRowMetadata>>>;
async fn list(
&self,
options: &SourceExecutorListOptions,
) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>>;

// Get the value for the given key.
async fn get_value(
Expand Down
13 changes: 7 additions & 6 deletions src/ops/sources/amazon_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ fn datetime_to_ordinal(dt: &aws_sdk_s3::primitives::DateTime) -> Ordinal {

#[async_trait]
impl SourceExecutor for Executor {
fn list<'a>(
&'a self,
_options: &'a SourceExecutorListOptions,
) -> BoxStream<'a, Result<Vec<PartialSourceRowMetadata>>> {
try_stream! {
async fn list(
&self,
_options: &SourceExecutorListOptions,
) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>> {
let stream = try_stream! {
let mut continuation_token = None;
loop {
let mut req = self.client
Expand Down Expand Up @@ -104,7 +104,8 @@ impl SourceExecutor for Executor {
break;
}
}
}.boxed()
};
Ok(stream.boxed())
}

async fn get_value(
Expand Down
14 changes: 7 additions & 7 deletions src/ops/sources/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ fn datetime_to_ordinal(dt: &time::OffsetDateTime) -> Ordinal {

#[async_trait]
impl SourceExecutor for Executor {
fn list<'a>(
&'a self,
_options: &'a SourceExecutorListOptions,
) -> BoxStream<'a, Result<Vec<PartialSourceRowMetadata>>> {
try_stream! {
async fn list(
&self,
_options: &SourceExecutorListOptions,
) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>> {
let stream = try_stream! {
let mut continuation_token: Option<NextMarker> = None;
loop {
let mut list_builder = self.client
Expand Down Expand Up @@ -93,8 +93,8 @@ impl SourceExecutor for Executor {
break;
}
}
}
.boxed()
};
Ok(stream.boxed())
}

async fn get_value(
Expand Down
14 changes: 7 additions & 7 deletions src/ops/sources/google_drive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,18 +288,18 @@ fn optional_modified_time(include_ordinal: bool) -> &'static str {

#[async_trait]
impl SourceExecutor for Executor {
fn list<'a>(
&'a self,
options: &'a SourceExecutorListOptions,
) -> BoxStream<'a, Result<Vec<PartialSourceRowMetadata>>> {
async fn list(
&self,
options: &SourceExecutorListOptions,
) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>> {
let mut seen_ids = HashSet::new();
let mut folder_ids = self.root_folder_ids.clone();
let fields = format!(
"files(id,name,mimeType,trashed{})",
optional_modified_time(options.include_ordinal)
);
let mut new_folder_ids = Vec::new();
try_stream! {
let stream = try_stream! {
while let Some(folder_id) = folder_ids.pop() {
let mut next_page_token = None;
loop {
Expand All @@ -319,8 +319,8 @@ impl SourceExecutor for Executor {
}
folder_ids.extend(new_folder_ids.drain(..).rev());
}
}
.boxed()
};
Ok(stream.boxed())
}

async fn get_value(
Expand Down
14 changes: 7 additions & 7 deletions src/ops/sources/local_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ struct Executor {

#[async_trait]
impl SourceExecutor for Executor {
fn list<'a>(
&'a self,
options: &'a SourceExecutorListOptions,
) -> BoxStream<'a, Result<Vec<PartialSourceRowMetadata>>> {
async fn list(
&self,
options: &SourceExecutorListOptions,
) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>> {
let root_component_size = self.root_path.components().count();
let mut dirs = Vec::new();
dirs.push(Cow::Borrowed(&self.root_path));
let mut new_dirs = Vec::new();
try_stream! {
let stream = try_stream! {
while let Some(dir) = dirs.pop() {
let mut entries = tokio::fs::read_dir(dir.as_ref()).await?;
while let Some(entry) = entries.next_entry().await? {
Expand Down Expand Up @@ -64,8 +64,8 @@ impl SourceExecutor for Executor {
}
dirs.extend(new_dirs.drain(..).rev());
}
}
.boxed()
};
Ok(stream.boxed())
}

async fn get_value(
Expand Down
11 changes: 7 additions & 4 deletions src/service/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,13 @@ pub async fn get_keys(
)
})?;

let mut rows_stream = import_op.executor.list(&SourceExecutorListOptions {
include_ordinal: false,
include_content_version_fp: false,
});
let mut rows_stream = import_op
.executor
.list(&SourceExecutorListOptions {
include_ordinal: false,
include_content_version_fp: false,
})
.await?;
let mut keys = Vec::new();
while let Some(rows) = rows_stream.next().await {
keys.extend(rows?.into_iter().map(|row| (row.key, row.key_aux_info)));
Expand Down
Loading