diff --git a/docs/docs/sources/amazons3.md b/docs/docs/sources/amazons3.md
index 09a28a1e..bda77297 100644
--- a/docs/docs/sources/amazons3.md
+++ b/docs/docs/sources/amazons3.md
@@ -131,6 +131,9 @@ The spec takes the following fields:
 
 :::
 
+* `max_file_size` (`int`, optional): if provided, files exceeding this size in bytes will be treated as non-existent and skipped during processing.
+  This is useful to avoid processing large files that are not relevant to your use case, such as videos or backups.
+  If not specified, no size limit is applied.
 * `sqs_queue_url` (`str`, optional): if provided, the source will receive change event notifications from Amazon S3 via this SQS queue.
 
   :::info
diff --git a/python/cocoindex/sources/_engine_builtin_specs.py b/python/cocoindex/sources/_engine_builtin_specs.py
index 6c90307b..4549ca08 100644
--- a/python/cocoindex/sources/_engine_builtin_specs.py
+++ b/python/cocoindex/sources/_engine_builtin_specs.py
@@ -55,6 +55,7 @@ class AmazonS3(op.SourceSpec):
     binary: bool = False
     included_patterns: list[str] | None = None
     excluded_patterns: list[str] | None = None
+    max_file_size: int | None = None
     sqs_queue_url: str | None = None
     redis: RedisNotification | None = None
 
diff --git a/src/ops/sources/amazon_s3.rs b/src/ops/sources/amazon_s3.rs
index 8476347e..1cd8a93f 100644
--- a/src/ops/sources/amazon_s3.rs
+++ b/src/ops/sources/amazon_s3.rs
@@ -34,6 +34,7 @@ pub struct Spec {
     binary: bool,
     included_patterns: Option<Vec<String>>,
     excluded_patterns: Option<Vec<String>>,
+    max_file_size: Option<i64>,
     sqs_queue_url: Option<String>,
     redis: Option<RedisNotification>,
 }
@@ -82,6 +83,7 @@ struct Executor {
     prefix: Option<String>,
     binary: bool,
     pattern_matcher: PatternMatcher,
+    max_file_size: Option<i64>,
     sqs_context: Option<Arc<SqsContext>>,
     redis_context: Option<Arc<RedisContext>>,
 }
@@ -115,6 +117,14 @@ impl SourceExecutor for Executor {
             if let Some(key) = obj.key() {
                 // Only include files (not folders)
                 if key.ends_with('/') { continue; }
+                // Skip objects over the size limit, using the size already
+                // present in the list response (no extra request needed).
+                if let Some(max_size) = self.max_file_size {
+                    if let Some(size) = obj.size() {
+                        if size > max_size {
+                            continue;
+                        }
+                    }
+                }
                 if self.pattern_matcher.is_file_included(key) {
                     batch.push(PartialSourceRow {
                         key: KeyValue::from_single_part(key.to_string()),
@@ -156,6 +166,25 @@ impl SourceExecutor for Executor {
                 content_version_fp: None,
             });
         }
+        // Enforce the size limit on the read path: probe the object's size
+        // with HeadObject and report oversized objects as non-existent.
+        if let Some(max_size) = self.max_file_size {
+            let head_result = self
+                .client
+                .head_object()
+                .bucket(&self.bucket_name)
+                .key(key_str.as_ref())
+                .send()
+                .await?;
+            if let Some(size) = head_result.content_length() {
+                if size > max_size {
+                    return Ok(PartialSourceRowData {
+                        value: Some(SourceValue::NonExistence),
+                        ordinal: Some(Ordinal::unavailable()),
+                        content_version_fp: None,
+                    });
+                }
+            }
+        }
         let resp = self
             .client
             .get_object()
@@ -457,6 +486,7 @@ impl SourceFactoryBase for Factory {
             prefix: spec.prefix,
             binary: spec.binary,
             pattern_matcher: PatternMatcher::new(spec.included_patterns, spec.excluded_patterns)?,
+            max_file_size: spec.max_file_size,
            sqs_context: spec.sqs_queue_url.map(|url| {
                Arc::new(SqsContext {
                    client: aws_sdk_sqs::Client::new(&config),
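
For reviewers, a minimal sketch of how the new field would be used from Python, assuming the usual cocoindex flow API; the flow name, bucket, and prefix below are hypothetical placeholders, and `max_file_size` is the only field added by this diff:

```python
import cocoindex


@cocoindex.flow_def(name="S3DocsIndexing")  # hypothetical flow name
def s3_docs_flow(
    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.AmazonS3(
            bucket_name="my-bucket",  # hypothetical bucket
            prefix="docs/",           # hypothetical prefix
            binary=False,
            included_patterns=["*.md", "*.txt"],
            # New in this change: objects larger than 10 MiB are treated as
            # non-existent and skipped, both when listing and when reading.
            max_file_size=10 * 1024 * 1024,
        )
    )
```

One trade-off worth noting: the listing path checks the size already returned in the list response for free, but the read path issues an extra `HeadObject` call before `GetObject` whenever `max_file_size` is set, so each read costs one additional S3 request.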