Skip to content

Commit 73fbd48

Browse files
alambDandandan
andauthored
Upgrade DataFusion to arrow-rs/parquet 58.0.0 / object_store 0.13.0 (#19728)
## Which issue does this PR close? - Follow on to #19355 - related to apache/arrow-rs#8466 - Closes #17455 ## Rationale for this change Keep datafusion up to date (and test Arrow using DataFusion tests) ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Daniël Heres <danielheres@gmail.com>
1 parent acec058 commit 73fbd48

File tree

55 files changed

+438
-460
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+438
-460
lines changed

Cargo.lock

Lines changed: 71 additions & 79 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,19 +91,19 @@ ahash = { version = "0.8", default-features = false, features = [
9191
"runtime-rng",
9292
] }
9393
apache-avro = { version = "0.21", default-features = false }
94-
arrow = { version = "57.3.0", features = [
94+
arrow = { version = "58.0.0", features = [
9595
"prettyprint",
9696
"chrono-tz",
9797
] }
98-
arrow-buffer = { version = "57.2.0", default-features = false }
99-
arrow-flight = { version = "57.3.0", features = [
98+
arrow-buffer = { version = "58.0.0", default-features = false }
99+
arrow-flight = { version = "58.0.0", features = [
100100
"flight-sql-experimental",
101101
] }
102-
arrow-ipc = { version = "57.2.0", default-features = false, features = [
102+
arrow-ipc = { version = "58.0.0", default-features = false, features = [
103103
"lz4",
104104
] }
105-
arrow-ord = { version = "57.2.0", default-features = false }
106-
arrow-schema = { version = "57.2.0", default-features = false }
105+
arrow-ord = { version = "58.0.0", default-features = false }
106+
arrow-schema = { version = "58.0.0", default-features = false }
107107
async-trait = "0.1.89"
108108
bigdecimal = "0.4.8"
109109
bytes = "1.11"
@@ -165,9 +165,9 @@ liblzma = { version = "0.4.6", features = ["static"] }
165165
log = "^0.4"
166166
memchr = "2.8.0"
167167
num-traits = { version = "0.2" }
168-
object_store = { version = "0.12.5", default-features = false }
168+
object_store = { version = "0.13.1", default-features = false }
169169
parking_lot = "0.12"
170-
parquet = { version = "57.3.0", default-features = false, features = [
170+
parquet = { version = "58.0.0", default-features = false, features = [
171171
"arrow",
172172
"async",
173173
"object_store",

datafusion-cli/src/exec.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,7 @@ mod tests {
521521
use datafusion::common::plan_err;
522522

523523
use datafusion::prelude::SessionContext;
524+
use datafusion_common::assert_contains;
524525
use url::Url;
525526

526527
async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
@@ -714,25 +715,24 @@ mod tests {
714715
let err = create_external_table_test(location, &sql)
715716
.await
716717
.unwrap_err();
717-
assert!(err.to_string().contains("os error 2"));
718+
assert_contains!(err.to_string(), "os error 2");
718719

719720
// for service_account_key
720721
let sql = format!(
721722
"CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_key' '{service_account_key}') LOCATION '{location}'"
722723
);
723724
let err = create_external_table_test(location, &sql)
724725
.await
725-
.unwrap_err()
726-
.to_string();
727-
assert!(err.contains("No RSA key found in pem file"), "{err}");
726+
.unwrap_err();
727+
assert_contains!(err.to_string(), "Error reading pem file: no items found");
728728

729729
// for application_credentials_path
730730
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
731731
OPTIONS('gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'");
732732
let err = create_external_table_test(location, &sql)
733733
.await
734734
.unwrap_err();
735-
assert!(err.to_string().contains("os error 2"));
735+
assert_contains!(err.to_string(), "os error 2");
736736

737737
Ok(())
738738
}

datafusion-cli/src/main.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -617,8 +617,8 @@ mod tests {
617617
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
618618
+-----------------------------------+-----------------+---------------------+------+------------------+
619619
| alltypes_plain.parquet | 1851 | 8882 | 2 | page_index=false |
620-
| alltypes_tiny_pages.parquet | 454233 | 269266 | 2 | page_index=true |
621-
| lz4_raw_compressed_larger.parquet | 380836 | 1347 | 2 | page_index=false |
620+
| alltypes_tiny_pages.parquet | 454233 | 269074 | 2 | page_index=true |
621+
| lz4_raw_compressed_larger.parquet | 380836 | 1339 | 2 | page_index=false |
622622
+-----------------------------------+-----------------+---------------------+------+------------------+
623623
");
624624

@@ -648,8 +648,8 @@ mod tests {
648648
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
649649
+-----------------------------------+-----------------+---------------------+------+------------------+
650650
| alltypes_plain.parquet | 1851 | 8882 | 5 | page_index=false |
651-
| alltypes_tiny_pages.parquet | 454233 | 269266 | 2 | page_index=true |
652-
| lz4_raw_compressed_larger.parquet | 380836 | 1347 | 3 | page_index=false |
651+
| alltypes_tiny_pages.parquet | 454233 | 269074 | 2 | page_index=true |
652+
| lz4_raw_compressed_larger.parquet | 380836 | 1339 | 3 | page_index=false |
653653
+-----------------------------------+-----------------+---------------------+------+------------------+
654654
");
655655

@@ -841,8 +841,8 @@ mod tests {
841841
+---------------------+-----------+-----------------+------+
842842
| metadata_size_bytes | filename | file_size_bytes | etag |
843843
+---------------------+-----------+-----------------+------+
844-
| 212 | 0.parquet | 3645 | 0 |
845-
| 212 | 1.parquet | 3645 | 1 |
844+
| 212 | 0.parquet | 3642 | 0 |
845+
| 212 | 1.parquet | 3642 | 1 |
846846
+---------------------+-----------+-----------------+------+
847847
");
848848

datafusion-cli/src/object_storage/instrumented.rs

Lines changed: 59 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,11 @@ use datafusion::{
3636
execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
3737
};
3838
use futures::stream::{BoxStream, Stream};
39+
use futures::{StreamExt, TryStreamExt};
3940
use object_store::{
40-
GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
41-
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
42-
path::Path,
41+
CopyOptions, GetOptions, GetRange, GetResult, ListResult, MultipartUpload,
42+
ObjectMeta, ObjectStore, ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload,
43+
PutResult, Result, path::Path,
4344
};
4445
use parking_lot::{Mutex, RwLock};
4546
use url::Url;
@@ -230,40 +231,57 @@ impl InstrumentedObjectStore {
230231
let timestamp = Utc::now();
231232
let range = options.range.clone();
232233

234+
let head = options.head;
233235
let start = Instant::now();
234236
let ret = self.inner.get_opts(location, options).await?;
235237
let elapsed = start.elapsed();
236238

239+
let (op, size) = if head {
240+
(Operation::Head, None)
241+
} else {
242+
(
243+
Operation::Get,
244+
Some((ret.range.end - ret.range.start) as usize),
245+
)
246+
};
247+
237248
self.requests.lock().push(RequestDetails {
238-
op: Operation::Get,
249+
op,
239250
path: location.clone(),
240251
timestamp,
241252
duration: Some(elapsed),
242-
size: Some((ret.range.end - ret.range.start) as usize),
253+
size,
243254
range,
244255
extra_display: None,
245256
});
246257

247258
Ok(ret)
248259
}
249260

250-
async fn instrumented_delete(&self, location: &Path) -> Result<()> {
261+
fn instrumented_delete_stream(
262+
&self,
263+
locations: BoxStream<'static, Result<Path>>,
264+
) -> BoxStream<'static, Result<Path>> {
265+
let requests_captured = Arc::clone(&self.requests);
266+
251267
let timestamp = Utc::now();
252268
let start = Instant::now();
253-
self.inner.delete(location).await?;
254-
let elapsed = start.elapsed();
255-
256-
self.requests.lock().push(RequestDetails {
257-
op: Operation::Delete,
258-
path: location.clone(),
259-
timestamp,
260-
duration: Some(elapsed),
261-
size: None,
262-
range: None,
263-
extra_display: None,
264-
});
265-
266-
Ok(())
269+
self.inner
270+
.delete_stream(locations)
271+
.and_then(move |location| {
272+
let elapsed = start.elapsed();
273+
requests_captured.lock().push(RequestDetails {
274+
op: Operation::Delete,
275+
path: location.clone(),
276+
timestamp,
277+
duration: Some(elapsed),
278+
size: None,
279+
range: None,
280+
extra_display: None,
281+
});
282+
futures::future::ok(location)
283+
})
284+
.boxed()
267285
}
268286

269287
fn instrumented_list(
@@ -361,25 +379,6 @@ impl InstrumentedObjectStore {
361379

362380
Ok(())
363381
}
364-
365-
async fn instrumented_head(&self, location: &Path) -> Result<ObjectMeta> {
366-
let timestamp = Utc::now();
367-
let start = Instant::now();
368-
let ret = self.inner.head(location).await?;
369-
let elapsed = start.elapsed();
370-
371-
self.requests.lock().push(RequestDetails {
372-
op: Operation::Head,
373-
path: location.clone(),
374-
timestamp,
375-
duration: Some(elapsed),
376-
size: None,
377-
range: None,
378-
extra_display: None,
379-
});
380-
381-
Ok(ret)
382-
}
383382
}
384383

385384
impl fmt::Display for InstrumentedObjectStore {
@@ -429,12 +428,15 @@ impl ObjectStore for InstrumentedObjectStore {
429428
self.inner.get_opts(location, options).await
430429
}
431430

432-
async fn delete(&self, location: &Path) -> Result<()> {
431+
fn delete_stream(
432+
&self,
433+
locations: BoxStream<'static, Result<Path>>,
434+
) -> BoxStream<'static, Result<Path>> {
433435
if self.enabled() {
434-
return self.instrumented_delete(location).await;
436+
return self.instrumented_delete_stream(locations);
435437
}
436438

437-
self.inner.delete(location).await
439+
self.inner.delete_stream(locations)
438440
}
439441

440442
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
@@ -453,28 +455,24 @@ impl ObjectStore for InstrumentedObjectStore {
453455
self.inner.list_with_delimiter(prefix).await
454456
}
455457

456-
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
457-
if self.enabled() {
458-
return self.instrumented_copy(from, to).await;
459-
}
460-
461-
self.inner.copy(from, to).await
462-
}
463-
464-
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
465-
if self.enabled() {
466-
return self.instrumented_copy_if_not_exists(from, to).await;
467-
}
468-
469-
self.inner.copy_if_not_exists(from, to).await
470-
}
471-
472-
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
458+
async fn copy_opts(
459+
&self,
460+
from: &Path,
461+
to: &Path,
462+
options: CopyOptions,
463+
) -> Result<()> {
473464
if self.enabled() {
474-
return self.instrumented_head(location).await;
465+
return match options.mode {
466+
object_store::CopyMode::Create => {
467+
self.instrumented_copy_if_not_exists(from, to).await
468+
}
469+
object_store::CopyMode::Overwrite => {
470+
self.instrumented_copy(from, to).await
471+
}
472+
};
475473
}
476474

477-
self.inner.head(location).await
475+
self.inner.copy_opts(from, to, options).await
478476
}
479477
}
480478

datafusion-examples/examples/custom_data_source/adapter_serialization.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ use datafusion_proto::protobuf::{
6969
};
7070
use object_store::memory::InMemory;
7171
use object_store::path::Path;
72-
use object_store::{ObjectStore, PutPayload};
72+
use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
7373
use serde::{Deserialize, Serialize};
7474

7575
/// Example showing how to preserve custom adapter information during plan serialization.

datafusion-examples/examples/custom_data_source/csv_json_opener.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use datafusion::{
3636
use datafusion::datasource::physical_plan::FileScanConfigBuilder;
3737
use datafusion_examples::utils::datasets::ExampleDataset;
3838
use futures::StreamExt;
39-
use object_store::{ObjectStore, local::LocalFileSystem, memory::InMemory};
39+
use object_store::{ObjectStoreExt, local::LocalFileSystem, memory::InMemory};
4040

4141
/// This example demonstrates using the low level [`FileStream`] / [`FileOpener`] APIs to directly
4242
/// read data from (CSV/JSON) into Arrow RecordBatches.

datafusion-examples/examples/custom_data_source/custom_file_casts.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use datafusion_physical_expr_adapter::{
4040
};
4141
use object_store::memory::InMemory;
4242
use object_store::path::Path;
43-
use object_store::{ObjectStore, PutPayload};
43+
use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
4444

4545
// Example showing how to implement custom casting rules to adapt file schemas.
4646
// This example enforces that casts must be strictly widening: if the file type is Int64 and the table type is Int32, it will error

datafusion-examples/examples/custom_data_source/default_column_values.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use datafusion_physical_expr_adapter::{
4848
use futures::StreamExt;
4949
use object_store::memory::InMemory;
5050
use object_store::path::Path;
51-
use object_store::{ObjectStore, PutPayload};
51+
use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
5252

5353
// Metadata key for storing default values in field metadata
5454
const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value";
@@ -79,7 +79,7 @@ pub async fn default_column_values() -> Result<()> {
7979
let mut buf = vec![];
8080

8181
let props = WriterProperties::builder()
82-
.set_max_row_group_size(2)
82+
.set_max_row_group_row_count(Some(2))
8383
.build();
8484

8585
let mut writer =

datafusion-examples/examples/data_io/json_shredding.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use datafusion_physical_expr_adapter::{
4747
};
4848
use object_store::memory::InMemory;
4949
use object_store::path::Path;
50-
use object_store::{ObjectStore, PutPayload};
50+
use object_store::{ObjectStoreExt, PutPayload};
5151

5252
// Example showing how to implement custom filter rewriting for JSON shredding.
5353
//
@@ -76,7 +76,7 @@ pub async fn json_shredding() -> Result<()> {
7676
let mut buf = vec![];
7777

7878
let props = WriterProperties::builder()
79-
.set_max_row_group_size(2)
79+
.set_max_row_group_row_count(Some(2))
8080
.build();
8181

8282
let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props))

0 commit comments

Comments
 (0)