@@ -40,10 +40,11 @@ use datafusion::physical_plan::{
4040use futures:: stream:: { Stream , StreamExt } ;
4141
4242use crate :: physical_plan:: exec:: index:: IndexScanExec ;
43+ use crate :: physical_plan:: exec:: sequential_union:: SequentialUnionExec ;
4344use crate :: physical_plan:: fetcher:: RecordFetcher ;
4445use crate :: physical_plan:: joins:: try_create_index_lookup_join;
4546use crate :: physical_plan:: { create_index_schema, ROW_ID_COLUMN_NAME } ;
46- use crate :: types:: { IndexFilter , IndexFilters } ;
47+ use crate :: types:: { IndexFilter , IndexFilters , UnionMode } ;
4748use datafusion:: arrow:: datatypes:: Schema ;
4849use datafusion:: physical_plan:: aggregates:: { AggregateExec , AggregateMode , PhysicalGroupBy } ;
4950use datafusion:: physical_plan:: empty:: EmptyExec ;
@@ -68,15 +69,25 @@ pub struct RecordFetchExec {
6869 input : Arc < dyn ExecutionPlan > ,
6970 metrics : ExecutionPlanMetricsSet ,
7071 schema : SchemaRef ,
72+ /// Controls how union operations are executed for OR conditions.
73+ union_mode : UnionMode ,
7174}
7275
7376impl RecordFetchExec {
7477 /// Create a new `RecordFetchExec` plan.
78+ ///
79+ /// # Arguments
80+ /// * `indexes` - Index filters to use for scanning
81+ /// * `limit` - Optional limit on the number of rows
82+ /// * `record_fetcher` - The fetcher to retrieve records by row ID
83+ /// * `schema` - Output schema
84+ /// * `union_mode` - Controls whether OR conditions use parallel or sequential union
7585 pub fn try_new (
7686 indexes : Vec < IndexFilter > ,
7787 limit : Option < usize > ,
7888 record_fetcher : Arc < dyn RecordFetcher > ,
7989 schema : SchemaRef ,
90+ union_mode : UnionMode ,
8091 ) -> Result < Self > {
8192 if indexes. is_empty ( ) {
8293 return Err ( DataFusionError :: Plan (
@@ -91,7 +102,7 @@ impl RecordFetchExec {
91102 }
92103
93104 let input = match indexes. first ( ) {
94- Some ( index_filter) => Self :: build_scan_exec ( index_filter, limit) ?,
105+ Some ( index_filter) => Self :: build_scan_exec ( index_filter, limit, union_mode ) ?,
95106 None => {
96107 return Err ( DataFusionError :: Plan (
97108 "RecordFetchExec requires at least one index" . to_string ( ) ,
@@ -114,6 +125,7 @@ impl RecordFetchExec {
114125 input,
115126 metrics : ExecutionPlanMetricsSet :: new ( ) ,
116127 schema,
128+ union_mode,
117129 } )
118130 }
119131
@@ -179,6 +191,7 @@ impl RecordFetchExec {
179191 /// # Arguments
180192 /// * `index_filter` - The [`IndexFilter`] tree specifying which indexes to scan and how to combine them
181193 /// * `limit` - Optional limit on the number of rows to return, passed through to individual index scans
194+ /// * `union_mode` - Controls whether OR conditions use parallel or sequential union
182195 ///
183196 /// # Returns
184197 /// An [`Arc<dyn ExecutionPlan>`] that produces a stream of row IDs matching the filter criteria.
@@ -192,6 +205,7 @@ impl RecordFetchExec {
192205 fn build_scan_exec (
193206 index_filter : & IndexFilter ,
194207 limit : Option < usize > ,
208+ union_mode : UnionMode ,
195209 ) -> Result < Arc < dyn ExecutionPlan > > {
196210 match index_filter {
197211 IndexFilter :: Single { index, filter } => {
@@ -217,7 +231,7 @@ impl RecordFetchExec {
217231 IndexFilter :: And ( filters) => {
218232 let mut plans = filters
219233 . iter ( )
220- . map ( |f| Self :: build_scan_exec ( f, limit) )
234+ . map ( |f| Self :: build_scan_exec ( f, limit, union_mode ) )
221235 . collect :: < Result < Vec < _ > > > ( ) ?;
222236
223237 if plans. is_empty ( ) {
@@ -236,7 +250,7 @@ impl RecordFetchExec {
236250 IndexFilter :: Or ( filters) => {
237251 let original_plans = filters
238252 . iter ( )
239- . map ( |f| Self :: build_scan_exec ( f, limit) )
253+ . map ( |f| Self :: build_scan_exec ( f, limit, union_mode ) )
240254 . collect :: < Result < Vec < _ > > > ( ) ?;
241255
242256 if original_plans. is_empty ( ) {
@@ -284,8 +298,13 @@ impl RecordFetchExec {
284298 }
285299 }
286300
287- // Now all plans have identical schemas, UnionExec will work
288- let union_input = UnionExec :: try_new ( normalized_plans) ?;
301+ // Now all plans have identical schemas - create union based on mode
302+ let union_input: Arc < dyn ExecutionPlan > = match union_mode {
303+ UnionMode :: Parallel => UnionExec :: try_new ( normalized_plans) ?,
304+ UnionMode :: Sequential => {
305+ Arc :: new ( SequentialUnionExec :: try_new ( normalized_plans) ?)
306+ }
307+ } ;
289308
290309 // Create aggregate to deduplicate row IDs
291310 let group_expr = PhysicalGroupBy :: new_single ( vec ! [ (
@@ -377,6 +396,7 @@ impl ExecutionPlan for RecordFetchExec {
377396 input : children[ 0 ] . clone ( ) ,
378397 metrics : self . metrics . clone ( ) ,
379398 schema : self . schema . clone ( ) ,
399+ union_mode : self . union_mode ,
380400 } ) )
381401 }
382402
@@ -949,8 +969,14 @@ mod tests {
949969 #[ tokio:: test]
950970 async fn test_record_fetch_exec_no_indexes ( ) {
951971 let fetcher = Arc :: new ( MockRecordFetcher :: new ( ) ) ;
952- let err =
953- RecordFetchExec :: try_new ( vec ! [ ] , None , fetcher, Arc :: new ( Schema :: empty ( ) ) ) . unwrap_err ( ) ;
972+ let err = RecordFetchExec :: try_new (
973+ vec ! [ ] ,
974+ None ,
975+ fetcher,
976+ Arc :: new ( Schema :: empty ( ) ) ,
977+ UnionMode :: Parallel ,
978+ )
979+ . unwrap_err ( ) ;
954980 assert ! (
955981 matches!( err, DataFusionError :: Plan ( ref msg) if msg == "RecordFetchExec requires at least one index" ) ,
956982 "Unexpected error: {err:?}"
@@ -970,7 +996,13 @@ mod tests {
970996 } ] ;
971997
972998 let fetcher = Arc :: new ( MockRecordFetcher :: new ( ) ) ;
973- let exec = RecordFetchExec :: try_new ( indexes, None , fetcher, Arc :: new ( Schema :: empty ( ) ) ) ?;
999+ let exec = RecordFetchExec :: try_new (
1000+ indexes,
1001+ None ,
1002+ fetcher,
1003+ Arc :: new ( Schema :: empty ( ) ) ,
1004+ UnionMode :: Parallel ,
1005+ ) ?;
9741006
9751007 // The input plan should be just the IndexScanExec
9761008 assert_eq ! ( exec. input. name( ) , "IndexScanExec" ) ;
@@ -1004,7 +1036,13 @@ mod tests {
10041036 ] ) ] ;
10051037
10061038 let fetcher = Arc :: new ( MockRecordFetcher :: new ( ) ) ;
1007- let exec = RecordFetchExec :: try_new ( indexes, None , fetcher, Arc :: new ( Schema :: empty ( ) ) ) ?;
1039+ let exec = RecordFetchExec :: try_new (
1040+ indexes,
1041+ None ,
1042+ fetcher,
1043+ Arc :: new ( Schema :: empty ( ) ) ,
1044+ UnionMode :: Parallel ,
1045+ ) ?;
10081046
10091047 // The input plan should be a HashJoinExec
10101048 assert_eq ! ( exec. input. name( ) , "HashJoinExec" ) ;
@@ -1027,7 +1065,13 @@ mod tests {
10271065
10281066 let indexes = vec ! [ IndexFilter :: And ( indexes_vec) ] ;
10291067 let fetcher = Arc :: new ( MockRecordFetcher :: new ( ) ) ;
1030- let exec = RecordFetchExec :: try_new ( indexes, None , fetcher, Arc :: new ( Schema :: empty ( ) ) ) ?;
1068+ let exec = RecordFetchExec :: try_new (
1069+ indexes,
1070+ None ,
1071+ fetcher,
1072+ Arc :: new ( Schema :: empty ( ) ) ,
1073+ UnionMode :: Parallel ,
1074+ ) ?;
10311075
10321076 // The input plan should be a tree of HashJoinExecs
10331077 assert_eq ! ( exec. input. name( ) , "HashJoinExec" ) ;
@@ -1063,7 +1107,8 @@ mod tests {
10631107 let schema = fetcher. schema ( ) ;
10641108
10651109 // 2. Create exec plan
1066- let exec = RecordFetchExec :: try_new ( indexes, None , fetcher, schema. clone ( ) ) ?;
1110+ let exec =
1111+ RecordFetchExec :: try_new ( indexes, None , fetcher, schema. clone ( ) , UnionMode :: Parallel ) ?;
10671112
10681113 // 3. Execute and collect results
10691114 let task_ctx = Arc :: new ( TaskContext :: default ( ) ) ;
@@ -1100,7 +1145,13 @@ mod tests {
11001145 let fetcher = Arc :: new ( MockRecordFetcher :: new ( ) . with_data ( ) ) ;
11011146
11021147 // 2. Create exec plan
1103- let exec = RecordFetchExec :: try_new ( indexes, None , fetcher, Arc :: new ( Schema :: empty ( ) ) ) ?;
1148+ let exec = RecordFetchExec :: try_new (
1149+ indexes,
1150+ None ,
1151+ fetcher,
1152+ Arc :: new ( Schema :: empty ( ) ) ,
1153+ UnionMode :: Parallel ,
1154+ ) ?;
11041155
11051156 // 3. Execute and collect results
11061157 let task_ctx = Arc :: new ( TaskContext :: default ( ) ) ;
@@ -1136,7 +1187,8 @@ mod tests {
11361187 let schema = fetcher. schema ( ) ;
11371188
11381189 // 2. Create exec plan
1139- let exec = RecordFetchExec :: try_new ( indexes, None , fetcher, schema. clone ( ) ) ?;
1190+ let exec =
1191+ RecordFetchExec :: try_new ( indexes, None , fetcher, schema. clone ( ) , UnionMode :: Parallel ) ?;
11401192
11411193 // 3. Execute and collect results
11421194 let task_ctx = Arc :: new ( TaskContext :: default ( ) ) ;
@@ -1193,7 +1245,13 @@ mod tests {
11931245 let fetcher = Arc :: new ( ErrorFetcher ) ;
11941246
11951247 // 2. Create exec plan
1196- let exec = RecordFetchExec :: try_new ( indexes, None , fetcher, Arc :: new ( Schema :: empty ( ) ) ) ?;
1248+ let exec = RecordFetchExec :: try_new (
1249+ indexes,
1250+ None ,
1251+ fetcher,
1252+ Arc :: new ( Schema :: empty ( ) ) ,
1253+ UnionMode :: Parallel ,
1254+ ) ?;
11971255
11981256 // 3. Execute and expect an error
11991257 let task_ctx = Arc :: new ( TaskContext :: default ( ) ) ;
0 commit comments