[WIP] Upgrade to arrow/parquet 57.0.0 #17888

Conversation
Many of the current failures are because queries like these used to work:

```sql
select arrow_cast('2021-01-01T00:00:00', 'Timestamp(Nanosecond, Some("-05:00"))');
SELECT arrow_cast(secs, 'Timestamp(Millisecond, None)') FROM t;
```

After the arrow 57 upgrade they fail, and tests like this one now see the new format:

```
# arrow_typeof_timestamp
query T
SELECT arrow_typeof(now()::timestamp)
----
Timestamp(ns)
```

I believe the problem is that the display format of timestamp types (including the timezone) has changed. I think what we need to do is support both formats for backwards compatibility. I will work on an upstream issue.
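As a minimal sketch of the backwards-compatibility idea (the helper name and shape are hypothetical, not DataFusion's actual type parser), the `arrow_cast` type parser could normalize either spelling of the unit before matching:

```rust
// Hypothetical helper, for illustration only: map both the old
// ("Nanosecond") and new ("ns") unit spellings to a single canonical
// form before parsing the rest of the type string.
fn normalize_timestamp_unit(unit: &str) -> Option<&'static str> {
    match unit {
        "Second" | "s" => Some("Second"),
        "Millisecond" | "ms" => Some("Millisecond"),
        "Microsecond" | "us" => Some("Microsecond"),
        "Nanosecond" | "ns" => Some("Nanosecond"),
        _ => None,
    }
}
```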
```diff
 // Create Flight client
-let mut client = FlightServiceClient::connect("http://localhost:50051").await?;
+let endpoint = Endpoint::new("http://localhost:50051")?;
```
This is due to the new version of tonic.
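A hedged sketch of the post-upgrade connection pattern (exact surface depends on the tonic version pinned by arrow-flight 57): build the `Endpoint` first, connect it to get a `Channel`, then wrap the channel in the generated client.

```rust
use arrow_flight::flight_service_client::FlightServiceClient;
use tonic::transport::{Channel, Endpoint};

// Sketch only: construct the Endpoint, connect, and hand the resulting
// Channel to the generated Flight client.
async fn connect_flight() -> Result<FlightServiceClient<Channel>, Box<dyn std::error::Error>> {
    let endpoint = Endpoint::new("http://localhost:50051")?;
    let channel = endpoint.connect().await?;
    Ok(FlightServiceClient::new(channel))
}
```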
```diff
 // add an initial FlightData message that sends schema
-let options = arrow::ipc::writer::IpcWriteOptions::default();
+let mut compression_context = CompressionContext::default();
```
```diff
-let validate =
-    T::validate_decimal_precision(new_value, self.target_precision);
+let validate = T::validate_decimal_precision(
```
Due to this upstream change (better error messages).
```diff
-List(Field { name: "item", data_type: List(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) List(Field { name: "item", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) List(Field { name: "item", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })
+List(nullable List(nullable Int64)) List(nullable Float64) List(nullable Utf8)
```
Many of the diffs in this file are related to improvements in `DataType` display, tracked in this ticket. I will try and call out individual changes when I see them. Lists are way nicer now, as the diff above shows.
```diff
 05)--------ProjectionExec: expr=[]
 06)----------CoalesceBatchesExec: target_batch_size=8192
-07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([Literal { value: Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("a"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("b"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, Literal { value: Utf8View("c"), field: Field { name: "lit", data_type: Utf8View, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }])
+07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([Literal { value: Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"), field: Field { name: "lit", data_type: Utf8View } }, Literal { value: Utf8View("a"), field: Field { name: "lit", data_type: Utf8View } }, Literal { value: Utf8View("b"), field: Field { name: "lit", data_type: Utf8View } }, Literal { value: Utf8View("c"), field: Field { name: "lit", data_type: Utf8View } }])
```
```diff
 SELECT arrow_typeof(now()::timestamp)
 ----
-Timestamp(Nanosecond, None)
+Timestamp(ns)
```
```diff
 ## Timestamps: Create a table

 statement ok
```
The timestamp format has changed (improved!), so let's also add tests for the new format.
```diff
 pbjson-types = { workspace = true }
 prost = { workspace = true }
-substrait = { version = "0.58", features = ["serde"] }
+substrait = { version = "0.59", features = ["serde"] }
```
Since prost is updated, we also must update substrait.
```diff
-| alltypes_plain.parquet | 1851 | 10181 | 2 | page_index=false |
 | alltypes_tiny_pages.parquet | 454233 | 881418 | 2 | page_index=true |
 | lz4_raw_compressed_larger.parquet | 380836 | 2939 | 2 | page_index=false |
+| alltypes_plain.parquet | 1851 | 10309 | 2 | page_index=false |
```
I don't know why the metadata size has increased. I will investigate.
```diff
-let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
-    Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }";
-assert_eq!(expected, arrow_schema.to_string());
+insta::assert_snapshot!(arrow_schema.to_string(), @r#"Field { "c0": nullable Boolean }, Field { "c1": nullable Boolean }"#);
```
Many, many diffs are due to the changes in formatting of `Field`s and `DataType`s (see below).
```diff
 +----------------------+
 | arrow_typeof(test.l) |
 +----------------------+
+| List(nullable Int32) |
```
The new display is much easier to read, in my opinion.
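For illustration, a minimal sketch (assuming arrow 57's improved `Display` impl for `DataType`) of how the new rendering is produced:

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field};

fn main() {
    let dt = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
    // Arrow 57 renders this as `List(nullable Int32)` (as in the table
    // above) rather than dumping the full Field debug representation.
    println!("{dt}");
}
```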
Ok, the tests are now looking good enough to test with the new thrift decoder.
```diff
-caused by
-Error during planning: Cannot automatically convert Null to Float16
+NULL
```
Due to this change from @chenkovsky.
```diff
 async fn predicate_cache_pushdown_disable() -> datafusion_common::Result<()> {
     // Can disable the cache even with filter pushdown by setting the size to 0. In this case we
-    // expect the inner records are reported but no records are read from the cache
+    // no records are read from the cache and no metrics are reported
```
This is due to @nuno-faria's work to close ❤️

I was somewhat surprised that there are no metrics at all reported, but I think it makes sense as the reporting is currently only done by the cache.
Thanks @alamb for handling the upgrade. I think we could add a test to confirm that `datafusion.execution.parquet.max_predicate_cache_size` now works as expected, by analyzing the explain output. Here is a potential example:
```rust
#[tokio::test]
async fn test_disable_predicate_cache() {
    let mut parquet_options = TableParquetOptions::new();
    parquet_options.global.data_page_row_count_limit = 1;
    parquet_options.global.write_batch_size = 1;

    let tempdir = TempDir::new_in(Path::new(".")).unwrap();
    let path = tempdir.path().to_str().unwrap();

    let ctx = SessionContext::new();
    ctx.sql("select i from generate_series(1, 1000) t(i)")
        .await
        .unwrap()
        .write_parquet(path, DataFrameWriteOptions::new(), Some(parquet_options))
        .await
        .unwrap();

    let regex = Regex::new(r"bytes_scanned=(\d+)").ok().unwrap();

    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.pushdown_filters", true);
    let ctx = SessionContext::new_with_config(config);

    // default: predicate cache is enabled
    ctx.register_parquet("t", path, ParquetReadOptions::new())
        .await
        .unwrap();
    let plan = ctx
        .sql("select * from t where i = 123")
        .await
        .unwrap()
        .explain(false, true)
        .unwrap()
        .to_string()
        .await
        .unwrap();
    let captures = regex.captures(&plan).unwrap();
    let bytes_scanned_default =
        captures.get(1).unwrap().as_str().parse::<usize>().unwrap();

    // disabling the predicate cache by setting the limit to 0
    ctx.sql("set datafusion.execution.parquet.max_predicate_cache_size = 0")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();
    ctx.deregister_table("t").unwrap();
    ctx.register_parquet("t", path, ParquetReadOptions::new())
        .await
        .unwrap();
    let plan = ctx
        .sql("select * from t where i = 123")
        .await
        .unwrap()
        .explain(false, true)
        .unwrap()
        .to_string()
        .await
        .unwrap();
    let captures = regex.captures(&plan).unwrap();
    let bytes_scanned_cache_disabled =
        captures.get(1).unwrap().as_str().parse::<usize>().unwrap();

    // with the cache disabled, fewer data pages should be retrieved (the predicate cache can
    // retrieve multiple data pages when their size is less than batch_size)
    assert_eq!(bytes_scanned_default, 31405);
    assert_eq!(bytes_scanned_cache_disabled, 1691);
}
```
```diff
-| alltypes_plain.parquet | 1851 | 10181 | 2 | page_index=false |
 | alltypes_tiny_pages.parquet | 454233 | 881418 | 2 | page_index=true |
 | lz4_raw_compressed_larger.parquet | 380836 | 2939 | 2 | page_index=false |
+| alltypes_plain.parquet | 1851 | 7166 | 2 | page_index=false |
```
I am not sure why the heap size of the metadata is reported to be so much smaller. I don't expect our thrift decoding work to have reduced the in-memory size of the parquet metadata 🤔

@etseidl any ideas? I can perhaps go audit the `heap_size` implementations.
I did box some of the encryption structures... but maybe the `HeapSize` impl for `Box` is still wrong?
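For reference, a minimal sketch of what a pointee-counting `HeapSize` impl for `Box` would look like (the trait below is a stand-in for illustration; the real definition lives in parquet-rs):

```rust
// Stand-in trait: parquet-rs defines its own HeapSize.
trait HeapSize {
    /// Bytes this value owns on the heap, excluding the inline size of `self`.
    fn heap_size(&self) -> usize;
}

impl<T: HeapSize> HeapSize for Box<T> {
    fn heap_size(&self) -> usize {
        // The Box owns its pointee on the heap, so count the pointee's
        // inline size plus anything the pointee itself owns on the heap.
        std::mem::size_of::<T>() + self.as_ref().heap_size()
    }
}
```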
Oh, and `FileDecryptor` in `ParquetMetaData` was boxed but still not included in `memory_size`.
Oh, and the column index should also be much smaller since we transposed it from an array of structs to a struct of arrays. `alltypes_tiny_pages.parquet` has an insane number of pages, so any savings in the column index will be magnified.
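To illustrate why the transpose helps (hypothetical shapes, for illustration only; the real column index types live in parquet-rs):

```rust
use std::mem::size_of;

// Array-of-structs: each page entry is padded out to its alignment,
// so every page pays the padding cost.
#[allow(dead_code)]
struct PageEntryAoS {
    min: i64,
    max: i64,
    null_count: i64,
    is_null: bool,
}

// Struct-of-arrays: one Vec per field, so the bool column costs one
// byte per page and the Vec overhead is amortized across all pages.
#[allow(dead_code)]
struct ColumnIndexSoA {
    mins: Vec<i64>,
    maxes: Vec<i64>,
    null_counts: Vec<i64>,
    is_null: Vec<bool>,
}

fn main() {
    // 25 bytes of data padded to 32 per page in the AoS layout...
    println!("AoS per page: {} bytes", size_of::<PageEntryAoS>());
    // ...versus 25 bytes per page (plus amortized Vec headers) in SoA.
    println!("SoA per page: {} bytes", 3 * size_of::<i64>() + size_of::<bool>());
}
```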
Which issue does this PR close?

- 57.0.0 (October 2025) arrow-rs#7835

TODO:

Rationale for this change

Upgrade to the latest arrow. Also, there are several new features in arrow-57 that I want to be able to test, including Variant, arrow-avro, and a new metadata reader.

What changes are included in this PR?

Are these changes tested?

By CI

Are there any user-facing changes?

New arrow