Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 7 additions & 16 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -899,25 +899,16 @@ impl AnalyzerContext {
};

// Pre-compute field index mappings for efficient evaluation
let field_name_to_index: HashMap<&FieldName, usize> = collector_schema
.fields
let field_name_to_index: HashMap<&FieldName, usize> = input_field_names
.iter()
.enumerate()
.map(|(i, f)| (&f.name, i))
.map(|(i, n)| (n, i))
.collect();
let mut field_index_mapping: HashMap<usize, usize> = HashMap::new();
for (input_idx, field_name) in input_field_names.iter().enumerate() {
let collector_idx = field_name_to_index
.get(field_name)
.copied()
.ok_or_else(|| {
anyhow!(
"field `{}` not found in merged collector schema",
field_name
)
})?;
field_index_mapping.insert(collector_idx, input_idx);
}
let field_index_mapping = collector_schema
.fields
.iter()
.map(|field| field_name_to_index.get(&field.name).copied())
.collect::<Vec<Option<usize>>>();

let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
name: reactive_op_name,
Expand Down
4 changes: 1 addition & 3 deletions src/builder/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use crate::base::schema::FieldSchema;
use crate::base::spec::FieldName;
use crate::prelude::*;

use std::collections::HashMap;

use crate::ops::interface::*;
use crate::utils::fingerprint::{Fingerprint, Fingerprinter};

Expand Down Expand Up @@ -97,7 +95,7 @@ pub struct AnalyzedCollectOp {
pub collector_schema: Arc<schema::CollectorSchema>,
pub collector_ref: AnalyzedCollectorReference,
/// Pre-computed mapping from collector field index to input field index.
pub field_index_mapping: HashMap<usize, usize>,
pub field_index_mapping: Vec<Option<usize>>,
/// Fingerprinter of the collector's schema. Used to decide when to reuse auto-generated UUIDs.
pub fingerprinter: Fingerprinter,
}
Expand Down
16 changes: 9 additions & 7 deletions src/execution/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,15 @@ async fn evaluate_op_scope(
.collect::<Result<Vec<_>>>()?;

// Create field_values vector for all fields in the merged schema
let mut field_values: Vec<value::Value> =
vec![value::Value::Null; op.collector_schema.fields.len()];

// Use pre-computed field index mappings for O(1) field placement
for (&collector_idx, &input_idx) in op.field_index_mapping.iter() {
field_values[collector_idx] = input_values[input_idx].clone();
}
let mut field_values = op
.field_index_mapping
.iter()
.map(|idx| {
idx.map_or(value::Value::Null, |input_idx| {
input_values[input_idx].clone()
})
})
.collect::<Vec<_>>();

// Handle auto_uuid_field (assumed to be at position 0 for efficiency)
if op.has_auto_uuid_field {
Expand Down
Loading