Commit fc740a3

Change source's list() API to streaming and return ordinal. (#228)
1 parent 1401522 commit fc740a3

File tree

8 files changed: +206 -119 lines changed

Cargo.toml

Lines changed: 1 addition & 6 deletions
@@ -31,12 +31,7 @@ sqlx = { version = "0.8.3", features = [
     "runtime-tokio",
     "uuid",
 ] }
-tokio = { version = "1.44.1", features = [
-    "macros",
-    "rt-multi-thread",
-    "full",
-    "tracing",
-] }
+tokio = { version = "1.44.1", features = ["macros", "rt-multi-thread", "full", "tracing", "fs"] }
 tower = "0.5.2"
 tower-http = { version = "0.6.2", features = ["cors", "trace"] }
 indexmap = { version = "2.8.0", features = ["serde"] }

src/execution/dumper.rs

Lines changed: 22 additions & 15 deletions
@@ -1,5 +1,6 @@
 use anyhow::Result;
 use futures::future::try_join_all;
+use futures::StreamExt;
 use indexmap::IndexMap;
 use itertools::Itertools;
 use serde::ser::SerializeSeq;
@@ -14,6 +15,7 @@ use super::indexer;
 use super::memoization::EvaluationMemoryOptions;
 use crate::base::{schema, value};
 use crate::builder::plan::{AnalyzedSourceOp, ExecutionPlan};
+use crate::ops::interface::SourceExecutorListOptions;
 use crate::utils::yaml_ser::YamlSerializer;
 
 #[derive(Debug, Clone, Deserialize)]
@@ -163,22 +165,27 @@ impl<'a> Dumper<'a> {
     }
 
     async fn evaluate_and_dump_for_source_op(&self, source_op: &AnalyzedSourceOp) -> Result<()> {
-        let all_keys = source_op.executor.list_keys().await?;
-
         let mut keys_by_filename_prefix: IndexMap<String, Vec<value::KeyValue>> = IndexMap::new();
-        for key in all_keys {
-            let mut s = key
-                .to_strs()
-                .into_iter()
-                .map(|s| urlencoding::encode(&s).into_owned())
-                .join(":");
-            s.truncate(
-                (0..(FILENAME_PREFIX_MAX_LENGTH - source_op.name.as_str().len()))
-                    .rev()
-                    .find(|i| s.is_char_boundary(*i))
-                    .unwrap_or(0),
-            );
-            keys_by_filename_prefix.entry(s).or_default().push(key);
+
+        let mut rows_stream = source_op.executor.list(SourceExecutorListOptions {
+            include_ordinal: false,
+        });
+        while let Some(rows) = rows_stream.next().await {
+            for row in rows?.into_iter() {
+                let mut s = row
+                    .key
+                    .to_strs()
+                    .into_iter()
+                    .map(|s| urlencoding::encode(&s).into_owned())
+                    .join(":");
+                s.truncate(
+                    (0..(FILENAME_PREFIX_MAX_LENGTH - source_op.name.as_str().len()))
+                        .rev()
+                        .find(|i| s.is_char_boundary(*i))
+                        .unwrap_or(0),
+                );
+                keys_by_filename_prefix.entry(s).or_default().push(row.key);
+            }
         }
         let output_dir = Path::new(&self.options.output_dir);
         let evaluate_futs =
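
The truncation above is easy to misread: it keeps the longest prefix of the URL-encoded key that fits the filename budget without splitting a multi-byte UTF-8 character. The same pattern as a standalone sketch (the helper name is hypothetical, not part of this commit):

fn truncate_at_char_boundary(s: &mut String, max_len: usize) {
    // Largest index below `max_len` that lands on a char boundary, or 0 if
    // there is none. `is_char_boundary` is false past the end of the string,
    // so a string already shorter than `max_len` is left untouched.
    let cut = (0..max_len)
        .rev()
        .find(|i| s.is_char_boundary(*i))
        .unwrap_or(0);
    s.truncate(cut);
}

A character straddling the cut point is dropped whole rather than split, so the resulting filename prefix stays valid UTF-8.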

src/execution/indexer.rs

Lines changed: 16 additions & 9 deletions
@@ -1,6 +1,6 @@
 use crate::prelude::*;
 
-use futures::future::{join, join_all, try_join, try_join_all};
+use futures::future::{join, join_all, try_join_all};
 use itertools::Itertools;
 use log::error;
 use serde::Serialize;
@@ -14,7 +14,9 @@ use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoiz
 use crate::base::schema;
 use crate::base::value::{self, FieldValues, KeyValue};
 use crate::builder::plan::*;
-use crate::ops::interface::{ExportTargetMutation, ExportTargetUpsertEntry, Ordinal};
+use crate::ops::interface::{
+    ExportTargetMutation, ExportTargetUpsertEntry, Ordinal, SourceExecutorListOptions,
+};
 use crate::utils::db::WriteAction;
 use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
 
@@ -638,16 +640,21 @@ async fn update_source(
     schema: &schema::DataSchema,
     pool: &PgPool,
 ) -> Result<SourceUpdateInfo> {
-    let (keys, existing_keys_json) = try_join(
-        source_op.executor.list_keys(),
-        db_tracking::list_source_tracking_keys(
-            source_op.source_id,
-            &plan.tracking_table_setup,
-            pool,
-        ),
+    let existing_keys_json = db_tracking::list_source_tracking_keys(
+        source_op.source_id,
+        &plan.tracking_table_setup,
+        pool,
     )
     .await?;
 
+    let mut keys = Vec::new();
+    let mut rows_stream = source_op.executor.list(SourceExecutorListOptions {
+        include_ordinal: false,
+    });
+    while let Some(rows) = rows_stream.next().await {
+        keys.extend(rows?.into_iter().map(|row| row.key));
+    }
+
     let stats = UpdateStats::default();
     let upsert_futs = join_all(keys.iter().map(|key| {
         update_source_entry_with_err_handling(plan, source_op, schema, key, false, pool, &stats)
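
Both call sites updated in this commit drain the stream batch by batch. Where per-row processing is more convenient, the batched stream can be flattened with standard futures adapters; a minimal sketch, assuming the crate's anyhow-based Result alias and SourceRowMetadata are in scope (flatten_rows is a hypothetical helper, not part of this commit):

use futures::{stream::BoxStream, Stream, TryStreamExt};

fn flatten_rows<'a>(
    batches: BoxStream<'a, Result<Vec<SourceRowMetadata>>>,
) -> impl Stream<Item = Result<SourceRowMetadata>> + 'a {
    batches
        // Turn each Ok(Vec<row>) batch into an inner stream of Ok(row) items...
        .map_ok(|batch| futures::stream::iter(batch.into_iter().map(Ok)))
        // ...then splice the inner streams together, passing errors through.
        .try_flatten()
}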

src/ops/interface.rs

Lines changed: 15 additions & 1 deletion
@@ -47,6 +47,12 @@ pub struct SourceData<'a> {
     pub value: BoxFuture<'a, Result<Option<FieldValues>>>,
 }
 
+pub struct SourceRowMetadata {
+    pub key: KeyValue,
+    /// None means the ordinal is unavailable.
+    pub ordinal: Option<Ordinal>,
+}
+
 pub struct SourceChange<'a> {
     /// Last update/deletion ordinal. None means unavailable.
     pub ordinal: Option<Ordinal>,
@@ -55,10 +61,18 @@ pub struct SourceChange<'a> {
     pub value: Option<BoxFuture<'a, Result<Option<FieldValues>>>>,
 }
 
+#[derive(Debug, Default)]
+pub struct SourceExecutorListOptions {
+    pub include_ordinal: bool,
+}
+
 #[async_trait]
 pub trait SourceExecutor: Send + Sync {
     /// Get the list of keys for the source.
-    async fn list_keys(&self) -> Result<Vec<KeyValue>>;
+    fn list<'a>(
+        &'a self,
+        options: SourceExecutorListOptions,
+    ) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>>;
 
     // Get the value for the given key.
     async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'async_trait>>>;
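
The new contract in short: list is no longer an async method returning one complete Vec. It synchronously returns a stream, and each stream item is one batch of row metadata, with ordinals populated only when requested. A minimal sketch of an implementation for a hypothetical in-memory source; StaticSource and the 100-row page size are invented for illustration, KeyValue and Ordinal are assumed to be Clone, and any trait methods beyond those shown in this diff are omitted:

// Assumes the crate's usual imports: async_trait, anyhow::Result, and the
// interface types above (e.g. via crate::ops::sdk::*).
use async_stream::try_stream;
use futures::StreamExt;

struct StaticSource {
    rows: Vec<(KeyValue, Option<Ordinal>)>,
}

#[async_trait]
impl SourceExecutor for StaticSource {
    fn list<'a>(
        &'a self,
        options: SourceExecutorListOptions,
    ) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>> {
        try_stream! {
            // A paginated backend would yield one Vec per fetched page; here
            // we chunk a vector to imitate that shape.
            for page in self.rows.chunks(100) {
                yield page
                    .iter()
                    .map(|(key, ordinal)| SourceRowMetadata {
                        key: key.clone(),
                        // Only surface the ordinal when the caller asked for it.
                        ordinal: if options.include_ordinal {
                            ordinal.clone()
                        } else {
                            None
                        },
                    })
                    .collect::<Vec<_>>();
            }
        }
        .boxed()
    }

    async fn get_value(&self, _key: &KeyValue) -> Result<Option<SourceData<'async_trait>>> {
        Ok(None) // Not the focus of this sketch.
    }
}

Yielding batches rather than single rows keeps per-item overhead low while still letting callers start work before the full listing completes.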

src/ops/sources/google_drive.rs

Lines changed: 92 additions & 57 deletions
@@ -1,18 +1,18 @@
 use std::{
-    collections::HashMap,
+    collections::{HashMap, HashSet},
     sync::{Arc, LazyLock},
 };
 
+use async_stream::try_stream;
 use google_drive3::{
-    api::Scope,
+    api::{File, Scope},
     yup_oauth2::{read_service_account_key, ServiceAccountAuthenticator},
     DriveHub,
 };
 use http_body_util::BodyExt;
 use hyper_rustls::HttpsConnector;
 use hyper_util::client::legacy::connect::HttpConnector;
-use indexmap::IndexSet;
-use log::warn;
+use log::{trace, warn};
 
 use crate::base::field_attrs;
 use crate::ops::sdk::*;
@@ -80,7 +80,7 @@ pub struct Spec {
 struct Executor {
     drive_hub: DriveHub<HttpsConnector<HttpConnector>>,
     binary: bool,
-    root_folder_ids: Vec<String>,
+    root_folder_ids: Vec<Arc<str>>,
 }
 
 impl Executor {
@@ -105,7 +105,7 @@ impl Executor {
         Ok(Self {
             drive_hub,
             binary: spec.binary,
-            root_folder_ids: spec.root_folder_ids,
+            root_folder_ids: spec.root_folder_ids.into_iter().map(Arc::from).collect(),
         })
     }
 }
@@ -123,55 +123,60 @@ fn escape_string(s: &str) -> String {
 }
 
 impl Executor {
-    async fn traverse_folder(
+    fn visit_file(
         &self,
-        folder_id: &str,
-        visited_folder_ids: &mut IndexSet<String>,
-        result: &mut IndexSet<KeyValue>,
-    ) -> Result<()> {
-        if !visited_folder_ids.insert(folder_id.to_string()) {
-            return Ok(());
+        file: File,
+        new_folder_ids: &mut Vec<Arc<str>>,
+        seen_ids: &mut HashSet<Arc<str>>,
+    ) -> Result<Option<SourceRowMetadata>> {
+        if file.trashed == Some(true) {
+            return Ok(None);
         }
-        let query = format!("'{}' in parents", escape_string(folder_id));
-        let mut next_page_token: Option<String> = None;
-        loop {
-            let mut list_call = self
-                .drive_hub
-                .files()
-                .list()
-                .add_scope(Scope::Readonly)
-                .q(&query);
-            if let Some(next_page_token) = &next_page_token {
-                list_call = list_call.page_token(next_page_token);
-            }
-            let (_, files) = list_call.doit().await?;
-            if let Some(files) = files.files {
-                for file in files {
-                    match (file.id, file.mime_type) {
-                        (Some(id), Some(mime_type)) => {
-                            if mime_type == FOLDER_MIME_TYPE {
-                                Box::pin(self.traverse_folder(&id, visited_folder_ids, result))
-                                    .await?;
-                            } else if is_supported_file_type(&mime_type) {
-                                result.insert(KeyValue::Str(Arc::from(id)));
-                            } else {
-                                warn!("Skipping file with unsupported mime type: id={id}, mime_type={mime_type}, name={:?}", file.name);
-                            }
-                        }
-                        (id, mime_type) => {
-                            warn!(
-                                "Skipping file with incomplete metadata: id={id:?}, mime_type={mime_type:?}",
-                            );
-                        }
-                    }
-                }
-            }
-            next_page_token = files.next_page_token;
-            if next_page_token.is_none() {
-                break;
+        let (id, mime_type) = match (file.id, file.mime_type) {
+            (Some(id), Some(mime_type)) => (Arc::<str>::from(id), mime_type),
+            (id, mime_type) => {
+                warn!("Skipping file with incomplete metadata: id={id:?}, mime_type={mime_type:?}",);
+                return Ok(None);
             }
+        };
+        if !seen_ids.insert(id.clone()) {
+            return Ok(None);
+        }
+        let result = if mime_type == FOLDER_MIME_TYPE {
+            new_folder_ids.push(id);
+            None
+        } else if is_supported_file_type(&mime_type) {
+            Some(SourceRowMetadata {
+                key: KeyValue::Str(Arc::from(id)),
+                ordinal: file.modified_time.map(|t| t.try_into()).transpose()?,
+            })
+        } else {
+            trace!("Skipping file with unsupported mime type: id={id}, mime_type={mime_type}, name={:?}", file.name);
+            None
+        };
+        Ok(result)
+    }
+
+    async fn list_files(
+        &self,
+        folder_id: &str,
+        fields: &str,
+        next_page_token: &mut Option<String>,
+    ) -> Result<impl Iterator<Item = File>> {
+        let query = format!("'{}' in parents", escape_string(folder_id));
+        let mut list_call = self
+            .drive_hub
+            .files()
+            .list()
+            .add_scope(Scope::Readonly)
+            .q(&query)
+            .param("fields", fields);
+        if let Some(next_page_token) = &next_page_token {
+            list_call = list_call.page_token(next_page_token);
         }
-        Ok(())
+        let (_, files) = list_call.doit().await?;
+        let file_iter = files.files.into_iter().flat_map(|file| file.into_iter());
+        Ok(file_iter)
     }
 }
 
@@ -202,13 +207,43 @@ impl<T> ResultExt<T> for google_drive3::Result<T> {
 
 #[async_trait]
 impl SourceExecutor for Executor {
-    async fn list_keys(&self) -> Result<Vec<KeyValue>> {
-        let mut result = IndexSet::new();
-        for root_folder_id in &self.root_folder_ids {
-            self.traverse_folder(root_folder_id, &mut IndexSet::new(), &mut result)
-                .await?;
+    fn list<'a>(
+        &'a self,
+        options: SourceExecutorListOptions,
+    ) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>> {
+        let mut seen_ids = HashSet::new();
+        let mut folder_ids = self.root_folder_ids.clone();
+        let fields = format!(
+            "files(id,name,mimeType,trashed{})",
+            if options.include_ordinal {
+                ",modifiedTime"
+            } else {
+                ""
+            }
+        );
+        let mut new_folder_ids = Vec::new();
+        try_stream! {
+            while let Some(folder_id) = folder_ids.pop() {
+                let mut next_page_token = None;
+                loop {
+                    let mut curr_rows = Vec::new();
+                    let files = self
+                        .list_files(&folder_id, &fields, &mut next_page_token)
+                        .await?;
+                    for file in files {
+                        curr_rows.extend(self.visit_file(file, &mut new_folder_ids, &mut seen_ids)?);
+                    }
+                    if !curr_rows.is_empty() {
+                        yield curr_rows;
+                    }
+                    if next_page_token.is_none() {
+                        break;
+                    }
+                }
+                folder_ids.extend(new_folder_ids.drain(..).rev());
+            }
         }
-        Ok(result.into_iter().collect())
+        .boxed()
     }
 
     async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'async_trait>>> {
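
End to end, a caller of this source sees one batch per Drive API page, and passing include_ordinal: true is what makes the executor request modifiedTime and populate each row's ordinal. A usage sketch, assuming KeyValue and Ordinal implement Debug (print_listing is illustrative, not part of the commit):

use futures::StreamExt;

async fn print_listing(executor: &dyn SourceExecutor) -> Result<()> {
    let mut stream = executor.list(SourceExecutorListOptions {
        include_ordinal: true,
    });
    // Each item is one batch; for this source, roughly one Drive page.
    while let Some(batch) = stream.next().await {
        for row in batch? {
            println!("key={:?} ordinal={:?}", row.key, row.ordinal);
        }
    }
    Ok(())
}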
