diff --git a/src/ops/sources/amazon_s3.rs b/src/ops/sources/amazon_s3.rs index 029c0df93..4b3422ae7 100644 --- a/src/ops/sources/amazon_s3.rs +++ b/src/ops/sources/amazon_s3.rs @@ -2,10 +2,10 @@ use crate::fields_value; use async_stream::try_stream; use aws_config::BehaviorVersion; use aws_sdk_s3::Client; -use globset::{Glob, GlobSet, GlobSetBuilder}; use std::sync::Arc; use urlencoding; +use super::shared::pattern_matcher::PatternMatcher; use crate::base::field_attrs; use crate::ops::sdk::*; @@ -51,26 +51,10 @@ struct Executor { bucket_name: String, prefix: Option<String>, binary: bool, - included_glob_set: Option<GlobSet>, - excluded_glob_set: Option<GlobSet>, + pattern_matcher: PatternMatcher, sqs_context: Option<Arc<SqsContext>>, } -impl Executor { - fn is_excluded(&self, key: &str) -> bool { - self.excluded_glob_set - .as_ref() - .is_some_and(|glob_set| glob_set.is_match(key)) - } - - fn is_file_included(&self, key: &str) -> bool { - self.included_glob_set - .as_ref() - .is_none_or(|glob_set| glob_set.is_match(key)) - && !self.is_excluded(key) - } -} - fn datetime_to_ordinal(dt: &aws_sdk_s3::primitives::DateTime) -> Ordinal { Ordinal(Some((dt.as_nanos() / 1000) as i64)) } @@ -100,15 +84,7 @@ impl SourceExecutor for Executor { if let Some(key) = obj.key() { // Only include files (not folders) if key.ends_with('/') { continue; } - let include = self.included_glob_set - .as_ref() - .map(|gs| gs.is_match(key)) - .unwrap_or(true); - let exclude = self.excluded_glob_set - .as_ref() - .map(|gs| gs.is_match(key)) - .unwrap_or(false); - if include && !exclude { + if self.pattern_matcher.is_file_included(key) { batch.push(PartialSourceRowMetadata { key: KeyValue::Str(key.to_string().into()), key_aux_info: serde_json::Value::Null, @@ -137,7 +113,7 @@ impl SourceExecutor for Executor { options: &SourceExecutorGetOptions, ) -> Result<PartialSourceRowData> { let key_str = key.str_value()?; - if !self.is_file_included(key_str) { + if !self.pattern_matcher.is_file_included(key_str) { return Ok(PartialSourceRowData { value:
Some(SourceValue::NonExistence), ordinal: Some(Ordinal::unavailable()), @@ -349,8 +325,7 @@ impl SourceFactoryBase for Factory { bucket_name: spec.bucket_name, prefix: spec.prefix, binary: spec.binary, - included_glob_set: spec.included_patterns.map(build_glob_set).transpose()?, - excluded_glob_set: spec.excluded_patterns.map(build_glob_set).transpose()?, + pattern_matcher: PatternMatcher::new(spec.included_patterns, spec.excluded_patterns)?, sqs_context: spec.sqs_queue_url.map(|url| { Arc::new(SqsContext { client: aws_sdk_sqs::Client::new(&config), @@ -360,11 +335,3 @@ impl SourceFactoryBase for Factory { })) } } - -fn build_glob_set(patterns: Vec<String>) -> Result<GlobSet> { - let mut builder = GlobSetBuilder::new(); - for pattern in patterns { - builder.add(Glob::new(pattern.as_str())?); - } - Ok(builder.build()?) -} diff --git a/src/ops/sources/azure_blob.rs b/src/ops/sources/azure_blob.rs index 19cdf0819..f5320468a 100644 --- a/src/ops/sources/azure_blob.rs +++ b/src/ops/sources/azure_blob.rs @@ -5,9 +5,9 @@ use azure_identity::{DefaultAzureCredential, TokenCredentialOptions}; use azure_storage::StorageCredentials; use azure_storage_blobs::prelude::*; use futures::StreamExt; -use globset::{Glob, GlobSet, GlobSetBuilder}; use std::sync::Arc; +use super::shared::pattern_matcher::PatternMatcher; use crate::base::field_attrs; use crate::ops::sdk::*; @@ -31,23 +31,7 @@ struct Executor { container_name: String, prefix: Option<String>, binary: bool, - included_glob_set: Option<GlobSet>, - excluded_glob_set: Option<GlobSet>, -} - -impl Executor { - fn is_excluded(&self, key: &str) -> bool { - self.excluded_glob_set - .as_ref() - .is_some_and(|glob_set| glob_set.is_match(key)) - } - - fn is_file_included(&self, key: &str) -> bool { - self.included_glob_set - .as_ref() - .is_none_or(|glob_set| glob_set.is_match(key)) - && !self.is_excluded(key) - } + pattern_matcher: PatternMatcher, } fn datetime_to_ordinal(dt: &time::OffsetDateTime) -> Ordinal { @@ -89,7 +73,7 @@ impl SourceExecutor for Executor { // Only
include files (not directories) if key.ends_with('/') { continue; } - if self.is_file_included(key) { + if self.pattern_matcher.is_file_included(key) { let ordinal = Some(datetime_to_ordinal(&blob.properties.last_modified)); batch.push(PartialSourceRowMetadata { key: KeyValue::Str(key.clone().into()), @@ -119,7 +103,7 @@ impl SourceExecutor for Executor { options: &SourceExecutorGetOptions, ) -> Result<PartialSourceRowData> { let key_str = key.str_value()?; - if !self.is_file_included(key_str) { + if !self.pattern_matcher.is_file_included(key_str) { return Ok(PartialSourceRowData { value: Some(SourceValue::NonExistence), ordinal: Some(Ordinal::unavailable()), @@ -237,16 +221,7 @@ impl SourceFactoryBase for Factory { container_name: spec.container_name, prefix: spec.prefix, binary: spec.binary, - included_glob_set: spec.included_patterns.map(build_glob_set).transpose()?, - excluded_glob_set: spec.excluded_patterns.map(build_glob_set).transpose()?, + pattern_matcher: PatternMatcher::new(spec.included_patterns, spec.excluded_patterns)?, })) } } - -fn build_glob_set(patterns: Vec<String>) -> Result<GlobSet> { - let mut builder = GlobSetBuilder::new(); - for pattern in patterns { - builder.add(Glob::new(pattern.as_str())?); - } - Ok(builder.build()?)
-} diff --git a/src/ops/sources/local_file.rs b/src/ops/sources/local_file.rs index 9620eebaa..05f457c07 100644 --- a/src/ops/sources/local_file.rs +++ b/src/ops/sources/local_file.rs @@ -1,10 +1,9 @@ use async_stream::try_stream; -use globset::{Glob, GlobSet, GlobSetBuilder}; use log::warn; use std::borrow::Cow; -use std::path::Path; use std::{path::PathBuf, sync::Arc}; +use super::shared::pattern_matcher::PatternMatcher; use crate::base::field_attrs; use crate::{fields_value, ops::sdk::*}; @@ -19,23 +18,7 @@ pub struct Spec { struct Executor { root_path: PathBuf, binary: bool, - included_glob_set: Option<GlobSet>, - excluded_glob_set: Option<GlobSet>, -} - -impl Executor { - fn is_excluded(&self, path: impl AsRef<Path> + Copy) -> bool { - self.excluded_glob_set - .as_ref() - .is_some_and(|glob_set| glob_set.is_match(path)) - } - - fn is_file_included(&self, path: impl AsRef<Path> + Copy) -> bool { - self.included_glob_set - .as_ref() - .is_none_or(|glob_set| glob_set.is_match(path)) - && !self.is_excluded(path) - } + pattern_matcher: PatternMatcher, } #[async_trait] @@ -57,26 +40,25 @@ impl SourceExecutor for Executor { for _ in 0..root_component_size { path_components.next(); } - let relative_path = path_components.as_path(); + let Some(relative_path) = path_components.as_path().to_str() else { + warn!("Skipped ill-formed file path: {}", path.display()); + continue; + }; if path.is_dir() { - if !self.is_excluded(relative_path) { + if !self.pattern_matcher.is_excluded(relative_path) { new_dirs.push(Cow::Owned(path)); } - } else if self.is_file_included(relative_path) { + } else if self.pattern_matcher.is_file_included(relative_path) { let ordinal: Option<Ordinal> = if options.include_ordinal { Some(path.metadata()?.modified()?.try_into()?)
} else { None }; - if let Some(relative_path) = relative_path.to_str() { - yield vec![PartialSourceRowMetadata { - key: KeyValue::Str(relative_path.into()), - key_aux_info: serde_json::Value::Null, - ordinal, - }]; - } else { - warn!("Skipped ill-formed file path: {}", path.display()); - } + yield vec![PartialSourceRowMetadata { + key: KeyValue::Str(relative_path.into()), + key_aux_info: serde_json::Value::Null, + ordinal, + }]; } } dirs.extend(new_dirs.drain(..).rev()); @@ -91,7 +73,10 @@ impl SourceExecutor for Executor { _key_aux_info: &serde_json::Value, options: &SourceExecutorGetOptions, ) -> Result<PartialSourceRowData> { - if !self.is_file_included(key.str_value()?.as_ref()) { + if !self + .pattern_matcher + .is_file_included(key.str_value()?.as_ref()) + { return Ok(PartialSourceRowData { value: Some(SourceValue::NonExistence), ordinal: Some(Ordinal::unavailable()), @@ -173,16 +158,7 @@ impl SourceFactoryBase for Factory { Ok(Box::new(Executor { root_path: PathBuf::from(spec.path), binary: spec.binary, - included_glob_set: spec.included_patterns.map(build_glob_set).transpose()?, - excluded_glob_set: spec.excluded_patterns.map(build_glob_set).transpose()?, + pattern_matcher: PatternMatcher::new(spec.included_patterns, spec.excluded_patterns)?, })) } } - -fn build_glob_set(patterns: Vec<String>) -> Result<GlobSet> { - let mut builder = GlobSetBuilder::new(); - for pattern in patterns { - builder.add(Glob::new(pattern.as_str())?); - } - Ok(builder.build()?)
-} diff --git a/src/ops/sources/mod.rs b/src/ops/sources/mod.rs index a7725aaf4..88d63d55f 100644 --- a/src/ops/sources/mod.rs +++ b/src/ops/sources/mod.rs @@ -1,3 +1,5 @@ +pub mod shared; + pub mod amazon_s3; pub mod azure_blob; pub mod google_drive; diff --git a/src/ops/sources/shared/mod.rs b/src/ops/sources/shared/mod.rs new file mode 100644 index 000000000..9440e4f2d --- /dev/null +++ b/src/ops/sources/shared/mod.rs @@ -0,0 +1 @@ +pub mod pattern_matcher; diff --git a/src/ops/sources/shared/pattern_matcher.rs b/src/ops/sources/shared/pattern_matcher.rs new file mode 100644 index 000000000..60ed6f98b --- /dev/null +++ b/src/ops/sources/shared/pattern_matcher.rs @@ -0,0 +1,101 @@ +use crate::ops::sdk::*; +use globset::{Glob, GlobSet, GlobSetBuilder}; + +/// Builds a GlobSet from a vector of pattern strings +fn build_glob_set(patterns: Vec<String>) -> Result<GlobSet> { + let mut builder = GlobSetBuilder::new(); + for pattern in patterns { + builder.add(Glob::new(pattern.as_str())?); + } + Ok(builder.build()?) +} + +/// Pattern matcher that handles include and exclude patterns for files +#[derive(Debug)] +pub struct PatternMatcher { + /// Patterns matching full path of files to be included. + included_glob_set: Option<GlobSet>, + /// Patterns matching full path of files and directories to be excluded. + /// If a directory is excluded, all files and subdirectories within it are also excluded. + excluded_glob_set: Option<GlobSet>, +} + +impl PatternMatcher { + /// Create a new PatternMatcher from optional include and exclude pattern vectors + pub fn new( + included_patterns: Option<Vec<String>>, + excluded_patterns: Option<Vec<String>>, + ) -> Result<Self> { + let included_glob_set = included_patterns.map(build_glob_set).transpose()?; + let excluded_glob_set = excluded_patterns.map(build_glob_set).transpose()?; + + Ok(Self { + included_glob_set, + excluded_glob_set, + }) + } + + /// Check if a file or directory is excluded by the exclude patterns + /// Can be called on directories to prune traversal on excluded directories.
+ pub fn is_excluded(&self, path: &str) -> bool { + self.excluded_glob_set + .as_ref() + .is_some_and(|glob_set| glob_set.is_match(path)) + } + + /// Check if a file should be included based on both include and exclude patterns + /// Should be called for each file. + pub fn is_file_included(&self, path: &str) -> bool { + self.included_glob_set + .as_ref() + .is_none_or(|glob_set| glob_set.is_match(path)) + && !self.is_excluded(path) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_pattern_matcher_no_patterns() { + let matcher = PatternMatcher::new(None, None).unwrap(); + assert!(matcher.is_file_included("test.txt")); + assert!(matcher.is_file_included("path/to/file.rs")); + assert!(!matcher.is_excluded("anything")); + } + + #[test] + fn test_pattern_matcher_include_only() { + let matcher = + PatternMatcher::new(Some(vec!["*.txt".to_string(), "*.rs".to_string()]), None).unwrap(); + + assert!(matcher.is_file_included("test.txt")); + assert!(matcher.is_file_included("main.rs")); + assert!(!matcher.is_file_included("image.png")); + } + + #[test] + fn test_pattern_matcher_exclude_only() { + let matcher = + PatternMatcher::new(None, Some(vec!["*.tmp".to_string(), "*.log".to_string()])) + .unwrap(); + + assert!(matcher.is_file_included("test.txt")); + assert!(!matcher.is_file_included("temp.tmp")); + assert!(!matcher.is_file_included("debug.log")); + } + + #[test] + fn test_pattern_matcher_both_patterns() { + let matcher = PatternMatcher::new( + Some(vec!["*.txt".to_string()]), + Some(vec!["*temp*".to_string()]), + ) + .unwrap(); + + assert!(matcher.is_file_included("test.txt")); + assert!(!matcher.is_file_included("temp.txt")); // excluded despite matching include + assert!(!matcher.is_file_included("main.rs")); // doesn't match include + } +}