Skip to content

Commit bfbc274

Browse files
authored
Propagate output_ordering to scan (vortex-data#4909)
Fix a bug where DataFusion scans that provide an `output_ordering` assume the stream itself is delivered in order, whereas we explicitly did not guarantee that the stream we return from files is in order. The scan now reads files in order whenever an output ordering is specified. Signed-off-by: Adam Gutglick <[email protected]>
1 parent be66523 commit bfbc274

File tree

4 files changed

+92
-7
lines changed

4 files changed

+92
-7
lines changed

vortex-datafusion/src/convert/exprs.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -372,12 +372,14 @@ mod tests {
372372

373373
let result = ExprRef::try_from_df(&like_expr).unwrap();
374374

375-
assert_snapshot!(result.display_tree().to_string(), @r#"
376-
Like
377-
├── child: GetItem(text_col)
378-
│ └── Root
379-
└── pattern: Literal(value: "test%", dtype: utf8)
380-
"#);
375+
insta::allow_duplicates! {
376+
assert_snapshot!(result.display_tree().to_string(), @r#"
377+
Like
378+
├── child: GetItem(text_col)
379+
│ └── Root
380+
└── pattern: Literal(value: "test%", dtype: utf8)
381+
"#);
382+
}
381383
}
382384

383385
#[rstest]

vortex-datafusion/src/persistent/mod.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ mod tests {
4444
use datafusion::prelude::SessionContext;
4545
use datafusion_datasource::file_format::format_as_file_type;
4646
use datafusion_expr::LogicalPlanBuilder;
47+
use datafusion_physical_plan::display::DisplayableExecutionPlan;
4748
use insta::assert_snapshot;
4849
use rstest::rstest;
4950
use tempfile::{TempDir, tempdir};
@@ -188,4 +189,80 @@ mod tests {
188189

189190
Ok(())
190191
}
192+
193+
#[tokio::test]
194+
async fn create_table_ordered_by() -> anyhow::Result<()> {
195+
let dir = TempDir::new().unwrap();
196+
197+
let factory: VortexFormatFactory = VortexFormatFactory::new();
198+
let mut session_state_builder = SessionStateBuilder::new().with_default_features();
199+
register_vortex_format_factory(factory, &mut session_state_builder);
200+
let session = SessionContext::new_with_state(session_state_builder.build());
201+
202+
// Vortex
203+
session
204+
.sql(&format!(
205+
"CREATE EXTERNAL TABLE my_tbl_vx \
206+
(c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
207+
STORED AS vortex \
208+
WITH ORDER (c1 ASC)
209+
LOCATION '{}/vx/'",
210+
dir.path().to_str().unwrap()
211+
))
212+
.await?;
213+
214+
session
215+
.sql("INSERT INTO my_tbl_vx VALUES ('air', 5), ('balloon', 42)")
216+
.await?
217+
.collect()
218+
.await?;
219+
220+
session
221+
.sql("INSERT INTO my_tbl_vx VALUES ('zebra', 5)")
222+
.await?
223+
.collect()
224+
.await?;
225+
226+
session
227+
.sql("INSERT INTO my_tbl_vx VALUES ('texas', 2000), ('alabama', 2000)")
228+
.await?
229+
.collect()
230+
.await?;
231+
232+
let df = session
233+
.sql("SELECT * FROM my_tbl_vx ORDER BY c1 ASC limit 3")
234+
.await?;
235+
let (state, plan) = df.clone().into_parts();
236+
let physical_plan = state.create_physical_plan(&plan).await?;
237+
238+
insta::assert_snapshot!(DisplayableExecutionPlan::new(physical_plan.as_ref())
239+
.tree_render().to_string(), @r"
240+
┌───────────────────────────┐
241+
│ SortPreservingMergeExec │
242+
│ -------------------- │
243+
│ c1 ASC NULLS LASTlimit: │
244+
│ 3 │
245+
└─────────────┬─────────────┘
246+
┌─────────────┴─────────────┐
247+
│ DataSourceExec │
248+
│ -------------------- │
249+
│ files: 3 │
250+
│ format: vortex │
251+
└───────────────────────────┘
252+
");
253+
254+
let r = df.collect().await?;
255+
256+
insta::assert_snapshot!(pretty_format_batches(&r)?.to_string(), @r"
257+
+---------+------+
258+
| c1 | c2 |
259+
+---------+------+
260+
| air | 5 |
261+
| alabama | 2000 |
262+
| balloon | 42 |
263+
+---------+------+
264+
");
265+
266+
Ok(())
267+
}
191268
}

vortex-datafusion/src/persistent/opener.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ pub(crate) struct VortexOpener {
5555
pub limit: Option<usize>,
5656
pub metrics: VortexMetrics,
5757
pub layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
58+
/// Whether the query has output ordering specified
59+
pub has_output_ordering: bool,
5860
}
5961

6062
impl FileOpener for VortexOpener {
@@ -71,6 +73,7 @@ impl FileOpener for VortexOpener {
7173
let limit = self.limit;
7274
let metrics = self.metrics.clone();
7375
let layout_reader = self.layout_readers.clone();
76+
let has_output_ordering = self.has_output_ordering;
7477

7578
let projected_schema = match projection.as_ref() {
7679
None => logical_schema.clone(),
@@ -224,7 +227,7 @@ impl FileOpener for VortexOpener {
224227
.with_metrics(metrics)
225228
.with_projection(projection_expr)
226229
.with_some_filter(filter)
227-
.with_ordered(false)
230+
.with_ordered(has_output_ordering)
228231
.map(|chunk| chunk.to_struct().into_record_batch())
229232
.into_stream()
230233
.map_err(|e| {
@@ -432,6 +435,7 @@ mod tests {
432435
limit: None,
433436
metrics: Default::default(),
434437
layout_readers: Default::default(),
438+
has_output_ordering: false,
435439
};
436440

437441
// filter matches partition value
@@ -512,6 +516,7 @@ mod tests {
512516
limit: None,
513517
metrics: Default::default(),
514518
layout_readers: Default::default(),
519+
has_output_ordering: false,
515520
};
516521

517522
let filter = col("a").lt(lit(100_i32));

vortex-datafusion/src/persistent/source.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ impl FileSource for VortexSource {
148148
limit: base_config.limit,
149149
metrics: partition_metrics,
150150
layout_readers: self.layout_readers.clone(),
151+
has_output_ordering: !base_config.output_ordering.is_empty(),
151152
};
152153

153154
Arc::new(opener)

0 commit comments

Comments
 (0)