Skip to content

Commit a559c3d

Browse files
authored
error on unsupported datafusion configs (#2258)
1 parent 674fa7d commit a559c3d

File tree

3 files changed

+50
-22
lines changed

3 files changed

+50
-22
lines changed

vortex-datafusion/src/lib.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,22 @@ const SUPPORTED_BINARY_OPS: &[Operator] = &[
2828
];
2929

3030
fn supported_data_types(dt: DataType) -> bool {
31+
use DataType::*;
3132
let is_supported = dt.is_integer()
3233
|| dt.is_floating()
3334
|| dt.is_null()
34-
|| dt == DataType::Boolean
35-
|| dt == DataType::Binary
36-
|| dt == DataType::Utf8
37-
|| dt == DataType::Binary
38-
|| dt == DataType::BinaryView
39-
|| dt == DataType::Utf8View
40-
|| dt == DataType::Date32
41-
|| dt == DataType::Date64
4235
|| matches!(
4336
dt,
44-
DataType::Timestamp(_, _) | DataType::Time32(_) | DataType::Time64(_)
37+
Boolean
38+
| Utf8
39+
| Utf8View
40+
| Binary
41+
| BinaryView
42+
| Date32
43+
| Date64
44+
| Timestamp(_, _)
45+
| Time32(_)
46+
| Time64(_)
4547
);
4648

4749
if !is_supported {

vortex-datafusion/src/persistent/execution.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,7 @@ impl ExecutionPlan for VortexExec {
114114
let object_store = context
115115
.runtime_env()
116116
.object_store(&self.file_scan_config.object_store_url)?;
117-
118117
let file_schema = self.file_scan_config.file_schema.clone();
119-
120118
let projection = self.file_scan_config.projection.as_ref().map(|projection| {
121119
projection
122120
.iter()

vortex-datafusion/src/persistent/format.rs

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,13 @@ pub struct VortexFormat {
4646
/// Options to configure the [`VortexFormat`].
4747
#[derive(Debug)]
4848
pub struct VortexFormatOptions {
49-
/// The number of concurrency tasks used to infer the schema of multiple Vortex files.
50-
pub concurrent_infer_schema_ops: usize,
5149
/// The size of the in-memory [`vortex_file::FileLayout`] cache.
5250
pub cache_size_mb: usize,
5351
}
5452

5553
impl Default for VortexFormatOptions {
5654
fn default() -> Self {
57-
Self {
58-
concurrent_infer_schema_ops: 64,
59-
cache_size_mb: 256,
60-
}
55+
Self { cache_size_mb: 256 }
6156
}
6257
}
6358

@@ -127,6 +122,11 @@ impl VortexFormat {
127122
opts,
128123
}
129124
}
125+
126+
/// Return the format specific configuration
127+
pub fn options(&self) -> &VortexFormatOptions {
128+
&self.opts
129+
}
130130
}
131131

132132
#[async_trait]
@@ -153,24 +153,31 @@ impl FileFormat for VortexFormat {
153153

154154
async fn infer_schema(
155155
&self,
156-
_state: &SessionState,
156+
state: &SessionState,
157157
store: &Arc<dyn ObjectStore>,
158158
objects: &[ObjectMeta],
159159
) -> DFResult<SchemaRef> {
160-
let file_schemas = stream::iter(objects.iter().cloned())
160+
let mut file_schemas = stream::iter(objects.iter().cloned())
161161
.map(|o| {
162162
let store = store.clone();
163163
let cache = self.file_layout_cache.clone();
164164
async move {
165165
let file_layout = cache.try_get(&o, store).await?;
166166
let inferred_schema = infer_schema(file_layout.dtype())?;
167-
VortexResult::Ok(inferred_schema)
167+
VortexResult::Ok((o.location, inferred_schema))
168168
}
169169
})
170-
.buffered(self.opts.concurrent_infer_schema_ops)
170+
.buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
171171
.try_collect::<Vec<_>>()
172172
.await?;
173173

174+
// Get consistent order of schemas for `Schema::try_merge`, as some filesystems don't have deterministic listing orders
175+
file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
176+
let file_schemas = file_schemas
177+
.into_iter()
178+
.map(|(_, schema)| schema)
179+
.collect::<Vec<_>>();
180+
174181
let schema = Arc::new(Schema::try_merge(file_schemas)?);
175182

176183
Ok(schema)
@@ -272,6 +279,23 @@ impl FileFormat for VortexFormat {
272279
) -> DFResult<Arc<dyn ExecutionPlan>> {
273280
let metrics = ExecutionPlanMetricsSet::new();
274281

282+
if file_scan_config
283+
.file_groups
284+
.iter()
285+
.flatten()
286+
.any(|f| f.range.is_some())
287+
{
288+
return not_impl_err!("File level partitioning isn't implemented yet for Vortex");
289+
}
290+
291+
if file_scan_config.limit.is_some() {
292+
return not_impl_err!("Limit isn't implemented yet for Vortex");
293+
}
294+
295+
if !file_scan_config.table_partition_cols.is_empty() {
296+
return not_impl_err!("Hive style partitioning isn't implemented yet for Vortex");
297+
}
298+
275299
let exec = VortexExec::try_new(
276300
file_scan_config,
277301
metrics,
@@ -292,7 +316,11 @@ impl FileFormat for VortexFormat {
292316
order_requirements: Option<LexRequirement>,
293317
) -> DFResult<Arc<dyn ExecutionPlan>> {
294318
if conf.insert_op != InsertOp::Append {
295-
return not_impl_err!("Overwrites are not implemented yet for Parquet");
319+
return not_impl_err!("Overwrites are not implemented yet for Vortex");
320+
}
321+
322+
if !conf.table_partition_cols.is_empty() {
323+
return not_impl_err!("Hive style partitioning isn't implemented yet for Vortex");
296324
}
297325

298326
let sink_schema = conf.output_schema().clone();

0 commit comments

Comments
 (0)