@@ -13,9 +13,7 @@ use arrow::datatypes::SchemaRef;
1313use arrow:: record_batch:: { RecordBatch , RecordBatchOptions } ;
1414use datafusion:: error:: { DataFusionError , Result as DataFusionResult } ;
1515use datafusion:: execution:: { RecordBatchStream , SendableRecordBatchStream , TaskContext } ;
16- use datafusion:: physical_plan:: {
17- DisplayAs , DisplayFormatType , ExecutionPlan , PlanProperties ,
18- } ;
16+ use datafusion:: physical_plan:: { DisplayAs , DisplayFormatType , ExecutionPlan , PlanProperties } ;
1917use futures:: Stream ;
2018
2119/// Custom execution plan that filters out deleted rows
@@ -59,15 +57,15 @@ impl DisplayAs for DeleteFilterExec {
5957 self . file_path,
6058 self . deleted_positions. len( )
6159 )
62- }
60+ } ,
6361 DisplayFormatType :: TreeRender => {
6462 write ! (
6563 f,
6664 "DeleteFilterExec: file={}, deletes={}" ,
6765 self . file_path,
6866 self . deleted_positions. len( )
6967 )
70- }
68+ } ,
7169 }
7270 }
7371}
@@ -142,10 +140,10 @@ impl Stream for DeleteFilterStream {
142140 // Update row offset for next batch
143141 self . row_offset += batch. num_rows ( ) as i64 ;
144142 Poll :: Ready ( Some ( Ok ( filtered_batch) ) )
145- }
143+ } ,
146144 Err ( e) => Poll :: Ready ( Some ( Err ( e) ) ) ,
147145 }
148- }
146+ } ,
149147 Poll :: Ready ( Some ( Err ( e) ) ) => Poll :: Ready ( Some ( Err ( e) ) ) ,
150148 Poll :: Ready ( None ) => Poll :: Ready ( None ) ,
151149 Poll :: Pending => Poll :: Pending ,
@@ -180,20 +178,15 @@ impl DeleteFilterStream {
180178 if batch. num_columns ( ) == 0 {
181179 let mut options = RecordBatchOptions :: new ( ) ;
182180 options = options. with_row_count ( Some ( keep_indices. len ( ) ) ) ;
183- return RecordBatch :: try_new_with_options (
184- batch. schema ( ) ,
185- vec ! [ ] ,
186- & options,
187- )
188- . map_err ( |e| DataFusionError :: ArrowError ( Box :: new ( e) , None ) ) ;
181+ return RecordBatch :: try_new_with_options ( batch. schema ( ) , vec ! [ ] , & options)
182+ . map_err ( |e| DataFusionError :: ArrowError ( Box :: new ( e) , None ) ) ;
189183 }
190184
191185 // Use Arrow's take kernel to select rows
192186 use arrow:: array:: UInt32Array ;
193187 use arrow:: compute:: take;
194188
195- let indices =
196- UInt32Array :: from ( keep_indices. iter ( ) . map ( |& i| i as u32 ) . collect :: < Vec < _ > > ( ) ) ;
189+ let indices = UInt32Array :: from ( keep_indices. iter ( ) . map ( |& i| i as u32 ) . collect :: < Vec < _ > > ( ) ) ;
197190
198191 let filtered_columns: DataFusionResult < Vec < _ > > = batch
199192 . columns ( )
@@ -225,16 +218,12 @@ mod tests {
225218 #[ test]
226219 fn test_filter_batch_ignores_out_of_bounds_positions ( ) {
227220 // Create a simple RecordBatch with 4 rows
228- let schema = Arc :: new ( Schema :: new ( vec ! [
229- Field :: new( "id" , DataType :: Int32 , false ) ,
230- ] ) ) ;
221+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new( "id" , DataType :: Int32 , false ) ] ) ) ;
231222
232223 let id_array = Int32Array :: from ( vec ! [ 1 , 2 , 3 , 4 ] ) ;
233- let batch = RecordBatch :: try_new (
234- schema. clone ( ) ,
235- vec ! [ Arc :: new( id_array) as Arc <dyn Array >] ,
236- )
237- . unwrap ( ) ;
224+ let batch =
225+ RecordBatch :: try_new ( schema. clone ( ) , vec ! [ Arc :: new( id_array) as Arc <dyn Array >] )
226+ . unwrap ( ) ;
238227
239228 // Create delete positions: 1 (valid), 1000, 2000, 5000 (all out of bounds)
240229 // Only position 1 should actually delete a row (the row with id=2)
@@ -275,16 +264,12 @@ mod tests {
275264 #[ test]
276265 fn test_filter_batch_all_out_of_bounds_positions ( ) {
277266 // Test the edge case where ALL delete positions are beyond the file
278- let schema = Arc :: new ( Schema :: new ( vec ! [
279- Field :: new( "id" , DataType :: Int32 , false ) ,
280- ] ) ) ;
267+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new( "id" , DataType :: Int32 , false ) ] ) ) ;
281268
282269 let id_array = Int32Array :: from ( vec ! [ 10 , 20 , 30 ] ) ;
283- let batch = RecordBatch :: try_new (
284- schema. clone ( ) ,
285- vec ! [ Arc :: new( id_array) as Arc <dyn Array >] ,
286- )
287- . unwrap ( ) ;
270+ let batch =
271+ RecordBatch :: try_new ( schema. clone ( ) , vec ! [ Arc :: new( id_array) as Arc <dyn Array >] )
272+ . unwrap ( ) ;
288273
289274 // All positions are way beyond the 3-row file
290275 let deleted_positions: HashSet < i64 > = [ 1000 , 2000 , 3000 , 9999 ] . into_iter ( ) . collect ( ) ;
@@ -317,16 +302,15 @@ mod tests {
317302 #[ test]
318303 fn test_filter_batch_with_row_offset ( ) {
319304 // Test that row_offset is correctly considered when checking positions
320- let schema = Arc :: new ( Schema :: new ( vec ! [
321- Field :: new( "value" , DataType :: Int32 , false ) ,
322- ] ) ) ;
305+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new(
306+ "value" ,
307+ DataType :: Int32 ,
308+ false ,
309+ ) ] ) ) ;
323310
324311 let array = Int32Array :: from ( vec ! [ 100 , 200 , 300 , 400 ] ) ;
325- let batch = RecordBatch :: try_new (
326- schema. clone ( ) ,
327- vec ! [ Arc :: new( array) as Arc <dyn Array >] ,
328- )
329- . unwrap ( ) ;
312+ let batch =
313+ RecordBatch :: try_new ( schema. clone ( ) , vec ! [ Arc :: new( array) as Arc <dyn Array >] ) . unwrap ( ) ;
330314
331315 // Delete position 11 and 1000 (way out of bounds)
332316 // With row_offset=10, this batch contains global positions [10, 11, 12, 13]
0 commit comments