Skip to content

Commit 1a91011

Browse files
author
prabhath004
committed
feat: add max_file_size support to AzureBlob source
1 parent 7827f98 commit 1a91011

File tree

3 files changed

+37
-2
lines changed

3 files changed

+37
-2
lines changed

docs/docs/sources/azureblob.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,19 @@ The spec takes the following fields:
6363
* `excluded_patterns` (`list[str]`, optional): a list of glob patterns to exclude files, e.g. `["*.tmp", "**/*.log"]`.
6464
Any file or directory matching these patterns will be excluded even if it matches `included_patterns`.
6565
If not specified, no files will be excluded.
66-
* `sas_token` (`cocoindex.TransientAuthEntryReference[str]`, optional): a SAS token for authentication.
67-
* `account_access_key` (`cocoindex.TransientAuthEntryReference[str]`, optional): an account access key for authentication.
6866

6967
:::info
7068

7169
`included_patterns` and `excluded_patterns` use Unix-style glob syntax. See [globset syntax](https://docs.rs/globset/latest/globset/index.html#syntax) for details.
7270

7371
:::
7472

73+
* `max_file_size` (`int`, optional): the maximum file size in bytes. If provided, files larger than this will be treated as non-existent and skipped during processing.
74+
This is useful to avoid processing large files that are not relevant to your use case, such as videos or backups.
75+
If not specified, no size limit is applied.
76+
* `sas_token` (`cocoindex.TransientAuthEntryReference[str]`, optional): a SAS token for authentication.
77+
* `account_access_key` (`cocoindex.TransientAuthEntryReference[str]`, optional): an account access key for authentication.
78+
7579
### Schema
7680

7781
The output is a [*KTable*](/docs/core/data_types#ktable) with the following sub fields:

python/cocoindex/sources/_engine_builtin_specs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class AzureBlob(op.SourceSpec):
7878
binary: bool = False
7979
included_patterns: list[str] | None = None
8080
excluded_patterns: list[str] | None = None
81+
max_file_size: int | None = None
8182

8283
sas_token: TransientAuthEntryReference[str] | None = None
8384
account_access_key: TransientAuthEntryReference[str] | None = None

src/ops/sources/azure_blob.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub struct Spec {
1919
binary: bool,
2020
included_patterns: Option<Vec<String>>,
2121
excluded_patterns: Option<Vec<String>>,
22+
max_file_size: Option<i64>,
2223

2324
/// SAS token for authentication. Takes precedence over account_access_key.
2425
sas_token: Option<AuthEntryReference<String>>,
@@ -32,6 +33,7 @@ struct Executor {
3233
prefix: Option<String>,
3334
binary: bool,
3435
pattern_matcher: PatternMatcher,
36+
max_file_size: Option<i64>,
3537
}
3638

3739
fn datetime_to_ordinal(dt: &time::OffsetDateTime) -> Ordinal {
@@ -73,6 +75,15 @@ impl SourceExecutor for Executor {
7375
// Only include files (not directories)
7476
if key.ends_with('/') { continue; }
7577

78+
// Check file size limit
79+
if let Some(max_size) = self.max_file_size {
80+
if let Some(size) = blob.properties.content_length {
81+
if size > max_size {
82+
continue;
83+
}
84+
}
85+
}
86+
7687
if self.pattern_matcher.is_file_included(key) {
7788
let ordinal = Some(datetime_to_ordinal(&blob.properties.last_modified));
7889
batch.push(PartialSourceRow {
@@ -115,6 +126,24 @@ impl SourceExecutor for Executor {
115126
});
116127
}
117128

129+
// Check file size limit
130+
if let Some(max_size) = self.max_file_size {
131+
let blob_client = self
132+
.client
133+
.container_client(&self.container_name)
134+
.blob_client(key_str.as_ref());
135+
let properties = blob_client.get_properties().await?;
136+
if let Some(size) = properties.blob.properties.content_length {
137+
if size > max_size {
138+
return Ok(PartialSourceRowData {
139+
value: Some(SourceValue::NonExistence),
140+
ordinal: Some(Ordinal::unavailable()),
141+
content_version_fp: None,
142+
});
143+
}
144+
}
145+
}
146+
118147
let blob_client = self
119148
.client
120149
.container_client(&self.container_name)
@@ -238,6 +267,7 @@ impl SourceFactoryBase for Factory {
238267
prefix: spec.prefix,
239268
binary: spec.binary,
240269
pattern_matcher: PatternMatcher::new(spec.included_patterns, spec.excluded_patterns)?,
270+
max_file_size: spec.max_file_size,
241271
}))
242272
}
243273
}

0 commit comments

Comments
 (0)