Skip to content

Commit acb6268

Browse files
committed
rebase
1 parent a603cc0 commit acb6268

File tree

17 files changed

+381
-97
lines changed

17 files changed

+381
-97
lines changed

native-engine/auron-planner/proto/auron.proto

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -632,7 +632,8 @@ message SortExecNode {
632632

633633
message FetchLimit {
634634
// wrap into a message to make it optional
635-
uint64 limit = 1;
635+
uint32 limit = 1;
636+
uint32 offset = 2;
636637
}
637638

638639
message PhysicalRepartition {
@@ -706,7 +707,8 @@ enum AggMode {
706707

707708
message LimitExecNode {
708709
PhysicalPlanNode input = 1;
709-
uint64 limit = 2;
710+
uint32 limit = 2;
711+
uint32 offset = 3;
710712
}
711713

712714
message FFIReaderExecNode {

native-engine/auron-planner/src/planner.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -357,12 +357,12 @@ impl PhysicalPlanner {
357357
panic!("Failed to parse physical sort expressions: {}", e);
358358
});
359359

360+
let fetch = sort.fetch_limit.as_ref();
361+
let limit = fetch.map(|f| f.limit as usize);
362+
let offset = fetch.map(|f| f.offset as usize).unwrap_or(0);
363+
360364
// always preserve partitioning
361-
Ok(Arc::new(SortExec::new(
362-
input,
363-
exprs,
364-
sort.fetch_limit.as_ref().map(|limit| limit.limit as usize),
365-
)))
365+
Ok(Arc::new(SortExec::new(input, exprs, limit, offset)))
366366
}
367367
PhysicalPlanType::BroadcastJoinBuildHashMap(bhm) => {
368368
let input: Arc<dyn ExecutionPlan> = convert_box_required!(self, bhm.input)?;
@@ -560,7 +560,11 @@ impl PhysicalPlanner {
560560
}
561561
PhysicalPlanType::Limit(limit) => {
562562
let input: Arc<dyn ExecutionPlan> = convert_box_required!(self, limit.input)?;
563-
Ok(Arc::new(LimitExec::new(input, limit.limit)))
563+
Ok(Arc::new(LimitExec::new(
564+
input,
565+
limit.limit as usize,
566+
limit.offset as usize,
567+
)))
564568
}
565569
PhysicalPlanType::FfiReader(ffi_reader) => {
566570
let schema = Arc::new(convert_required!(ffi_reader.schema)?);
@@ -573,7 +577,11 @@ impl PhysicalPlanner {
573577
PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
574578
let input: Arc<dyn ExecutionPlan> =
575579
convert_box_required!(self, coalesce_batches.input)?;
576-
Ok(Arc::new(LimitExec::new(input, coalesce_batches.batch_size)))
580+
Ok(Arc::new(LimitExec::new(
581+
input,
582+
coalesce_batches.batch_size as usize,
583+
0,
584+
)))
577585
}
578586
PhysicalPlanType::Expand(expand) => {
579587
let schema = Arc::new(convert_required!(expand.schema)?);

native-engine/datafusion-ext-plans/src/limit_exec.rs

Lines changed: 87 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,18 @@ use crate::common::execution_context::ExecutionContext;
4141
#[derive(Debug)]
4242
pub struct LimitExec {
4343
input: Arc<dyn ExecutionPlan>,
44-
limit: u64,
44+
limit: usize,
45+
offset: usize,
4546
pub metrics: ExecutionPlanMetricsSet,
4647
props: OnceCell<PlanProperties>,
4748
}
4849

4950
impl LimitExec {
50-
pub fn new(input: Arc<dyn ExecutionPlan>, limit: u64) -> Self {
51+
pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize, offset: usize) -> Self {
5152
Self {
5253
input,
5354
limit,
55+
offset,
5456
metrics: ExecutionPlanMetricsSet::new(),
5557
props: OnceCell::new(),
5658
}
@@ -59,7 +61,7 @@ impl LimitExec {
5961

6062
impl DisplayAs for LimitExec {
6163
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
62-
write!(f, "LimitExec(limit={})", self.limit)
64+
write!(f, "LimitExec(limit={},offset={})", self.limit, self.offset)
6365
}
6466
}
6567

@@ -95,7 +97,11 @@ impl ExecutionPlan for LimitExec {
9597
self: Arc<Self>,
9698
children: Vec<Arc<dyn ExecutionPlan>>,
9799
) -> Result<Arc<dyn ExecutionPlan>> {
98-
Ok(Arc::new(Self::new(children[0].clone(), self.limit)))
100+
Ok(Arc::new(Self::new(
101+
children[0].clone(),
102+
self.limit,
103+
self.offset,
104+
)))
99105
}
100106

101107
fn execute(
@@ -105,23 +111,27 @@ impl ExecutionPlan for LimitExec {
105111
) -> Result<SendableRecordBatchStream> {
106112
let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics);
107113
let input = exec_ctx.execute_with_input_stats(&self.input)?;
108-
execute_limit(input, self.limit, exec_ctx)
114+
if self.offset == 0 {
115+
execute_limit(input, self.limit, exec_ctx)
116+
} else {
117+
execute_limit_with_offset(input, self.limit, self.offset, exec_ctx)
118+
}
109119
}
110120

111121
fn statistics(&self) -> Result<Statistics> {
112122
Statistics::with_fetch(
113123
self.input.statistics()?,
114124
self.schema(),
115-
Some(self.limit as usize),
116-
0,
125+
Some(self.limit),
126+
self.offset,
117127
1,
118128
)
119129
}
120130
}
121131

122132
fn execute_limit(
123133
mut input: SendableRecordBatchStream,
124-
limit: u64,
134+
limit: usize,
125135
exec_ctx: Arc<ExecutionContext>,
126136
) -> Result<SendableRecordBatchStream> {
127137
Ok(exec_ctx
@@ -131,11 +141,49 @@ fn execute_limit(
131141
while remaining > 0
132142
&& let Some(mut batch) = input.next().await.transpose()?
133143
{
134-
if remaining < batch.num_rows() as u64 {
135-
batch = batch.slice(0, remaining as usize);
144+
if remaining < batch.num_rows() {
145+
batch = batch.slice(0, remaining);
146+
remaining = 0;
147+
} else {
148+
remaining -= batch.num_rows();
149+
}
150+
exec_ctx.baseline_metrics().record_output(batch.num_rows());
151+
sender.send(batch).await;
152+
}
153+
Ok(())
154+
}))
155+
}
156+
157+
fn execute_limit_with_offset(
158+
mut input: SendableRecordBatchStream,
159+
limit: usize,
160+
offset: usize,
161+
exec_ctx: Arc<ExecutionContext>,
162+
) -> Result<SendableRecordBatchStream> {
163+
Ok(exec_ctx
164+
.clone()
165+
.output_with_sender("Limit", move |sender| async move {
166+
let mut skip = offset;
167+
let mut remaining = limit - skip;
168+
while remaining > 0
169+
&& let Some(mut batch) = input.next().await.transpose()?
170+
{
171+
if skip > 0 {
172+
let rows = batch.num_rows();
173+
if skip >= rows {
174+
skip -= rows;
175+
continue;
176+
}
177+
178+
batch = batch.slice(skip, rows - skip);
179+
skip = 0;
180+
}
181+
182+
if remaining < batch.num_rows() {
183+
batch = batch.slice(0, remaining);
136184
remaining = 0;
137185
} else {
138-
remaining -= batch.num_rows() as u64;
186+
remaining -= batch.num_rows();
139187
}
140188
exec_ctx.baseline_metrics().record_output(batch.num_rows());
141189
sender.send(batch).await;
@@ -207,7 +255,7 @@ mod test {
207255
("b", &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
208256
("c", &vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4]),
209257
)?;
210-
let limit_exec = LimitExec::new(input, 2_u64);
258+
let limit_exec = LimitExec::new(input, 2, 0);
211259
let session_ctx = SessionContext::new();
212260
let task_ctx = session_ctx.task_ctx();
213261
let output = limit_exec.execute(0, task_ctx)?;
@@ -226,4 +274,31 @@ mod test {
226274
assert_eq!(row_count, Precision::Exact(2));
227275
Ok(())
228276
}
277+
278+
#[tokio::test]
279+
async fn test_limit_with_offset() -> Result<()> {
280+
let input = build_table(
281+
("a", &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
282+
("b", &vec![9, 8, 7, 6, 5, 4, 3, 2, 1, 0]),
283+
("c", &vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4]),
284+
)?;
285+
let limit_exec = LimitExec::new(input, 7, 5);
286+
let session_ctx = SessionContext::new();
287+
let task_ctx = session_ctx.task_ctx();
288+
let output = limit_exec.execute(0, task_ctx)?;
289+
let batches = common::collect(output).await?;
290+
let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
291+
292+
let expected = vec![
293+
"+---+---+---+",
294+
"| a | b | c |",
295+
"+---+---+---+",
296+
"| 5 | 4 | 0 |",
297+
"| 6 | 3 | 1 |",
298+
"+---+---+---+",
299+
];
300+
assert_batches_eq!(expected, &batches);
301+
assert_eq!(row_count, 2);
302+
Ok(())
303+
}
229304
}

0 commit comments

Comments (0)