We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent e475323 commit 19591bdCopy full SHA for 19591bd
src/daft-distributed/src/pipeline_node/limit.rs
@@ -328,7 +328,10 @@ impl LimitNode {
328
329
// Update max_concurrent_tasks based on actual output
330
// Only update if we have remaining limit, and we did get some output
331
- if !limit_state.is_take_done() && total_num_rows > 0 && num_local_limits > 0 {
+ if limit_state.is_take_done() {
332
+ // Drop the input channel to cancel any input tasks
333
+ break;
334
+ } else if total_num_rows > 0 && num_local_limits > 0 {
335
let rows_per_task = total_num_rows.div_ceil(num_local_limits);
336
max_concurrent_tasks = limit_state.remaining_take().div_ceil(rows_per_task);
337
}
0 commit comments