Skip to content

Commit 82d25ea

Browse files
authored
Extract tail batch slicing into shared pipeline helper (#22)
Consolidate the off-by-one-sensitive tail slicing logic into a single tail_batches implementation and reuse it from both REPL and CLI tail paths so behavior stays consistent. Made-with: Cursor
1 parent 65026fe commit 82d25ea

File tree

3 files changed

+35
-45
lines changed

3 files changed

+35
-45
lines changed

src/bin/datu/commands/tail.rs

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use datu::pipeline::orc::ReadOrcStep;
1515
use datu::pipeline::parquet::ReadParquetStep;
1616
use datu::pipeline::read_to_batches;
1717
use datu::pipeline::record_batch_filter::SelectColumnsStep;
18+
use datu::pipeline::tail_batches;
1819
use datu::utils::parse_select_columns;
1920
use orc_rust::reader::metadata::read_metadata;
2021
use parquet::file::metadata::ParquetMetaDataReader;
@@ -73,29 +74,7 @@ async fn tail_from_reader(
7374
let batches: Vec<arrow::record_batch::RecordBatch> = reader
7475
.map(|b| b.map_err(Error::ArrowError).map_err(Into::into))
7576
.collect::<Result<Vec<_>>>()?;
76-
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
77-
let number = number.min(total_rows);
78-
let skip = total_rows.saturating_sub(number);
79-
80-
let mut tail_batches = Vec::new();
81-
let mut rows_emitted = 0usize;
82-
let mut rows_skipped = 0usize;
83-
for batch in batches {
84-
let batch_rows = batch.num_rows();
85-
if rows_skipped + batch_rows <= skip {
86-
rows_skipped += batch_rows;
87-
continue;
88-
}
89-
let start_in_batch = skip.saturating_sub(rows_skipped);
90-
rows_skipped += start_in_batch;
91-
let take = (number - rows_emitted).min(batch_rows - start_in_batch);
92-
if take == 0 {
93-
break;
94-
}
95-
let slice = batch.slice(start_in_batch, take);
96-
tail_batches.push(slice);
97-
rows_emitted += take;
98-
}
77+
let tail_batches = tail_batches(batches, number);
9978

10079
let reader_step: RecordBatchReaderSource =
10180
Box::new(VecRecordBatchReaderSource::new(tail_batches));

src/cli/repl.rs

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::pipeline::display::write_record_batches_as_csv;
1212
use crate::pipeline::read_to_batches;
1313
use crate::pipeline::select;
1414
use crate::pipeline::select::ColumnSpec;
15+
use crate::pipeline::tail_batches;
1516
use crate::pipeline::write_batches;
1617

1718
/// A planned pipeline stage with validated, extracted arguments.
@@ -162,28 +163,7 @@ impl ReplPipelineBuilder {
162163
let batches = self.batches.take().ok_or_else(|| {
163164
Error::GenericError("tail requires a preceding read in the pipe".to_string())
164165
})?;
165-
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
166-
let number = n.min(total_rows);
167-
let skip = total_rows.saturating_sub(number);
168-
169-
let mut result = Vec::new();
170-
let mut rows_emitted = 0usize;
171-
let mut rows_skipped = 0usize;
172-
for batch in batches {
173-
let batch_rows = batch.num_rows();
174-
if rows_skipped + batch_rows <= skip {
175-
rows_skipped += batch_rows;
176-
continue;
177-
}
178-
let start_in_batch = skip.saturating_sub(rows_skipped);
179-
rows_skipped += start_in_batch;
180-
let take = (number - rows_emitted).min(batch_rows - start_in_batch);
181-
if take == 0 {
182-
break;
183-
}
184-
result.push(batch.slice(start_in_batch, take));
185-
rows_emitted += take;
186-
}
166+
let result = tail_batches(batches, n);
187167
self.batches = Some(result);
188168
Ok(())
189169
}

src/pipeline.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,37 @@ pub async fn read_to_batches(
169169
Ok(reader.into_batches())
170170
}
171171

172+
/// Takes the last `n` rows from a sequence of record batches.
173+
pub fn tail_batches(
174+
batches: Vec<arrow::record_batch::RecordBatch>,
175+
n: usize,
176+
) -> Vec<arrow::record_batch::RecordBatch> {
177+
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
178+
let number = n.min(total_rows);
179+
let skip = total_rows.saturating_sub(number);
180+
181+
let mut result = Vec::new();
182+
let mut rows_emitted = 0usize;
183+
let mut rows_skipped = 0usize;
184+
for batch in batches {
185+
let batch_rows = batch.num_rows();
186+
if rows_skipped + batch_rows <= skip {
187+
rows_skipped += batch_rows;
188+
continue;
189+
}
190+
let start_in_batch = skip.saturating_sub(rows_skipped);
191+
rows_skipped += start_in_batch;
192+
let take = (number - rows_emitted).min(batch_rows - start_in_batch);
193+
if take == 0 {
194+
break;
195+
}
196+
result.push(batch.slice(start_in_batch, take));
197+
rows_emitted += take;
198+
}
199+
200+
result
201+
}
202+
172203
/// Writes record batches to output file. Used by REPL.
173204
pub async fn write_batches(
174205
batches: Vec<arrow::record_batch::RecordBatch>,

0 commit comments

Comments
 (0)