Skip to content

Commit dbfb457

Browse files
authored
Merge branch 'main' into main
2 parents 6a31005 + 72493e7 commit dbfb457

File tree

7 files changed

+239
-37
lines changed

7 files changed

+239
-37
lines changed

docs/docs/sources/googledrive.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ The spec takes the following fields:
3030
* `root_folder_ids` (`list[str]`): a list of Google Drive folder IDs to import files from.
3131
* `binary` (`bool`, optional): whether to read files as binary (instead of text).
3232
* `recent_changes_poll_interval` (`datetime.timedelta`, optional): when set, this source provides a change capture mechanism by periodically polling Google Drive for recently modified files.
33+
* `included_patterns` (`list[str]`, optional): a list of glob patterns to include files, e.g. `["*.txt", "docs/**/*.md"]`. If not specified, all files will be included.
34+
* `excluded_patterns` (`list[str]`, optional): a list of glob patterns to exclude files, e.g. `["tmp", "**/node_modules"]`. Any file or directory matching these patterns will be excluded even if it matches `included_patterns`. If not specified, no files will be excluded.
3335

3436
:::info
3537

python/cocoindex/sources/_engine_builtin_specs.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,15 @@ class GoogleDrive(op.SourceSpec):
3535
service_account_credential_path: str
3636
root_folder_ids: list[str]
3737
binary: bool = False
38+
39+
# If provided, only files matching these patterns will be included.
40+
# See https://docs.rs/globset/latest/globset/index.html#syntax for the syntax of the patterns.
41+
included_patterns: list[str] | None = None
42+
43+
# If provided, files matching these patterns will be excluded.
44+
# See https://docs.rs/globset/latest/globset/index.html#syntax for the syntax of the patterns.
45+
excluded_patterns: list[str] | None = None
46+
3847
recent_changes_poll_interval: datetime.timedelta | None = None
3948
max_file_size: int | None = None
4049

src/builder/analyzer.rs

Lines changed: 134 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -255,14 +255,88 @@ fn try_merge_collector_schemas(
255255
schema1: &CollectorSchema,
256256
schema2: &CollectorSchema,
257257
) -> Result<CollectorSchema> {
258-
let fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?;
258+
let schema1_fields = &schema1.fields;
259+
let schema2_fields = &schema2.fields;
260+
261+
// Create a map from field name to index in schema1
262+
let field_map: HashMap<FieldName, usize> = schema1_fields
263+
.iter()
264+
.enumerate()
265+
.map(|(i, f)| (f.name.clone(), i))
266+
.collect();
267+
268+
let mut output_fields = Vec::new();
269+
let mut next_field_id_1 = 0;
270+
let mut next_field_id_2 = 0;
271+
272+
for (idx, field) in schema2_fields.iter().enumerate() {
273+
if let Some(&idx1) = field_map.get(&field.name) {
274+
if idx1 < next_field_id_1 {
275+
api_bail!(
276+
"Common fields are expected to have consistent order across different `collect()` calls, but got different orders between fields '{}' and '{}'",
277+
field.name,
278+
schema1_fields[next_field_id_1 - 1].name
279+
);
280+
}
281+
// Add intervening fields from schema1
282+
for i in next_field_id_1..idx1 {
283+
output_fields.push(schema1_fields[i].clone());
284+
}
285+
// Add intervening fields from schema2
286+
for i in next_field_id_2..idx {
287+
output_fields.push(schema2_fields[i].clone());
288+
}
289+
// Merge the field
290+
let merged_type =
291+
try_make_common_value_type(&schema1_fields[idx1].value_type, &field.value_type)?;
292+
output_fields.push(FieldSchema {
293+
name: field.name.clone(),
294+
value_type: merged_type,
295+
description: None,
296+
});
297+
next_field_id_1 = idx1 + 1;
298+
next_field_id_2 = idx + 1;
299+
// Fields not in schema1 and not UUID are added at the end
300+
}
301+
}
302+
303+
// Add remaining fields from schema1
304+
for i in next_field_id_1..schema1_fields.len() {
305+
output_fields.push(schema1_fields[i].clone());
306+
}
307+
308+
// Add remaining fields from schema2
309+
for i in next_field_id_2..schema2_fields.len() {
310+
output_fields.push(schema2_fields[i].clone());
311+
}
312+
313+
// Handle auto_uuid_field_idx
314+
let auto_uuid_field_idx = match (schema1.auto_uuid_field_idx, schema2.auto_uuid_field_idx) {
315+
(Some(idx1), Some(idx2)) => {
316+
let name1 = &schema1_fields[idx1].name;
317+
let name2 = &schema2_fields[idx2].name;
318+
if name1 == name2 {
319+
// Find the position of the auto_uuid field in the merged output
320+
output_fields.iter().position(|f| &f.name == name1)
321+
} else {
322+
api_bail!(
323+
"Generated UUID fields must have the same name across different `collect()` calls, got different names: '{}' vs '{}'",
324+
name1,
325+
name2
326+
);
327+
}
328+
}
329+
(Some(_), None) | (None, Some(_)) => {
330+
api_bail!(
331+
"The generated UUID field, once present for one `collect()`, must be consistently present for other `collect()` calls for the same collector"
332+
);
333+
}
334+
(None, None) => None,
335+
};
336+
259337
Ok(CollectorSchema {
260-
fields,
261-
auto_uuid_field_idx: if schema1.auto_uuid_field_idx == schema2.auto_uuid_field_idx {
262-
schema1.auto_uuid_field_idx
263-
} else {
264-
None
265-
},
338+
fields: output_fields,
339+
auto_uuid_field_idx,
266340
})
267341
}
268342

@@ -704,11 +778,14 @@ impl AnalyzerContext {
704778
op_scope: &Arc<OpScope>,
705779
reactive_op: &NamedSpec<ReactiveOpSpec>,
706780
) -> Result<BoxFuture<'static, Result<AnalyzedReactiveOp>>> {
707-
let result_fut = match &reactive_op.spec {
781+
let op_scope_clone = op_scope.clone();
782+
let reactive_op_clone = reactive_op.clone();
783+
let reactive_op_name = reactive_op.name.clone();
784+
let result_fut = match reactive_op_clone.spec {
708785
ReactiveOpSpec::Transform(op) => {
709786
let input_field_schemas =
710787
analyze_input_fields(&op.inputs, op_scope).with_context(|| {
711-
format!("Preparing inputs for transform op: {}", reactive_op.name)
788+
format!("Preparing inputs for transform op: {}", reactive_op_name)
712789
})?;
713790
let spec = serde_json::Value::Object(op.op.spec.clone());
714791

@@ -725,8 +802,8 @@ impl AnalyzerContext {
725802
.with(&output_enriched_type.without_attrs())?;
726803
let output_type = output_enriched_type.typ.clone();
727804
let output =
728-
op_scope.add_op_output(reactive_op.name.clone(), output_enriched_type)?;
729-
let op_name = reactive_op.name.clone();
805+
op_scope.add_op_output(reactive_op_name.clone(), output_enriched_type)?;
806+
let op_name = reactive_op_name.clone();
730807
async move {
731808
trace!("Start building executor for transform op `{op_name}`");
732809
let executor = executor.await.with_context(|| {
@@ -777,10 +854,10 @@ impl AnalyzerContext {
777854
.lock()
778855
.unwrap()
779856
.sub_scopes
780-
.insert(reactive_op.name.clone(), Arc::new(sub_op_scope_schema));
857+
.insert(reactive_op_name.clone(), Arc::new(sub_op_scope_schema));
781858
analyzed_op_scope_fut
782859
};
783-
let op_name = reactive_op.name.clone();
860+
let op_name = reactive_op_name.clone();
784861

785862
let concur_control_options =
786863
foreach_op.execution_options.get_concur_control_options();
@@ -800,22 +877,52 @@ impl AnalyzerContext {
800877
}
801878

802879
ReactiveOpSpec::Collect(op) => {
803-
let (struct_mapping, fields_schema) = analyze_struct_mapping(&op.input, op_scope)?;
880+
let (struct_mapping, fields_schema) =
881+
analyze_struct_mapping(&op.input, &op_scope_clone)?;
804882
let has_auto_uuid_field = op.auto_uuid_field.is_some();
805883
let fingerprinter = Fingerprinter::default().with(&fields_schema)?;
806-
let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
807-
name: reactive_op.name.clone(),
808-
has_auto_uuid_field,
809-
input: struct_mapping,
810-
collector_ref: add_collector(
811-
&op.scope_name,
812-
op.collector_name.clone(),
813-
CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
814-
op_scope,
815-
)?,
816-
fingerprinter,
817-
});
818-
async move { Ok(collect_op) }.boxed()
884+
let input_field_names: Vec<FieldName> =
885+
fields_schema.iter().map(|f| f.name.clone()).collect();
886+
let collector_ref = add_collector(
887+
&op.scope_name,
888+
op.collector_name.clone(),
889+
CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
890+
&op_scope_clone,
891+
)?;
892+
async move {
893+
// Get the merged collector schema after adding
894+
let collector_schema: Arc<CollectorSchema> = {
895+
let scope = find_scope(&op.scope_name, &op_scope_clone)?.1;
896+
let states = scope.states.lock().unwrap();
897+
let collector = states.collectors.get(&op.collector_name).unwrap();
898+
collector.schema.clone()
899+
};
900+
901+
// Pre-compute field index mappings for efficient evaluation
902+
let field_name_to_index: HashMap<&FieldName, usize> = input_field_names
903+
.iter()
904+
.enumerate()
905+
.map(|(i, n)| (n, i))
906+
.collect();
907+
let field_index_mapping = collector_schema
908+
.fields
909+
.iter()
910+
.map(|field| field_name_to_index.get(&field.name).copied())
911+
.collect::<Vec<Option<usize>>>();
912+
913+
let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
914+
name: reactive_op_name,
915+
has_auto_uuid_field,
916+
input: struct_mapping,
917+
input_field_names,
918+
collector_schema,
919+
collector_ref,
920+
field_index_mapping,
921+
fingerprinter,
922+
});
923+
Ok(collect_op)
924+
}
925+
.boxed()
819926
}
820927
};
821928
Ok(result_fut)

src/builder/plan.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::base::schema::FieldSchema;
2+
use crate::base::spec::FieldName;
23
use crate::prelude::*;
34

45
use crate::ops::interface::*;
@@ -90,7 +91,11 @@ pub struct AnalyzedCollectOp {
9091
pub name: String,
9192
pub has_auto_uuid_field: bool,
9293
pub input: AnalyzedStructMapping,
94+
pub input_field_names: Vec<FieldName>,
95+
pub collector_schema: Arc<schema::CollectorSchema>,
9396
pub collector_ref: AnalyzedCollectorReference,
97+
/// Pre-computed mapping from collector field index to input field index.
98+
pub field_index_mapping: Vec<Option<usize>>,
9499
/// Fingerprinter of the collector's schema. Used to decide when to reuse auto-generated UUIDs.
95100
pub fingerprinter: Fingerprinter,
96101
}

src/execution/evaluator.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,43 @@ async fn evaluate_op_scope(
515515
let collector_entry = scoped_entries
516516
.headn(op.collector_ref.scope_up_level as usize)
517517
.ok_or_else(|| anyhow::anyhow!("Collector level out of bound"))?;
518+
519+
// Assemble input values
520+
let input_values: Vec<value::Value> =
521+
assemble_input_values(&op.input.fields, scoped_entries)
522+
.collect::<Result<Vec<_>>>()?;
523+
524+
// Create field_values vector for all fields in the merged schema
525+
let mut field_values = op
526+
.field_index_mapping
527+
.iter()
528+
.map(|idx| {
529+
idx.map_or(value::Value::Null, |input_idx| {
530+
input_values[input_idx].clone()
531+
})
532+
})
533+
.collect::<Vec<_>>();
534+
535+
// Handle auto_uuid_field: fill the slot at its index in the merged schema with a generated UUID
536+
if op.has_auto_uuid_field {
537+
if let Some(uuid_idx) = op.collector_schema.auto_uuid_field_idx {
538+
let uuid = memory.next_uuid(
539+
op.fingerprinter
540+
.clone()
541+
.with(
542+
&field_values
543+
.iter()
544+
.enumerate()
545+
.filter(|(i, _)| *i != uuid_idx)
546+
.map(|(_, v)| v)
547+
.collect::<Vec<_>>(),
548+
)?
549+
.into_fingerprint(),
550+
)?;
551+
field_values[uuid_idx] = value::Value::Basic(value::BasicValue::Uuid(uuid));
552+
}
553+
}
554+
518555
{
519556
let mut collected_records = collector_entry.collected_values
520557
[op.collector_ref.local.collector_idx as usize]

src/ops/sources/google_drive.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use super::shared::pattern_matcher::PatternMatcher;
12
use chrono::Duration;
23
use google_drive3::{
34
DriveHub,
@@ -60,6 +61,8 @@ pub struct Spec {
6061
root_folder_ids: Vec<String>,
6162
recent_changes_poll_interval: Option<std::time::Duration>,
6263
max_file_size: Option<i64>,
64+
included_patterns: Option<Vec<String>>,
65+
excluded_patterns: Option<Vec<String>>,
6366
}
6467

6568
struct Executor {
@@ -68,6 +71,7 @@ struct Executor {
6871
root_folder_ids: IndexSet<Arc<str>>,
6972
recent_updates_poll_interval: Option<std::time::Duration>,
7073
max_file_size: Option<i64>,
74+
pattern_matcher: PatternMatcher,
7175
}
7276

7377
impl Executor {
@@ -95,6 +99,7 @@ impl Executor {
9599
root_folder_ids: spec.root_folder_ids.into_iter().map(Arc::from).collect(),
96100
recent_updates_poll_interval: spec.recent_changes_poll_interval,
97101
max_file_size: spec.max_file_size,
102+
pattern_matcher: PatternMatcher::new(spec.included_patterns, spec.excluded_patterns)?,
98103
})
99104
}
100105
}
@@ -314,6 +319,9 @@ impl SourceExecutor for Executor {
314319
.list_files(&folder_id, &fields, &mut next_page_token)
315320
.await?;
316321
for file in files {
322+
if !file.name.as_deref().is_some_and(|name| self.pattern_matcher.is_file_included(name)){
323+
continue
324+
}
317325
if let Some(max_size) = self.max_file_size
318326
&& let Some(file_size) = file.size
319327
&& file_size > max_size {
@@ -365,17 +373,21 @@ impl SourceExecutor for Executor {
365373
});
366374
}
367375
};
368-
// Check file size limit
369-
if let Some(max_size) = self.max_file_size
370-
&& let Some(file_size) = file.size
371-
&& file_size > max_size
376+
if !file
377+
.name
378+
.as_deref()
379+
.is_some_and(|name| self.pattern_matcher.is_file_included(name))
372380
{
373381
return Ok(PartialSourceRowData {
374382
value: Some(SourceValue::NonExistence),
375383
ordinal: Some(Ordinal::unavailable()),
376384
content_version_fp: None,
377385
});
378386
}
387+
// Check file size limit
388+
if let Some(max_size) = self.max_file_size
389+
&& let Some(file_size) = file.size
390+
&& file_size > max_size
379391
let ordinal = if options.include_ordinal {
380392
file.modified_time.map(|t| t.try_into()).transpose()?
381393
} else {

0 commit comments

Comments
 (0)