Skip to content

Commit 02823a0

Browse files
committed
DataFusion 44 upgrade
1 parent 54bc304 commit 02823a0

13 files changed

Lines changed: 370 additions & 187 deletions

File tree

Cargo.lock

Lines changed: 221 additions & 131 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,27 @@
22
members = ["clade", "object_store_factory"]
33

44
[workspace.dependencies]
5-
arrow = { version = "53.2.0", features = ["test_utils"] }
6-
arrow-buffer = "53.2.0"
7-
arrow-csv = "53.2.0"
8-
arrow-flight = "53.2.0"
5+
arrow = { version = "53.3.0", features = ["test_utils"] }
6+
arrow-buffer = "53.3.0"
7+
arrow-csv = "53.3.0"
8+
arrow-flight = "53.3.0"
99
# For the JSON format support
1010
# https://github.com/apache/arrow-rs/pull/2868
1111
# https://github.com/apache/arrow-rs/pull/2724
12-
arrow-integration-test = "53.2.0"
13-
arrow-row = "53.2.0"
14-
arrow-schema = "53.2.0"
12+
arrow-integration-test = "53.3.0"
13+
arrow-row = "53.3.0"
14+
arrow-schema = "53.3.0"
1515
async-trait = "0.1.83"
1616

17-
datafusion = { version = "43.0.0", features = ["backtrace"] }
18-
datafusion-common = "43.0.0"
19-
datafusion-expr = "43.0.0"
20-
datafusion-functions-nested = "43.0.0"
17+
datafusion = { version = "44.0.0", features = ["backtrace"] }
18+
datafusion-common = "44.0.0"
19+
datafusion-expr = "44.0.0"
20+
datafusion-functions-nested = "44.0.0"
2121

2222
futures = "0.3"
2323

24-
iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "1e01b7b7b2009076941f3ec1f04340e961d4628a" }
25-
iceberg-datafusion = { git = "https://github.com/splitgraph/iceberg-rust", rev = "1e01b7b7b2009076941f3ec1f04340e961d4628a" }
24+
iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "cb5c36565b641d2309d4e96fcf6a1ee21308aa0d" }
25+
iceberg-datafusion = { git = "https://github.com/splitgraph/iceberg-rust", rev = "cb5c36565b641d2309d4e96fcf6a1ee21308aa0d" }
2626

2727
itertools = ">=0.10.0"
2828
object_store = { version = "0.11", features = ["aws", "azure", "gcp"] }
@@ -87,8 +87,8 @@ clap = { version = "4.5.21", features = [ "derive" ] }
8787
config = "0.14.0"
8888

8989
# PG wire protocol support
90-
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-43-upgrade", optional = true }
91-
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-43-upgrade", optional = true }
90+
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-44-upgrade", optional = true }
91+
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-44-upgrade", optional = true }
9292

9393
dashmap = "6.1.0"
9494

@@ -99,7 +99,7 @@ datafusion-functions-nested = { workspace = true }
9999

100100
datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }
101101

102-
deltalake = { git = "https://github.com/splitgraph/delta-rs", rev = "eff5735698279c12ae4a3aac2afa268d168242b2", features = ["datafusion", "s3"] }
102+
deltalake = { git = "https://github.com/splitgraph/delta-rs", rev = "a639dea6289839161baae15ff368db74dbac2074", features = ["datafusion", "s3"] }
103103
fastrand = "2.2.0"
104104

105105
futures = "0.3"
@@ -133,7 +133,7 @@ rustyline = "14.0"
133133
serde = { workspace = true }
134134
serde_json = { workspace = true }
135135
sha2 = ">=0.10.1"
136-
sqlparser = { version = "0.51", features = ["visitor"] }
136+
sqlparser = { version = "0.53", features = ["visitor"] }
137137
sqlx = { version = "0.7.1", features = [ "runtime-tokio-rustls", "sqlite", "any", "uuid" ] }
138138
strum = ">=0.24"
139139
strum_macros = ">=0.24"

datafusion_remote_tables/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ arrow-schema = { workspace = true }
1919
async-trait = { workspace = true }
2020

2121
# Remote query execution for a variety of DBs
22-
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-43-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
22+
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-44-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
2323

2424
datafusion = { workspace = true }
2525
datafusion-common = { workspace = true }

src/config/context.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use datafusion::execution::{
1515
};
1616
use datafusion::{
1717
common::Result,
18-
execution::runtime_env::{RuntimeConfig, RuntimeEnv},
18+
execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
1919
prelude::{SessionConfig, SessionContext},
2020
};
2121
use deltalake::delta_datafusion::DeltaTableFactory;
@@ -132,7 +132,7 @@ pub fn setup_metrics(metrics: &schema::Metrics) {
132132
}
133133

134134
pub async fn build_context(cfg: schema::SeafowlConfig) -> Result<SeafowlContext> {
135-
let mut runtime_config = RuntimeConfig::new();
135+
let mut runtime_env_builder = RuntimeEnvBuilder::new();
136136

137137
let memory_pool: Arc<dyn MemoryPool> =
138138
if let Some(max_memory) = cfg.runtime.max_memory {
@@ -143,18 +143,18 @@ pub async fn build_context(cfg: schema::SeafowlConfig) -> Result<SeafowlContext>
143143
Arc::new(UnboundedMemoryPool::default())
144144
};
145145

146-
runtime_config =
147-
runtime_config.with_memory_pool(Arc::new(MemoryPoolMetrics::new(memory_pool)));
146+
runtime_env_builder = runtime_env_builder
147+
.with_memory_pool(Arc::new(MemoryPoolMetrics::new(memory_pool)));
148148

149149
if let Some(temp_dir) = &cfg.runtime.temp_dir {
150-
runtime_config = runtime_config.with_temp_file_path(temp_dir);
150+
runtime_env_builder = runtime_env_builder.with_temp_file_path(temp_dir);
151151
}
152152

153153
let session_config = SessionConfig::from_env()?
154154
.with_information_schema(true)
155155
.with_default_catalog_and_schema(DEFAULT_DB, DEFAULT_SCHEMA);
156156

157-
let runtime_env = RuntimeEnv::try_new(runtime_config)?;
157+
let runtime_env = runtime_env_builder.build()?;
158158
let state = build_state_with_table_factories(session_config, Arc::new(runtime_env));
159159
let context = SessionContext::new_with_state(state);
160160

src/context/iceberg.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use iceberg::TableCreation;
3131
use opendal;
3232
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
3333
use parquet::file::properties::WriterProperties;
34-
use tracing::info;
34+
use tracing::{info, warn};
3535
use url::Url;
3636
use uuid::Uuid;
3737

@@ -363,11 +363,13 @@ pub async fn record_batches_to_iceberg(
363363
if let Some(opendal_error) =
364364
iceberg_error_source.downcast_ref::<opendal::Error>()
365365
{
366-
if opendal_error.kind() == opendal::ErrorKind::ConditionNotMatch {
366+
if opendal_error.kind() == opendal::ErrorKind::AlreadyExists {
367+
warn!("Failed writing new metadata file {new_metadata_location} due to a concurrency error");
367368
return Err(DataLoadingError::OptimisticConcurrencyError());
368369
}
369370
}
370371
}
372+
warn!("Failed writing new metadata file {new_metadata_location} due to: {iceberg_error}");
371373
return Err(iceberg_error.into());
372374
};
373375
info!("Wrote new metadata: {:?}", new_metadata_location);

src/context/logical.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ use datafusion_expr::logical_plan::{Extension, LogicalPlan};
2626
use deltalake::DeltaTable;
2727
use itertools::Itertools;
2828
use sqlparser::ast::{
29-
AlterTableOperation, CreateFunctionBody, CreateTable as CreateTableSql,
30-
Expr as SqlExpr, Expr, Insert, ObjectType, Query, Statement, TableFactor,
31-
TableWithJoins, Value, VisitMut,
29+
AlterTableOperation, CreateFunction as CreateFunctionSql, CreateFunctionBody,
30+
CreateTable as CreateTableSql, Expr as SqlExpr, Expr, Insert, ObjectType, Query,
31+
Statement, TableFactor, TableWithJoins, Value, VisitMut,
3232
};
3333
use std::sync::Arc;
3434
use tracing::debug;
@@ -223,13 +223,13 @@ impl SeafowlContext {
223223
state.statement_to_plan(stmt).await
224224
},
225225

226-
Statement::CreateFunction {
226+
Statement::CreateFunction(CreateFunctionSql {
227227
or_replace,
228228
temporary: false,
229229
name,
230230
function_body: Some(CreateFunctionBody::AsBeforeOptions(Expr::Value(Value::SingleQuotedString(details)))),
231231
..
232-
} => {
232+
}) => {
233233
// We abuse the fact that in CREATE FUNCTION AS [class_name], class_name can be an arbitrary string
234234
// and so we can get the user to put some JSON in there
235235
let function_details: CreateFunctionDetails = serde_json::from_str(details)

src/datafusion/parser.rs

Lines changed: 81 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,12 @@ use lazy_static::lazy_static;
3131
use sqlparser::ast::{
3232
CreateFunctionBody, Expr, ObjectName, OrderByExpr, TruncateTableTarget, Value,
3333
};
34-
use sqlparser::tokenizer::{TokenWithLocation, Word};
34+
use sqlparser::tokenizer::{TokenWithSpan, Word};
3535
use sqlparser::{
36-
ast::{ColumnDef, ColumnOptionDef, Statement as SQLStatement, TableConstraint},
36+
ast::{
37+
ColumnDef, ColumnOptionDef, CreateFunction, Statement as SQLStatement,
38+
TableConstraint,
39+
},
3740
dialect::{keywords::Keyword, Dialect, GenericDialect},
3841
parser::{Parser, ParserError},
3942
tokenizer::{Token, Tokenizer},
@@ -135,7 +138,7 @@ impl<'a> DFParser<'a> {
135138
fn expected<T>(
136139
&self,
137140
expected: &str,
138-
found: TokenWithLocation,
141+
found: TokenWithSpan,
139142
) -> Result<T, ParserError> {
140143
parser_err!(format!("Expected {expected}, found: {found}"))
141144
}
@@ -225,6 +228,7 @@ impl<'a> DFParser<'a> {
225228
only: false,
226229
identity: None,
227230
cascade: None,
231+
on_cluster: None,
228232
})))
229233
}
230234

@@ -242,6 +246,7 @@ impl<'a> DFParser<'a> {
242246
only: false,
243247
identity: None,
244248
cascade: None,
249+
on_cluster: None,
245250
})))
246251
}
247252

@@ -258,23 +263,76 @@ impl<'a> DFParser<'a> {
258263
CopyToSource::Relation(table_name)
259264
};
260265

261-
self.parser.expect_keyword(Keyword::TO)?;
266+
#[derive(Default)]
267+
struct Builder {
268+
stored_as: Option<String>,
269+
target: Option<String>,
270+
partitioned_by: Option<Vec<String>>,
271+
options: Option<Vec<(String, Value)>>,
272+
}
262273

263-
let target = self.parser.parse_literal_string()?;
274+
let mut builder = Builder::default();
264275

265-
// check for options in parens
266-
let options = if self.parser.peek_token().token == Token::LParen {
267-
self.parse_value_options()?
268-
} else {
269-
vec![]
276+
loop {
277+
if let Some(keyword) = self.parser.parse_one_of_keywords(&[
278+
Keyword::STORED,
279+
Keyword::TO,
280+
Keyword::PARTITIONED,
281+
Keyword::OPTIONS,
282+
Keyword::WITH,
283+
]) {
284+
match keyword {
285+
Keyword::STORED => {
286+
self.parser.expect_keyword(Keyword::AS)?;
287+
ensure_not_set(&builder.stored_as, "STORED AS")?;
288+
builder.stored_as = Some(self.parse_file_format()?);
289+
}
290+
Keyword::TO => {
291+
ensure_not_set(&builder.target, "TO")?;
292+
builder.target = Some(self.parser.parse_literal_string()?);
293+
}
294+
Keyword::WITH => {
295+
self.parser.expect_keyword(Keyword::HEADER)?;
296+
self.parser.expect_keyword(Keyword::ROW)?;
297+
return parser_err!("WITH HEADER ROW clause is no longer in use. Please use the OPTIONS clause with 'format.has_header' set appropriately, e.g., OPTIONS ('format.has_header' 'true')");
298+
}
299+
Keyword::PARTITIONED => {
300+
self.parser.expect_keyword(Keyword::BY)?;
301+
ensure_not_set(&builder.partitioned_by, "PARTITIONED BY")?;
302+
builder.partitioned_by = Some(self.parse_partitions()?);
303+
}
304+
Keyword::OPTIONS => {
305+
ensure_not_set(&builder.options, "OPTIONS")?;
306+
builder.options = Some(self.parse_value_options()?);
307+
}
308+
_ => {
309+
unreachable!()
310+
}
311+
}
312+
} else {
313+
let token = self.parser.next_token();
314+
if token == Token::EOF || token == Token::SemiColon {
315+
break;
316+
} else {
317+
return Err(ParserError::ParserError(format!(
318+
"Unexpected token {token}"
319+
)));
320+
}
321+
}
322+
}
323+
324+
let Some(target) = builder.target else {
325+
return Err(ParserError::ParserError(
326+
"Missing TO clause in COPY statement".into(),
327+
));
270328
};
271329

272330
Ok(Statement::CopyTo(CopyToStatement {
273331
source,
274332
target,
275-
options,
276-
partitioned_by: vec![],
277-
stored_as: None,
333+
partitioned_by: builder.partitioned_by.unwrap_or(vec![]),
334+
stored_as: builder.stored_as,
335+
options: builder.options.unwrap_or(vec![]),
278336
}))
279337
}
280338

@@ -358,7 +416,7 @@ impl<'a> DFParser<'a> {
358416
self.parser.expect_keyword(Keyword::AS)?;
359417
let body = self.parse_create_function_body_string()?;
360418

361-
let create_function = SQLStatement::CreateFunction {
419+
let create_function = SQLStatement::CreateFunction(CreateFunction {
362420
or_replace,
363421
temporary,
364422
if_not_exists: false,
@@ -374,7 +432,7 @@ impl<'a> DFParser<'a> {
374432
determinism_specifier: None,
375433
options: None,
376434
remote_connection: None,
377-
};
435+
});
378436

379437
Ok(Statement::Statement(Box::from(create_function)))
380438
}
@@ -544,6 +602,10 @@ impl<'a> DFParser<'a> {
544602
&mut self,
545603
unbounded: bool,
546604
) -> Result<Statement, ParserError> {
605+
let temporary = self
606+
.parser
607+
.parse_one_of_keywords(&[Keyword::TEMP, Keyword::TEMPORARY])
608+
.is_some();
547609
self.parser.expect_keyword(Keyword::TABLE)?;
548610
let if_not_exists =
549611
self.parser
@@ -606,10 +668,10 @@ impl<'a> DFParser<'a> {
606668
// Note that mixing both names and definitions is not allowed
607669
let peeked = self.parser.peek_nth_token(2);
608670
if peeked == Token::Comma || peeked == Token::RParen {
609-
// list of column names
671+
// List of column names
610672
builder.table_partition_cols = Some(self.parse_partitions()?)
611673
} else {
612-
// list of column defs
674+
// List of column defs
613675
let (cols, cons) = self.parse_columns()?;
614676
builder.table_partition_cols = Some(
615677
cols.iter().map(|col| col.name.to_string()).collect(),
@@ -665,7 +727,7 @@ impl<'a> DFParser<'a> {
665727
table_partition_cols: builder.table_partition_cols.unwrap_or(vec![]),
666728
order_exprs: builder.order_exprs,
667729
if_not_exists,
668-
temporary: false,
730+
temporary,
669731
unbounded,
670732
options: builder.options.unwrap_or(Vec::new()),
671733
constraints,
@@ -692,7 +754,7 @@ impl<'a> DFParser<'a> {
692754
options.push((key, value));
693755
let comma = self.parser.consume_token(&Token::Comma);
694756
if self.parser.consume_token(&Token::RParen) {
695-
// allow a trailing comma, even though it's not in standard
757+
// Allow a trailing comma, even though it's not in standard
696758
break;
697759
} else if !comma {
698760
return self.expected(

src/datafusion/utils.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ pub(crate) fn convert_simple_data_type(sql_type: &SQLDataType) -> Result<DataTyp
117117
| SQLDataType::Regclass
118118
| SQLDataType::Custom(_, _)
119119
| SQLDataType::Array(_)
120-
| SQLDataType::Enum(_)
120+
| SQLDataType::Enum(_, _)
121121
| SQLDataType::Set(_)
122122
| SQLDataType::MediumInt(_)
123123
| SQLDataType::UnsignedMediumInt(_)
@@ -163,6 +163,14 @@ pub(crate) fn convert_simple_data_type(sql_type: &SQLDataType) -> Result<DataTyp
163163
| SQLDataType::Nullable(_)
164164
| SQLDataType::LowCardinality(_)
165165
| SQLDataType::Trigger
166+
| SQLDataType::TinyBlob
167+
| SQLDataType::MediumBlob
168+
| SQLDataType::LongBlob
169+
| SQLDataType::TinyText
170+
| SQLDataType::MediumText
171+
| SQLDataType::LongText
172+
| SQLDataType::Bit(_)
173+
| SQLDataType::BitVarying(_)
166174
=> not_impl_err!(
167175
"Unsupported SQL type {sql_type:?}"
168176
),

src/sync/planner.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -984,8 +984,7 @@ mod tests {
984984
" RepartitionExec: partitioning=Hash",
985985
" ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, true as __lower_rel]",
986986
" DeltaScan",
987-
" RepartitionExec: partitioning=RoundRobinBatch",
988-
" ParquetExec: file_groups={1 group: [[]]}, projection=[c1, c2]",
987+
" ParquetExec: file_groups={1 group: [[]]}, projection=[c1, c2]",
989988
" CoalesceBatchesExec: target_batch_size=8192",
990989
" RepartitionExec: partitioning=Hash",
991990
" UnnestExec",

0 commit comments

Comments (0)