Skip to content

Commit 89d6819

Browse files
committed
fix hot loop
1 parent 2f3b919 commit 89d6819

File tree

2 files changed

+24
-12
lines changed

2 files changed

+24
-12
lines changed

datafusion/datasource/src/file_stream/mod.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,10 @@ impl MorselQueue {
271271
}
272272

273273
/// Extend the local queue with ready morsels.
274-
pub(super) fn extend_morsels(&mut self, morsels: impl IntoIterator<Item = Box<dyn Morsel>>) {
274+
pub(super) fn extend_morsels(
275+
&mut self,
276+
morsels: impl IntoIterator<Item = Box<dyn Morsel>>,
277+
) {
275278
self.morsels.extend(morsels);
276279
}
277280

@@ -386,7 +389,8 @@ impl FileStream {
386389
// streams. Sending work through shared state in the single-stream case
387390
// changes local scheduling behavior without enabling any useful
388391
// stealing.
389-
!self.preserve_order && self.shared_file_stream_state.registered_stream_count() > 1
392+
!self.preserve_order
393+
&& self.shared_file_stream_state.registered_stream_count() > 1
390394
}
391395

392396
/// Enqueue ready planners either locally or into the shared queue.
@@ -483,7 +487,7 @@ impl FileStream {
483487
self.push_ready_morsels(plan.take_morsels());
484488
self.push_ready_planners(plan.take_planners());
485489
if let Some(io_future) = plan.take_io_future() {
486-
self.queues.push_waiting_planner(WaitingPlanner::new(
490+
self.queues.push_waiting_planner(WaitingPlanner::new(
487491
planner,
488492
io_future,
489493
io_permit
@@ -794,7 +798,14 @@ impl FileStream {
794798
};
795799
self.shared_file_stream_state
796800
.register_waker(stream_id, cx.waker());
797-
} else if self.queues.has_morsels() || self.queues.has_ready_planners() {
801+
// If the active reader just returned `Pending`, yield back to the
802+
// executor instead of looping immediately. Otherwise a reader that
803+
// needs more I/O can hot-loop inside `poll_inner` as long as there
804+
// is buffered work behind it, repeatedly polling the same pending
805+
// reader without giving the executor a chance to wake it.
806+
} else if self.reader.is_none()
807+
&& (self.queues.has_morsels() || self.queues.has_ready_planners())
808+
{
798809
continue;
799810
}
800811

@@ -1024,12 +1035,12 @@ impl FileStreamMetrics {
10241035

10251036
#[cfg(test)]
10261037
mod tests {
1038+
use crate::file_groups::FileGroup;
10271039
use crate::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
10281040
use crate::morsel::test_utils::{
10291041
IoFutureId, MockMorselSpec, MockMorselizer, MockPlanner, MorselId,
10301042
MorselObserver, PlannerId, ReturnPlanBuilder,
10311043
};
1032-
use crate::file_groups::FileGroup;
10331044
use crate::tests::make_partition;
10341045
use crate::{PartitionedFile, TableSchema};
10351046
use arrow::datatypes::Int32Type;
@@ -1428,8 +1439,7 @@ mod tests {
14281439
let batch_id = next_batch_id(&mut streams[stream_id.0]).await?;
14291440
assert!(
14301441
batch_id.is_some(),
1431-
"expected stream {:?} to produce a batch",
1432-
stream_id
1442+
"expected stream {stream_id:?} to produce a batch"
14331443
);
14341444
outputs[stream_id.0].push(batch_id.unwrap());
14351445
}
@@ -1468,7 +1478,6 @@ mod tests {
14681478
}))
14691479
}
14701480

1471-
14721481
/// Verifies the simplest morsel-driven flow: one planner produces one
14731482
/// morsel immediately, and the morsel is then scanned to completion.
14741483
#[tokio::test]
@@ -1866,7 +1875,8 @@ mod tests {
18661875
/// Verifies that an idle sibling stream can steal ready morsels even when
18671876
/// it has no local files of its own.
18681877
#[tokio::test]
1869-
async fn morsel_framework_sibling_stream_steals_when_only_one_has_files() -> Result<()> {
1878+
async fn morsel_framework_sibling_stream_steals_when_only_one_has_files() -> Result<()>
1879+
{
18701880
let test = MultiStreamMorselTest::new(2)
18711881
.with_file_in_partition(
18721882
0,
@@ -1911,8 +1921,8 @@ mod tests {
19111921
/// Verifies that a sibling stream waiting on its own file's I/O can steal
19121922
/// ready work from a faster sibling and continue making progress.
19131923
#[tokio::test]
1914-
async fn morsel_framework_sibling_stream_steals_while_own_file_waits_on_io(
1915-
) -> Result<()> {
1924+
async fn morsel_framework_sibling_stream_steals_while_own_file_waits_on_io()
1925+
-> Result<()> {
19161926
let test = MultiStreamMorselTest::new(2)
19171927
.with_file_in_partition(
19181928
0,

datafusion/datasource/src/file_stream/shared_state.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,9 @@ impl IoState {
384384

385385
match self.mode {
386386
SharedFileStreamMode::Unordered => self.can_issue_unordered(stream_id),
387-
SharedFileStreamMode::PreserveOrder => self.can_issue_preserve_order(stream_id),
387+
SharedFileStreamMode::PreserveOrder => {
388+
self.can_issue_preserve_order(stream_id)
389+
}
388390
}
389391
}
390392

0 commit comments

Comments
 (0)