Skip to content

Commit 967148f

Browse files
authored
refactor: extract common logic to deal with include/exclude patterns (#862)
1 parent 981a86f commit 967148f

File tree

6 files changed

+132
-110
lines changed

6 files changed

+132
-110
lines changed

src/ops/sources/amazon_s3.rs

Lines changed: 5 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ use crate::fields_value;
22
use async_stream::try_stream;
33
use aws_config::BehaviorVersion;
44
use aws_sdk_s3::Client;
5-
use globset::{Glob, GlobSet, GlobSetBuilder};
65
use std::sync::Arc;
76
use urlencoding;
87

8+
use super::shared::pattern_matcher::PatternMatcher;
99
use crate::base::field_attrs;
1010
use crate::ops::sdk::*;
1111

@@ -51,26 +51,10 @@ struct Executor {
5151
bucket_name: String,
5252
prefix: Option<String>,
5353
binary: bool,
54-
included_glob_set: Option<GlobSet>,
55-
excluded_glob_set: Option<GlobSet>,
54+
pattern_matcher: PatternMatcher,
5655
sqs_context: Option<Arc<SqsContext>>,
5756
}
5857

59-
impl Executor {
60-
fn is_excluded(&self, key: &str) -> bool {
61-
self.excluded_glob_set
62-
.as_ref()
63-
.is_some_and(|glob_set| glob_set.is_match(key))
64-
}
65-
66-
fn is_file_included(&self, key: &str) -> bool {
67-
self.included_glob_set
68-
.as_ref()
69-
.is_none_or(|glob_set| glob_set.is_match(key))
70-
&& !self.is_excluded(key)
71-
}
72-
}
73-
7458
fn datetime_to_ordinal(dt: &aws_sdk_s3::primitives::DateTime) -> Ordinal {
7559
Ordinal(Some((dt.as_nanos() / 1000) as i64))
7660
}
@@ -100,15 +84,7 @@ impl SourceExecutor for Executor {
10084
if let Some(key) = obj.key() {
10185
// Only include files (not folders)
10286
if key.ends_with('/') { continue; }
103-
let include = self.included_glob_set
104-
.as_ref()
105-
.map(|gs| gs.is_match(key))
106-
.unwrap_or(true);
107-
let exclude = self.excluded_glob_set
108-
.as_ref()
109-
.map(|gs| gs.is_match(key))
110-
.unwrap_or(false);
111-
if include && !exclude {
87+
if self.pattern_matcher.is_file_included(key) {
11288
batch.push(PartialSourceRowMetadata {
11389
key: KeyValue::Str(key.to_string().into()),
11490
key_aux_info: serde_json::Value::Null,
@@ -137,7 +113,7 @@ impl SourceExecutor for Executor {
137113
options: &SourceExecutorGetOptions,
138114
) -> Result<PartialSourceRowData> {
139115
let key_str = key.str_value()?;
140-
if !self.is_file_included(key_str) {
116+
if !self.pattern_matcher.is_file_included(key_str) {
141117
return Ok(PartialSourceRowData {
142118
value: Some(SourceValue::NonExistence),
143119
ordinal: Some(Ordinal::unavailable()),
@@ -349,8 +325,7 @@ impl SourceFactoryBase for Factory {
349325
bucket_name: spec.bucket_name,
350326
prefix: spec.prefix,
351327
binary: spec.binary,
352-
included_glob_set: spec.included_patterns.map(build_glob_set).transpose()?,
353-
excluded_glob_set: spec.excluded_patterns.map(build_glob_set).transpose()?,
328+
pattern_matcher: PatternMatcher::new(spec.included_patterns, spec.excluded_patterns)?,
354329
sqs_context: spec.sqs_queue_url.map(|url| {
355330
Arc::new(SqsContext {
356331
client: aws_sdk_sqs::Client::new(&config),
@@ -360,11 +335,3 @@ impl SourceFactoryBase for Factory {
360335
}))
361336
}
362337
}
363-
364-
fn build_glob_set(patterns: Vec<String>) -> Result<GlobSet> {
365-
let mut builder = GlobSetBuilder::new();
366-
for pattern in patterns {
367-
builder.add(Glob::new(pattern.as_str())?);
368-
}
369-
Ok(builder.build()?)
370-
}

src/ops/sources/azure_blob.rs

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};
55
use azure_storage::StorageCredentials;
66
use azure_storage_blobs::prelude::*;
77
use futures::StreamExt;
8-
use globset::{Glob, GlobSet, GlobSetBuilder};
98
use std::sync::Arc;
109

10+
use super::shared::pattern_matcher::PatternMatcher;
1111
use crate::base::field_attrs;
1212
use crate::ops::sdk::*;
1313

@@ -31,23 +31,7 @@ struct Executor {
3131
container_name: String,
3232
prefix: Option<String>,
3333
binary: bool,
34-
included_glob_set: Option<GlobSet>,
35-
excluded_glob_set: Option<GlobSet>,
36-
}
37-
38-
impl Executor {
39-
fn is_excluded(&self, key: &str) -> bool {
40-
self.excluded_glob_set
41-
.as_ref()
42-
.is_some_and(|glob_set| glob_set.is_match(key))
43-
}
44-
45-
fn is_file_included(&self, key: &str) -> bool {
46-
self.included_glob_set
47-
.as_ref()
48-
.is_none_or(|glob_set| glob_set.is_match(key))
49-
&& !self.is_excluded(key)
50-
}
34+
pattern_matcher: PatternMatcher,
5135
}
5236

5337
fn datetime_to_ordinal(dt: &time::OffsetDateTime) -> Ordinal {
@@ -89,7 +73,7 @@ impl SourceExecutor for Executor {
8973
// Only include files (not directories)
9074
if key.ends_with('/') { continue; }
9175

92-
if self.is_file_included(key) {
76+
if self.pattern_matcher.is_file_included(key) {
9377
let ordinal = Some(datetime_to_ordinal(&blob.properties.last_modified));
9478
batch.push(PartialSourceRowMetadata {
9579
key: KeyValue::Str(key.clone().into()),
@@ -119,7 +103,7 @@ impl SourceExecutor for Executor {
119103
options: &SourceExecutorGetOptions,
120104
) -> Result<PartialSourceRowData> {
121105
let key_str = key.str_value()?;
122-
if !self.is_file_included(key_str) {
106+
if !self.pattern_matcher.is_file_included(key_str) {
123107
return Ok(PartialSourceRowData {
124108
value: Some(SourceValue::NonExistence),
125109
ordinal: Some(Ordinal::unavailable()),
@@ -237,16 +221,7 @@ impl SourceFactoryBase for Factory {
237221
container_name: spec.container_name,
238222
prefix: spec.prefix,
239223
binary: spec.binary,
240-
included_glob_set: spec.included_patterns.map(build_glob_set).transpose()?,
241-
excluded_glob_set: spec.excluded_patterns.map(build_glob_set).transpose()?,
224+
pattern_matcher: PatternMatcher::new(spec.included_patterns, spec.excluded_patterns)?,
242225
}))
243226
}
244227
}
245-
246-
fn build_glob_set(patterns: Vec<String>) -> Result<GlobSet> {
247-
let mut builder = GlobSetBuilder::new();
248-
for pattern in patterns {
249-
builder.add(Glob::new(pattern.as_str())?);
250-
}
251-
Ok(builder.build()?)
252-
}

src/ops/sources/local_file.rs

Lines changed: 18 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use async_stream::try_stream;
2-
use globset::{Glob, GlobSet, GlobSetBuilder};
32
use log::warn;
43
use std::borrow::Cow;
5-
use std::path::Path;
64
use std::{path::PathBuf, sync::Arc};
75

6+
use super::shared::pattern_matcher::PatternMatcher;
87
use crate::base::field_attrs;
98
use crate::{fields_value, ops::sdk::*};
109

@@ -19,23 +18,7 @@ pub struct Spec {
1918
struct Executor {
2019
root_path: PathBuf,
2120
binary: bool,
22-
included_glob_set: Option<GlobSet>,
23-
excluded_glob_set: Option<GlobSet>,
24-
}
25-
26-
impl Executor {
27-
fn is_excluded(&self, path: impl AsRef<Path> + Copy) -> bool {
28-
self.excluded_glob_set
29-
.as_ref()
30-
.is_some_and(|glob_set| glob_set.is_match(path))
31-
}
32-
33-
fn is_file_included(&self, path: impl AsRef<Path> + Copy) -> bool {
34-
self.included_glob_set
35-
.as_ref()
36-
.is_none_or(|glob_set| glob_set.is_match(path))
37-
&& !self.is_excluded(path)
38-
}
21+
pattern_matcher: PatternMatcher,
3922
}
4023

4124
#[async_trait]
@@ -57,26 +40,25 @@ impl SourceExecutor for Executor {
5740
for _ in 0..root_component_size {
5841
path_components.next();
5942
}
60-
let relative_path = path_components.as_path();
43+
let Some(relative_path) = path_components.as_path().to_str() else {
44+
warn!("Skipped ill-formed file path: {}", path.display());
45+
continue;
46+
};
6147
if path.is_dir() {
62-
if !self.is_excluded(relative_path) {
48+
if !self.pattern_matcher.is_excluded(relative_path) {
6349
new_dirs.push(Cow::Owned(path));
6450
}
65-
} else if self.is_file_included(relative_path) {
51+
} else if self.pattern_matcher.is_file_included(relative_path) {
6652
let ordinal: Option<Ordinal> = if options.include_ordinal {
6753
Some(path.metadata()?.modified()?.try_into()?)
6854
} else {
6955
None
7056
};
71-
if let Some(relative_path) = relative_path.to_str() {
72-
yield vec![PartialSourceRowMetadata {
73-
key: KeyValue::Str(relative_path.into()),
74-
key_aux_info: serde_json::Value::Null,
75-
ordinal,
76-
}];
77-
} else {
78-
warn!("Skipped ill-formed file path: {}", path.display());
79-
}
57+
yield vec![PartialSourceRowMetadata {
58+
key: KeyValue::Str(relative_path.into()),
59+
key_aux_info: serde_json::Value::Null,
60+
ordinal,
61+
}];
8062
}
8163
}
8264
dirs.extend(new_dirs.drain(..).rev());
@@ -91,7 +73,10 @@ impl SourceExecutor for Executor {
9173
_key_aux_info: &serde_json::Value,
9274
options: &SourceExecutorGetOptions,
9375
) -> Result<PartialSourceRowData> {
94-
if !self.is_file_included(key.str_value()?.as_ref()) {
76+
if !self
77+
.pattern_matcher
78+
.is_file_included(key.str_value()?.as_ref())
79+
{
9580
return Ok(PartialSourceRowData {
9681
value: Some(SourceValue::NonExistence),
9782
ordinal: Some(Ordinal::unavailable()),
@@ -173,16 +158,7 @@ impl SourceFactoryBase for Factory {
173158
Ok(Box::new(Executor {
174159
root_path: PathBuf::from(spec.path),
175160
binary: spec.binary,
176-
included_glob_set: spec.included_patterns.map(build_glob_set).transpose()?,
177-
excluded_glob_set: spec.excluded_patterns.map(build_glob_set).transpose()?,
161+
pattern_matcher: PatternMatcher::new(spec.included_patterns, spec.excluded_patterns)?,
178162
}))
179163
}
180164
}
181-
182-
fn build_glob_set(patterns: Vec<String>) -> Result<GlobSet> {
183-
let mut builder = GlobSetBuilder::new();
184-
for pattern in patterns {
185-
builder.add(Glob::new(pattern.as_str())?);
186-
}
187-
Ok(builder.build()?)
188-
}

src/ops/sources/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
pub mod shared;
2+
13
pub mod amazon_s3;
24
pub mod azure_blob;
35
pub mod google_drive;

src/ops/sources/shared/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod pattern_matcher;
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use crate::ops::sdk::*;
2+
use globset::{Glob, GlobSet, GlobSetBuilder};
3+
4+
/// Builds a GlobSet from a vector of pattern strings
5+
fn build_glob_set(patterns: Vec<String>) -> Result<GlobSet> {
6+
let mut builder = GlobSetBuilder::new();
7+
for pattern in patterns {
8+
builder.add(Glob::new(pattern.as_str())?);
9+
}
10+
Ok(builder.build()?)
11+
}
12+
13+
/// Pattern matcher that handles include and exclude patterns for files
14+
#[derive(Debug)]
15+
pub struct PatternMatcher {
16+
/// Patterns matching full path of files to be included.
17+
included_glob_set: Option<GlobSet>,
18+
/// Patterns matching full path of files and directories to be excluded.
19+
/// If a directory is excluded, all files and subdirectories within it are also excluded.
20+
excluded_glob_set: Option<GlobSet>,
21+
}
22+
23+
impl PatternMatcher {
24+
/// Create a new PatternMatcher from optional include and exclude pattern vectors
25+
pub fn new(
26+
included_patterns: Option<Vec<String>>,
27+
excluded_patterns: Option<Vec<String>>,
28+
) -> Result<Self> {
29+
let included_glob_set = included_patterns.map(build_glob_set).transpose()?;
30+
let excluded_glob_set = excluded_patterns.map(build_glob_set).transpose()?;
31+
32+
Ok(Self {
33+
included_glob_set,
34+
excluded_glob_set,
35+
})
36+
}
37+
38+
/// Check if a file or directory is excluded by the exclude patterns
39+
/// Can be called on directories to prune traversal on excluded directories.
40+
pub fn is_excluded(&self, path: &str) -> bool {
41+
self.excluded_glob_set
42+
.as_ref()
43+
.is_some_and(|glob_set| glob_set.is_match(path))
44+
}
45+
46+
/// Check if a file should be included based on both include and exclude patterns
47+
/// Should be called for each file.
48+
pub fn is_file_included(&self, path: &str) -> bool {
49+
self.included_glob_set
50+
.as_ref()
51+
.is_none_or(|glob_set| glob_set.is_match(path))
52+
&& !self.is_excluded(path)
53+
}
54+
}
55+
56+
#[cfg(test)]
57+
mod tests {
58+
use super::*;
59+
60+
#[test]
61+
fn test_pattern_matcher_no_patterns() {
62+
let matcher = PatternMatcher::new(None, None).unwrap();
63+
assert!(matcher.is_file_included("test.txt"));
64+
assert!(matcher.is_file_included("path/to/file.rs"));
65+
assert!(!matcher.is_excluded("anything"));
66+
}
67+
68+
#[test]
69+
fn test_pattern_matcher_include_only() {
70+
let matcher =
71+
PatternMatcher::new(Some(vec!["*.txt".to_string(), "*.rs".to_string()]), None).unwrap();
72+
73+
assert!(matcher.is_file_included("test.txt"));
74+
assert!(matcher.is_file_included("main.rs"));
75+
assert!(!matcher.is_file_included("image.png"));
76+
}
77+
78+
#[test]
79+
fn test_pattern_matcher_exclude_only() {
80+
let matcher =
81+
PatternMatcher::new(None, Some(vec!["*.tmp".to_string(), "*.log".to_string()]))
82+
.unwrap();
83+
84+
assert!(matcher.is_file_included("test.txt"));
85+
assert!(!matcher.is_file_included("temp.tmp"));
86+
assert!(!matcher.is_file_included("debug.log"));
87+
}
88+
89+
#[test]
90+
fn test_pattern_matcher_both_patterns() {
91+
let matcher = PatternMatcher::new(
92+
Some(vec!["*.txt".to_string()]),
93+
Some(vec!["*temp*".to_string()]),
94+
)
95+
.unwrap();
96+
97+
assert!(matcher.is_file_included("test.txt"));
98+
assert!(!matcher.is_file_included("temp.txt")); // excluded despite matching include
99+
assert!(!matcher.is_file_included("main.rs")); // doesn't match include
100+
}
101+
}

0 commit comments

Comments
 (0)