Skip to content

Commit bcc7d6d

Browse files
committed
limit concurrent planners
1 parent 9a1b730 commit bcc7d6d

File tree

1 file changed

+109
-0
lines changed

1 file changed

+109
-0
lines changed

datafusion/datasource/src/file_stream.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,17 @@ impl FileStream {
200200
self
201201
}
202202

203+
/// Return true if this stream already has the target number of planner I/O
204+
/// futures outstanding.
205+
///
206+
/// Child planners returned from a single parent planner should respect the
207+
/// same outstanding-I/O cap as top-level file planners. Otherwise a single
208+
/// parent plan can fan out into arbitrarily many waiting I/O futures and
209+
/// bypass `TARGET_CONCURRENT_PLANNERS`.
210+
fn planner_io_at_capacity(&self) -> bool {
211+
self.waiting_planners.len() >= TARGET_CONCURRENT_PLANNERS
212+
}
213+
203214
/// Run a planner on CPU until it either needs I/O or fully completes.
204215
///
205216
/// Any morsels produced along the way are appended to `self.morsels`. If
@@ -349,6 +360,13 @@ impl FileStream {
349360
{
350361
break;
351362
}
363+
// Apply the same outstanding-I/O cap to child planners that we
364+
// apply when admitting new files. Once enough planners are
365+
// waiting on I/O, leave any additional ready planners queued
366+
// until one of the existing I/O futures completes.
367+
if self.planner_io_at_capacity() {
368+
break;
369+
}
352370
let Some(planner) = self.ready_planners.pop_front() else {
353371
break;
354372
};
@@ -1175,6 +1193,97 @@ mod tests {
11751193
Ok(())
11761194
}
11771195

1196+
/// Verifies that child planners still respect the global outstanding-I/O
1197+
/// cap. Even if a parent planner returns three ready children, only two of
1198+
/// them should be allowed to create waiting I/O futures at once.
1199+
#[tokio::test]
1200+
async fn morsel_framework_child_planner_io_respects_global_cap() -> Result<()> {
1201+
let planner_1 = MockPlanner::builder()
1202+
.with_id(PlannerId(1))
1203+
.return_plan(ReturnPlanBuilder::new().with_io(IoFutureId(100), 1))
1204+
.return_morsel(MorselId(11), 41)
1205+
.return_none()
1206+
.build();
1207+
let planner_2 = MockPlanner::builder()
1208+
.with_id(PlannerId(2))
1209+
.return_plan(ReturnPlanBuilder::new().with_io(IoFutureId(101), 3))
1210+
.return_morsel(MorselId(12), 42)
1211+
.return_none()
1212+
.build();
1213+
let planner_3 = MockPlanner::builder()
1214+
.with_id(PlannerId(3))
1215+
.return_plan(ReturnPlanBuilder::new().with_io(IoFutureId(102), 1))
1216+
.return_morsel(MorselId(13), 43)
1217+
.return_none()
1218+
.build();
1219+
1220+
let parent_planner = MockPlanner::builder()
1221+
.with_id(PlannerId(0))
1222+
.return_plan(
1223+
ReturnPlanBuilder::new()
1224+
.with_planner(planner_1)
1225+
.with_planner(planner_2)
1226+
.with_planner(planner_3),
1227+
)
1228+
.return_none()
1229+
.build();
1230+
1231+
let test = MorselTest::new().with_file("file1.parquet", parent_planner);
1232+
1233+
insta::assert_snapshot!(test.run().await.unwrap(), @r"
1234+
----- Output Stream -----
1235+
Batch: 41
1236+
Batch: 42
1237+
Batch: 43
1238+
Done
1239+
----- File Stream Events -----
1240+
morselize_file: file1.parquet
1241+
planner_created: PlannerId(0)
1242+
planner_called: PlannerId(0)
1243+
planner_produced_child: PlannerId(0) -> PlannerId(1)
1244+
planner_produced_child: PlannerId(0) -> PlannerId(2)
1245+
planner_produced_child: PlannerId(0) -> PlannerId(3)
1246+
planner_called: PlannerId(0)
1247+
planner_called: PlannerId(1)
1248+
io_future_created: PlannerId(1), IoFutureId(100)
1249+
planner_called: PlannerId(2)
1250+
io_future_created: PlannerId(2), IoFutureId(101)
1251+
io_future_polled: PlannerId(1), IoFutureId(100)
1252+
io_future_polled: PlannerId(2), IoFutureId(101)
1253+
io_future_polled: PlannerId(1), IoFutureId(100)
1254+
io_future_resolved: PlannerId(1), IoFutureId(100)
1255+
io_future_polled: PlannerId(2), IoFutureId(101)
1256+
planner_called: PlannerId(3)
1257+
io_future_created: PlannerId(3), IoFutureId(102)
1258+
io_future_polled: PlannerId(2), IoFutureId(101)
1259+
io_future_polled: PlannerId(3), IoFutureId(102)
1260+
io_future_polled: PlannerId(2), IoFutureId(101)
1261+
io_future_resolved: PlannerId(2), IoFutureId(101)
1262+
io_future_polled: PlannerId(3), IoFutureId(102)
1263+
io_future_resolved: PlannerId(3), IoFutureId(102)
1264+
planner_called: PlannerId(1)
1265+
morsel_produced: PlannerId(1), MorselId(11)
1266+
planner_called: PlannerId(1)
1267+
planner_called: PlannerId(2)
1268+
morsel_produced: PlannerId(2), MorselId(12)
1269+
planner_called: PlannerId(2)
1270+
planner_called: PlannerId(3)
1271+
morsel_produced: PlannerId(3), MorselId(13)
1272+
planner_called: PlannerId(3)
1273+
morsel_stream_started: MorselId(11)
1274+
morsel_stream_batch_produced: MorselId(11), BatchId(41)
1275+
morsel_stream_finished: MorselId(11)
1276+
morsel_stream_started: MorselId(12)
1277+
morsel_stream_batch_produced: MorselId(12), BatchId(42)
1278+
morsel_stream_finished: MorselId(12)
1279+
morsel_stream_started: MorselId(13)
1280+
morsel_stream_batch_produced: MorselId(13), BatchId(43)
1281+
morsel_stream_finished: MorselId(13)
1282+
");
1283+
1284+
Ok(())
1285+
}
1286+
11781287
/// Verifies that `FileStream` overlaps planner I/O across multiple files
11791288
/// rather than waiting for the first file to finish before starting the
11801289
/// second.

0 commit comments

Comments
 (0)