From 383778397e4ec3c712f326170c9e745fc0fd1065 Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Thu, 6 Nov 2025 22:39:09 -0800 Subject: [PATCH] chore: simplification for new collector logic --- src/builder/analyzer.rs | 23 +++++++---------------- src/builder/plan.rs | 4 +--- src/execution/evaluator.rs | 16 +++++++++------- 3 files changed, 17 insertions(+), 26 deletions(-) diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index 966c1baf..ad16331e 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -899,25 +899,16 @@ impl AnalyzerContext { }; // Pre-compute field index mappings for efficient evaluation - let field_name_to_index: HashMap<&FieldName, usize> = collector_schema - .fields + let field_name_to_index: HashMap<&FieldName, usize> = input_field_names .iter() .enumerate() - .map(|(i, f)| (&f.name, i)) + .map(|(i, n)| (n, i)) .collect(); - let mut field_index_mapping: HashMap = HashMap::new(); - for (input_idx, field_name) in input_field_names.iter().enumerate() { - let collector_idx = field_name_to_index - .get(field_name) - .copied() - .ok_or_else(|| { - anyhow!( - "field `{}` not found in merged collector schema", - field_name - ) - })?; - field_index_mapping.insert(collector_idx, input_idx); - } + let field_index_mapping = collector_schema + .fields + .iter() + .map(|field| field_name_to_index.get(&field.name).copied()) + .collect::>>(); let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp { name: reactive_op_name, diff --git a/src/builder/plan.rs b/src/builder/plan.rs index 78e05354..31ed60ce 100644 --- a/src/builder/plan.rs +++ b/src/builder/plan.rs @@ -2,8 +2,6 @@ use crate::base::schema::FieldSchema; use crate::base::spec::FieldName; use crate::prelude::*; -use std::collections::HashMap; - use crate::ops::interface::*; use crate::utils::fingerprint::{Fingerprint, Fingerprinter}; @@ -97,7 +95,7 @@ pub struct AnalyzedCollectOp { pub collector_schema: Arc, pub collector_ref: AnalyzedCollectorReference, /// Pre-computed mapping from collector field index to input field index. - pub field_index_mapping: HashMap, + pub field_index_mapping: Vec>, /// Fingerprinter of the collector's schema. Used to decide when to reuse auto-generated UUIDs. pub fingerprinter: Fingerprinter, } diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs index 957e38d1..956cbdde 100644 --- a/src/execution/evaluator.rs +++ b/src/execution/evaluator.rs @@ -522,13 +522,15 @@ async fn evaluate_op_scope( .collect::>>()?; // Create field_values vector for all fields in the merged schema - let mut field_values: Vec = - vec![value::Value::Null; op.collector_schema.fields.len()]; - - // Use pre-computed field index mappings for O(1) field placement - for (&collector_idx, &input_idx) in op.field_index_mapping.iter() { - field_values[collector_idx] = input_values[input_idx].clone(); - } + let mut field_values = op + .field_index_mapping + .iter() + .map(|idx| { + idx.map_or(value::Value::Null, |input_idx| { + input_values[input_idx].clone() + }) + }) + .collect::>(); // Handle auto_uuid_field (assumed to be at position 0 for efficiency) if op.has_auto_uuid_field {