Skip to content

Commit 9dcf52e

Browse files
committed
refactor(executor): add max rows/bytes limit when schedule executor task
1 parent: b1f122c · commit: 9dcf52e

File tree

1 file changed

+47
-21
lines changed
  • src/query/pipeline/core/src/processors

1 file changed

+47
-21
lines changed

src/query/pipeline/core/src/processors/port.rs

Lines changed: 47 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -215,29 +215,21 @@ impl InputPort {
215 215
pub fn pull_partial_data(&self, limit: &DataBlockLimit) -> Option<Result<DataBlock>> {
216 216
unsafe {
217 217
UpdateTrigger::update_input(&self.update_trigger);
218+
218 219
// retrieve data without changing flags to prevent concurrent modifications
219-
match self.shared.swap(std::ptr::null_mut(), 0, 0) {
220-
address if address.is_null() => None,
221-
address => {
222-
let data_block = (*Box::from_raw(address)).0;
223-
if let Ok(data_block) = &data_block {
224-
let total_rows = data_block.num_rows();
225-
let total_bytes = data_block.memory_size();
226-
let rows_to_take = limit.rows_to_take(total_rows, total_bytes);
227-
// dbg!(rows_to_take, total_rows);
228-
// check if we need to split based on either rows or bytes limit
229-
if rows_to_take < total_rows {
230-
let (need, remain) = data_block.split_at(rows_to_take);
231-
let remain_data = Box::into_raw(Box::new(SharedData(Ok(remain))));
232-
// put back the remainder without changing flags
233-
// this keeps HAS_DATA set so we will have next pull to retrieve it
234-
self.shared.swap(remain_data, 0, 0);
235-
return Some(Ok(need));
236-
}
237-
}
238-
// take entire datablock and unset the flags to signal ready for next loop
220+
let address = self.shared.swap(std::ptr::null_mut(), 0, 0);
221+
if address.is_null() {
222+
self.shared.set_flags(0, HAS_DATA | NEED_DATA);
223+
return None;
224+
}
225+
226+
let shared_data = Box::from_raw(address);
227+
match shared_data.0 {
228+
Ok(data_block) => self.handle_data_block_with_limit(data_block, limit),
229+
Err(e) => {
230+
// for error cases, unset flags and return the error
239 231
self.shared.set_flags(0, HAS_DATA | NEED_DATA);
240-
Some(data_block)
232+
Some(Err(e))
241 233
}
242 234
}
243 235
}
@@ -256,6 +248,40 @@ impl InputPort {
256 248
pub unsafe fn set_trigger(&self, update_trigger: *mut UpdateTrigger) {
257 249
self.update_trigger.set_value(update_trigger)
258 250
}
251+
252+
/// Process datablock with size limit, potentially splitting it
253+
fn handle_data_block_with_limit(
254+
&self,
255+
data_block: DataBlock,
256+
limit: &DataBlockLimit,
257+
) -> Option<Result<DataBlock>> {
258+
let total_rows = data_block.num_rows();
259+
260+
// empty datablock - consume entirely
261+
if total_rows == 0 {
262+
self.shared.set_flags(0, HAS_DATA | NEED_DATA);
263+
return Some(Ok(data_block));
264+
}
265+
266+
let total_bytes = data_block.memory_size();
267+
let rows_to_take = limit.rows_to_take(total_rows, total_bytes);
268+
269+
// no splitting needed - consume entire datablock
270+
if rows_to_take >= total_rows {
271+
self.shared.set_flags(0, HAS_DATA | NEED_DATA);
272+
return Some(Ok(data_block));
273+
}
274+
275+
// split datablock and store remainder
276+
let (taken, remainder) = data_block.split_at(rows_to_take);
277+
let remainder_ptr = Box::into_raw(Box::new(SharedData(Ok(remainder))));
278+
279+
// put back the remainder without changing flags
280+
// this keeps HAS_DATA set so we will have next pull to retrieve it
281+
self.shared.swap(remainder_ptr, 0, 0);
282+
283+
Some(Ok(taken))
284+
}
259 285
}
260 286

261 287
pub struct OutputPort {

0 commit comments

Comments
 (0)