
Commit 27715d9

Merge branch 'main' into flaky-16180

2 parents: 99fa33a + 2a7f64a

42 files changed (+3777 -195 lines)

.github/workflows/rust.yml

Lines changed: 22 additions & 0 deletions

@@ -476,6 +476,28 @@ jobs:
           POSTGRES_HOST: postgres
           POSTGRES_PORT: ${{ job.services.postgres.ports[5432] }}
 
+  sqllogictest-substrait:
+    name: "Run sqllogictest in Substrait round-trip mode"
+    needs: linux-build-lib
+    runs-on: ubuntu-latest
+    container:
+      image: amd64/rust
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          submodules: true
+          fetch-depth: 1
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup-builder
+        with:
+          rust-version: stable
+      - name: Run sqllogictest
+        # TODO: Right now several tests are failing in Substrait round-trip mode, so this
+        # command cannot be run for all the .slt files. Run it for just one that works (limit.slt)
+        # until most of the tickets in https://github.com/apache/datafusion/issues/16248 are addressed
+        # and this command can be run without filters.
+        run: cargo test --test sqllogictests -- --substrait-round-trip limit.slt
+
   # Temporarily commenting out the Windows flow, the reason is enormously slow running build
   # Waiting for new Windows 2025 github runner
   # Details: https://github.com/apache/datafusion/issues/13726

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 1 deletion

@@ -216,5 +216,5 @@ unnecessary_lazy_evaluations = "warn"
 uninlined_format_args = "warn"
 
 [workspace.lints.rust]
-unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)"] }
+unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)", "cfg(tarpaulin_include)"] }
 unused_qualifications = "deny"
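
For context, `cfg(tarpaulin)` and `cfg(tarpaulin_include)` come from the cargo-tarpaulin coverage tool rather than from rustc, so they must be registered with check-cfg to avoid `unexpected_cfgs` warnings on stable. A minimal sketch of the pattern this enables (`coverage_exempt_helper` is a hypothetical name, not code from this commit):

// Sketch only, assuming the check-cfg entry above is in place.
// `tarpaulin_include` is never set in a normal build, so the item compiles
// as usual; under cargo-tarpaulin the gate can exclude it from coverage
// reports without tripping the `unexpected_cfgs` lint.
#[cfg(not(tarpaulin_include))]
fn coverage_exempt_helper() {
    eprintln!("fallback path excluded from coverage metrics");
}

fn main() {
    coverage_exempt_helper();
}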

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 5 additions & 44 deletions

@@ -107,10 +107,8 @@ pub(crate) mod test_util {
 mod tests {
 
     use std::fmt::{self, Display, Formatter};
-    use std::pin::Pin;
     use std::sync::atomic::{AtomicUsize, Ordering};
     use std::sync::Arc;
-    use std::task::{Context, Poll};
     use std::time::Duration;
 
     use crate::datasource::file_format::parquet::test_util::store_parquet;
@@ -120,7 +118,7 @@ mod tests {
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
 
     use arrow::array::RecordBatch;
-    use arrow_schema::{Schema, SchemaRef};
+    use arrow_schema::Schema;
     use datafusion_catalog::Session;
     use datafusion_common::cast::{
         as_binary_array, as_binary_view_array, as_boolean_array, as_float32_array,
@@ -140,7 +138,7 @@ mod tests {
     };
     use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_execution::runtime_env::RuntimeEnv;
-    use datafusion_execution::{RecordBatchStream, TaskContext};
+    use datafusion_execution::TaskContext;
     use datafusion_expr::dml::InsertOp;
     use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
     use datafusion_physical_plan::{collect, ExecutionPlan};
@@ -153,7 +151,7 @@ mod tests {
     use async_trait::async_trait;
     use datafusion_datasource::file_groups::FileGroup;
     use futures::stream::BoxStream;
-    use futures::{Stream, StreamExt};
+    use futures::StreamExt;
     use insta::assert_snapshot;
     use log::error;
     use object_store::local::LocalFileSystem;
@@ -169,6 +167,8 @@ mod tests {
     use parquet::format::FileMetaData;
     use tokio::fs::File;
 
+    use crate::test_util::bounded_stream;
+
     enum ForceViews {
         Yes,
         No,
@@ -1662,43 +1662,4 @@ mod tests {
 
         Ok(())
     }
-
-    /// Creates an bounded stream for testing purposes.
-    fn bounded_stream(
-        batch: RecordBatch,
-        limit: usize,
-    ) -> datafusion_execution::SendableRecordBatchStream {
-        Box::pin(BoundedStream {
-            count: 0,
-            limit,
-            batch,
-        })
-    }
-
-    struct BoundedStream {
-        limit: usize,
-        count: usize,
-        batch: RecordBatch,
-    }
-
-    impl Stream for BoundedStream {
-        type Item = Result<RecordBatch>;
-
-        fn poll_next(
-            mut self: Pin<&mut Self>,
-            _cx: &mut Context<'_>,
-        ) -> Poll<Option<Self::Item>> {
-            if self.count >= self.limit {
-                return Poll::Ready(None);
-            }
-            self.count += 1;
-            Poll::Ready(Some(Ok(self.batch.clone())))
-        }
-    }
-
-    impl RecordBatchStream for BoundedStream {
-        fn schema(&self) -> SchemaRef {
-            self.batch.schema()
-        }
-    }
 }

datafusion/core/src/test_util/mod.rs

Lines changed: 47 additions & 0 deletions

@@ -22,12 +22,14 @@ pub mod parquet;
 
 pub mod csv;
 
+use futures::Stream;
 use std::any::Any;
 use std::collections::HashMap;
 use std::fs::File;
 use std::io::Write;
 use std::path::Path;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
 use crate::catalog::{TableProvider, TableProviderFactory};
 use crate::dataframe::DataFrame;
@@ -38,11 +40,13 @@ use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
 use crate::physical_plan::ExecutionPlan;
 use crate::prelude::{CsvReadOptions, SessionContext};
 
+use crate::execution::SendableRecordBatchStream;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_catalog::Session;
 use datafusion_common::TableReference;
 use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
+use std::pin::Pin;
 
 use async_trait::async_trait;
 
@@ -52,6 +56,8 @@ use tempfile::TempDir;
 pub use datafusion_common::test_util::parquet_test_data;
 pub use datafusion_common::test_util::{arrow_test_data, get_data_dir};
 
+use crate::execution::RecordBatchStream;
+
 /// Scan an empty data source, mainly used in tests
 pub fn scan_empty(
     name: Option<&str>,
@@ -234,3 +240,44 @@ pub fn register_unbounded_file_with_ordering(
     ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?;
     Ok(())
 }
+
+/// Creates a bounded stream that emits the same record batch a specified number of times.
+/// This is useful for testing purposes.
+pub fn bounded_stream(
+    record_batch: RecordBatch,
+    limit: usize,
+) -> SendableRecordBatchStream {
+    Box::pin(BoundedStream {
+        record_batch,
+        count: 0,
+        limit,
+    })
+}
+
+struct BoundedStream {
+    record_batch: RecordBatch,
+    count: usize,
+    limit: usize,
+}
+
+impl Stream for BoundedStream {
+    type Item = Result<RecordBatch, crate::error::DataFusionError>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.count >= self.limit {
+            Poll::Ready(None)
+        } else {
+            self.count += 1;
+            Poll::Ready(Some(Ok(self.record_batch.clone())))
+        }
+    }
+}
+
+impl RecordBatchStream for BoundedStream {
+    fn schema(&self) -> SchemaRef {
+        self.record_batch.schema()
+    }
+}
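
With `bounded_stream` now public in `datafusion::test_util`, tests outside this module can reuse it. A minimal usage sketch (assuming `datafusion`, `futures`, and `tokio` as dev-dependencies; the test body is illustrative, not part of this commit):

use std::sync::Arc;

use arrow::array::{Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::test_util::bounded_stream;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // The stream yields clones of the same batch `limit` times, then ends.
    let mut stream = bounded_stream(batch, 5);

    let mut emitted = 0;
    while let Some(batch) = stream.next().await {
        assert_eq!(batch?.num_rows(), 3);
        emitted += 1;
    }
    assert_eq!(emitted, 5);
    Ok(())
}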

datafusion/expr-common/src/groups_accumulator.rs

Lines changed: 1 addition & 1 deletion

@@ -21,7 +21,7 @@ use arrow::array::{ArrayRef, BooleanArray};
 use datafusion_common::{not_impl_err, Result};
 
 /// Describes how many rows should be emitted during grouping.
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum EmitTo {
     /// Emit all groups
     All,
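
With `PartialEq`/`Eq` derived, callers can now compare emission modes directly instead of pattern-matching on variants. A small illustrative sketch (not from this commit):

use datafusion_expr_common::groups_accumulator::EmitTo;

fn main() {
    let emit = EmitTo::First(10);
    // Direct comparison is possible once PartialEq/Eq are derived.
    assert_ne!(emit, EmitTo::All);
    assert_eq!(emit, EmitTo::First(10));
}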

datafusion/ffi/Cargo.toml

Lines changed: 3 additions & 0 deletions

@@ -44,7 +44,9 @@ arrow-schema = { workspace = true }
 async-ffi = { version = "0.5.0", features = ["abi_stable"] }
 async-trait = { workspace = true }
 datafusion = { workspace = true, default-features = false }
+datafusion-functions-aggregate-common = { workspace = true }
 datafusion-proto = { workspace = true }
+datafusion-proto-common = { workspace = true }
 futures = { workspace = true }
 log = { workspace = true }
 prost = { workspace = true }
@@ -56,3 +58,4 @@ doc-comment = { workspace = true }
 
 [features]
 integration-tests = []
+tarpaulin_include = [] # Exists only to prevent warnings on stable and still have accurate coverage

datafusion/ffi/src/arrow_wrappers.rs

Lines changed: 24 additions & 9 deletions

@@ -21,7 +21,8 @@ use abi_stable::StableAbi;
 use arrow::{
     array::{make_array, ArrayRef},
     datatypes::{Schema, SchemaRef},
-    ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema},
+    error::ArrowError,
+    ffi::{from_ffi, to_ffi, FFI_ArrowArray, FFI_ArrowSchema},
 };
 use log::error;
 
@@ -44,16 +45,19 @@ impl From<SchemaRef> for WrappedSchema {
         WrappedSchema(ffi_schema)
     }
 }
+/// Some functions are expected to always succeed, like getting the schema from a TableProvider.
+/// Since going through the FFI always has the potential to fail, we need to catch these errors,
+/// give the user a warning, and return some kind of result. In this case we default to an
+/// empty schema.
+#[cfg(not(tarpaulin_include))]
+fn catch_df_schema_error(e: ArrowError) -> Schema {
+    error!("Unable to convert from FFI_ArrowSchema to DataFusion Schema in FFI_PlanProperties. {e}");
+    Schema::empty()
+}
 
 impl From<WrappedSchema> for SchemaRef {
     fn from(value: WrappedSchema) -> Self {
-        let schema = match Schema::try_from(&value.0) {
-            Ok(s) => s,
-            Err(e) => {
-                error!("Unable to convert from FFI_ArrowSchema to DataFusion Schema in FFI_PlanProperties. {e}");
-                Schema::empty()
-            }
-        };
+        let schema = Schema::try_from(&value.0).unwrap_or_else(catch_df_schema_error);
         Arc::new(schema)
     }
 }
@@ -71,11 +75,22 @@ pub struct WrappedArray {
 }
 
 impl TryFrom<WrappedArray> for ArrayRef {
-    type Error = arrow::error::ArrowError;
+    type Error = ArrowError;
 
     fn try_from(value: WrappedArray) -> Result<Self, Self::Error> {
         let data = unsafe { from_ffi(value.array, &value.schema.0)? };
 
         Ok(make_array(data))
     }
 }
+
+impl TryFrom<&ArrayRef> for WrappedArray {
+    type Error = ArrowError;
+
+    fn try_from(array: &ArrayRef) -> Result<Self, Self::Error> {
+        let (array, schema) = to_ffi(&array.to_data())?;
+        let schema = WrappedSchema(schema);
+
+        Ok(WrappedArray { array, schema })
+    }
+}
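
The new `TryFrom<&ArrayRef>` impl makes `WrappedArray` round trips symmetric: arrays can be exported via `to_ffi` and re-imported through the existing `TryFrom<WrappedArray>` impl. A sketch of that round trip (illustrative; assumes `datafusion-ffi` and `arrow` as dependencies):

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array};
use arrow::error::ArrowError;
use datafusion_ffi::arrow_wrappers::WrappedArray;

fn main() -> Result<(), ArrowError> {
    let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));

    // Export the array into FFI-stable form...
    let wrapped = WrappedArray::try_from(&array)?;

    // ...then import it back; the data survives the round trip.
    let round_tripped = ArrayRef::try_from(wrapped)?;
    assert_eq!(round_tripped.to_data(), array.to_data());
    Ok(())
}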

datafusion/ffi/src/lib.rs

Lines changed: 2 additions & 0 deletions

@@ -34,8 +34,10 @@ pub mod schema_provider;
 pub mod session_config;
 pub mod table_provider;
 pub mod table_source;
+pub mod udaf;
 pub mod udf;
 pub mod udtf;
+pub mod udwf;
 pub mod util;
 pub mod volatility;

datafusion/ffi/src/plan_properties.rs

Lines changed: 11 additions & 3 deletions

@@ -300,7 +300,10 @@ impl From<FFI_EmissionType> for EmissionType {
 
 #[cfg(test)]
 mod tests {
-    use datafusion::physical_plan::Partitioning;
+    use datafusion::{
+        physical_expr::{LexOrdering, PhysicalSortExpr},
+        physical_plan::Partitioning,
+    };
 
     use super::*;
 
@@ -311,8 +314,13 @@ mod tests {
             Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
 
         let original_props = PlanProperties::new(
-            EquivalenceProperties::new(schema),
-            Partitioning::UnknownPartitioning(3),
+            EquivalenceProperties::new(Arc::clone(&schema)).with_reorder(
+                LexOrdering::new(vec![PhysicalSortExpr {
+                    expr: datafusion::physical_plan::expressions::col("a", &schema)?,
+                    options: Default::default(),
+                }]),
+            ),
+            Partitioning::RoundRobinBatch(3),
             EmissionType::Incremental,
             Boundedness::Bounded,
         );
