Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/docs/sources/googledrive.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ The spec takes the following fields:
In reality, configure them based on your requirement: how fresh do you need the target index to be?

:::
* `max_file_size` (`int`, optional): when set, any source file exceeding the limit (in bytes) will be ignored.

### Schema

Expand Down
1 change: 1 addition & 0 deletions python/cocoindex/sources/_engine_builtin_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class GoogleDrive(op.SourceSpec):
root_folder_ids: list[str]
binary: bool = False
recent_changes_poll_interval: datetime.timedelta | None = None
max_file_size: int | None = None


@dataclass
Expand Down
27 changes: 25 additions & 2 deletions src/ops/sources/google_drive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ pub struct Spec {
binary: bool,
root_folder_ids: Vec<String>,
recent_changes_poll_interval: Option<std::time::Duration>,
max_file_size: Option<i64>,
}

struct Executor {
drive_hub: DriveHub<HttpsConnector<HttpConnector>>,
binary: bool,
root_folder_ids: IndexSet<Arc<str>>,
recent_updates_poll_interval: Option<std::time::Duration>,
max_file_size: Option<i64>,
}

impl Executor {
Expand All @@ -92,6 +94,7 @@ impl Executor {
binary: spec.binary,
root_folder_ids: spec.root_folder_ids.into_iter().map(Arc::from).collect(),
recent_updates_poll_interval: spec.recent_changes_poll_interval,
max_file_size: spec.max_file_size,
})
}
}
Expand Down Expand Up @@ -298,7 +301,7 @@ impl SourceExecutor for Executor {
let mut seen_ids = HashSet::new();
let mut folder_ids = self.root_folder_ids.clone();
let fields = format!(
"files(id,name,mimeType,trashed{})",
"files(id,name,mimeType,trashed,size{})",
optional_modified_time(options.include_ordinal)
);
let mut new_folder_ids = Vec::new();
Expand All @@ -311,6 +314,14 @@ impl SourceExecutor for Executor {
.list_files(&folder_id, &fields, &mut next_page_token)
.await?;
for file in files {
if let Some(max_size) = self.max_file_size {
if let Some(file_size) = file.size {
if file_size > max_size {
// Skip files over the specified limit
continue;
}
}
}
curr_rows.extend(self.visit_file(file, &mut new_folder_ids, &mut seen_ids)?);
}
if !curr_rows.is_empty() {
Expand All @@ -334,7 +345,7 @@ impl SourceExecutor for Executor {
) -> Result<PartialSourceRowData> {
let file_id = key.single_part()?.str_value()?;
let fields = format!(
"id,name,mimeType,trashed{}",
"id,name,mimeType,trashed,size{}",
optional_modified_time(options.include_ordinal)
);
let resp = self
Expand All @@ -356,6 +367,18 @@ impl SourceExecutor for Executor {
});
}
};
// Check file size limit
if let Some(max_size) = self.max_file_size {
if let Some(file_size) = file.size {
if file_size > max_size {
return Ok(PartialSourceRowData {
value: Some(SourceValue::NonExistence),
ordinal: Some(Ordinal::unavailable()),
content_version_fp: None,
});
}
}
}
let ordinal = if options.include_ordinal {
file.modified_time.map(|t| t.try_into()).transpose()?
} else {
Expand Down
Loading