Commit d66d6b9

Authored by corwinjoy, adamreeve, Copilot, and alamb
feat: Parquet modular encryption (#16351)
* Initial commit to form PR for datafusion encryption support
* Add tests for encryption configuration
* Apply cargo fmt
* Add a roundtrip encryption test to the parquet tests
* cargo fmt
* Update test to add decryption parameter to called functions
* Try to get DataFrame.write_parquet to work with encryption. It doesn't quite work yet; column encryption is broken
* Update datafusion/datasource-parquet/src/opener.rs (Co-authored-by: Adam Reeve <[email protected]>)
* Update datafusion/datasource-parquet/src/source.rs (Co-authored-by: Adam Reeve <[email protected]>)
* Fix write test in parquet.rs
* Simplify encryption test; remove unused imports
* Run cargo fmt
* Further streamline roundtrip test
* Change From methods for FileEncryptionProperties and FileDecryptionProperties to use references
* Change encryption config to directly hold column keys using custom config fields
* Fix generated field names in visit for encryptor and decryptor to use "." instead of "::"
* Disable parallel writes with encryption; fix unused header warning in config.rs; fix test case in encryption.rs to call conversion to ConfigFileDecryptionProperties correctly
* cargo fmt
* Update datafusion/common/src/file_options/parquet_writer.rs (Co-authored-by: Copilot <[email protected]>)
* Fix variables shown in information schema test
* Back out bad suggestion from Copilot
* Remove unused serde reference; add an example to read and write encrypted parquet files
* cargo fmt
* Change file_format.rs to use global encryption options in struct
* Turn off page_index for encrypted example; get encrypted example working with filter
* Tidy up example output
* Add missing license; run taplo format
* Update configs.md by running dev/update_config_docs.sh
* Cargo fmt + clippy changes
* Add filter test for encrypted files
* Cargo clippy changes
* Fix link in README.md
* Add issue tag for parallel writes
* Move file encryption and decryption properties out of global options
* Use config_namespace_with_hashmap for column encryption/decryption props
* Remove outdated docs on crypto settings
* Add docs for using encryption configuration; add example SQL for using encryption from CLI; fix removed variables in test for configuration information; clippy and cargo fmt
* Update code to add missing ParquetOpener parameter due to merge from main
* Add CLI documentation for Parquet options and provide an encryption example
* Use ConfigFileDecryptionProperties in ParquetReadOptions
* Implement Default for ConfigFileEncryptionProperties
* Add sqllogictest for parquet with encryption
* Apply prettier changes from CI
* Fix logical conflict
* Fix another logical conflict

---------

Signed-off-by: Corwin Joy <[email protected]>
Signed-off-by: Adam Reeve <[email protected]>
Co-authored-by: Adam Reeve <[email protected]>
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Adam Reeve <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
1 parent 9f3cc7b commit d66d6b9
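Condensed from the new example file included later in this commit, the user-facing API boils down to two hooks: encryption properties attach to writes through TableParquetOptions.crypto, and decryption properties attach to reads through ParquetReadOptions. A sketch (the roundtrip helper is illustrative, not code from the PR):

use datafusion::config::TableParquetOptions;
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::parquet::encryption::decrypt::FileDecryptionProperties;
use datafusion::parquet::encryption::encrypt::FileEncryptionProperties;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

// Illustrative helper (not part of the PR): write `df` encrypted, read it back.
async fn roundtrip(
    ctx: &SessionContext,
    df: DataFrame,
    path: &str,
    encrypt: &FileEncryptionProperties,
    decrypt: &FileDecryptionProperties,
) -> datafusion::common::Result<DataFrame> {
    // Writing: encryption properties ride along on TableParquetOptions.
    let mut options = TableParquetOptions::default();
    options.crypto.file_encryption = Some(encrypt.into());
    df.write_parquet(
        path,
        DataFrameWriteOptions::new().with_single_file_output(true),
        Some(options),
    )
    .await?;

    // Reading: decryption properties are supplied through ParquetReadOptions.
    let read_options =
        ParquetReadOptions::default().file_decryption_properties(decrypt.into());
    ctx.read_parquet(path, read_options).await
}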

File tree

23 files changed: +1088 −33 lines changed


Cargo.lock

Lines changed: 2 additions & 0 deletions
(Generated file; diff not rendered by default.)

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -159,6 +159,7 @@ parquet = { version = "55.1.0", default-features = false, features = [
     "arrow",
     "async",
     "object_store",
+    "encryption",
 ] }
 pbjson = { version = "0.7.0" }
 pbjson-types = "0.7"

benchmarks/src/bin/dfbench.rs

Lines changed: 2 additions & 2 deletions
@@ -60,11 +60,11 @@ pub async fn main() -> Result<()> {
         Options::Cancellation(opt) => opt.run().await,
         Options::Clickbench(opt) => opt.run().await,
         Options::H2o(opt) => opt.run().await,
-        Options::Imdb(opt) => opt.run().await,
+        Options::Imdb(opt) => Box::pin(opt.run()).await,
         Options::ParquetFilter(opt) => opt.run().await,
         Options::Sort(opt) => opt.run().await,
         Options::SortTpch(opt) => opt.run().await,
-        Options::Tpch(opt) => opt.run().await,
+        Options::Tpch(opt) => Box::pin(opt.run()).await,
         Options::TpchConvert(opt) => opt.run().await,
     }
 }

benchmarks/src/bin/imdb.rs

Lines changed: 1 addition & 1 deletion
@@ -53,7 +53,7 @@ pub async fn main() -> Result<()> {
     env_logger::init();
     match ImdbOpt::from_args() {
         ImdbOpt::Benchmark(BenchmarkSubCommandOpt::DataFusionBenchmark(opt)) => {
-            opt.run().await
+            Box::pin(opt.run()).await
         }
         ImdbOpt::Convert(opt) => opt.run().await,
     }

benchmarks/src/bin/tpch.rs

Lines changed: 1 addition & 1 deletion
@@ -58,7 +58,7 @@ async fn main() -> Result<()> {
     env_logger::init();
     match TpchOpt::from_args() {
         TpchOpt::Benchmark(BenchmarkSubCommandOpt::DataFusionBenchmark(opt)) => {
-            opt.run().await
+            Box::pin(opt.run()).await
         }
         TpchOpt::Convert(opt) => opt.run().await,
     }
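The three benchmark binaries above now wrap opt.run() in Box::pin. The commit message does not say why, but a plausible reason is future size: with the parquet encryption feature enabled, these futures grow, and Box::pin moves a future's state machine onto the heap so the caller's own future stays small. A minimal sketch of the pattern (the big_task future is hypothetical, not code from this PR):

// Minimal sketch of the Box::pin pattern (hypothetical future, not PR code).
async fn big_task() -> u64 {
    // A future that keeps a large buffer alive across an .await point,
    // making its compiled state machine large.
    let buf = [0u8; 64 * 1024];
    tokio::task::yield_now().await;
    buf.iter().map(|&b| b as u64).sum()
}

#[tokio::main]
async fn main() {
    // Inline await: the 64 KiB state machine is embedded in main's future.
    let a = big_task().await;

    // Boxed await: main's future only stores a pointer; the large state
    // machine lives on the heap.
    let b = Box::pin(big_task()).await;

    assert_eq!(a, b);
}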

datafusion-examples/README.md

Lines changed: 1 addition & 0 deletions
@@ -65,6 +65,7 @@ cargo run --example dataframe
 - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
 - [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
 - [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates
+- [`parquet_encrypted.rs`](examples/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion
 - [`parquet_index.rs`](examples/parquet_index.rs): Create a secondary index over several parquet files and use it to speed up queries
 - [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
 - [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`.
datafusion-examples/examples/parquet_encrypted.rs

Lines changed: 119 additions & 0 deletions (new file)

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::common::DataFusionError;
use datafusion::config::TableParquetOptions;
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::logical_expr::{col, lit};
use datafusion::parquet::encryption::decrypt::FileDecryptionProperties;
use datafusion::parquet::encryption::encrypt::FileEncryptionProperties;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use tempfile::TempDir;

#[tokio::main]
async fn main() -> datafusion::common::Result<()> {
    // The SessionContext is the main high level API for interacting with DataFusion
    let ctx = SessionContext::new();

    // Find the local path of "alltypes_plain.parquet"
    let testdata = datafusion::test_util::parquet_test_data();
    let filename = &format!("{testdata}/alltypes_plain.parquet");

    // Read the sample parquet file
    let parquet_df = ctx
        .read_parquet(filename, ParquetReadOptions::default())
        .await?;

    // Show information from the dataframe
    println!(
        "==============================================================================="
    );
    println!("Original Parquet DataFrame:");
    query_dataframe(&parquet_df).await?;

    // Setup encryption and decryption properties
    let (encrypt, decrypt) = setup_encryption(&parquet_df)?;

    // Create a temporary file location for the encrypted parquet file
    let tmp_dir = TempDir::new()?;
    let tempfile = tmp_dir.path().join("alltypes_plain-encrypted.parquet");
    let tempfile_str = tempfile.into_os_string().into_string().unwrap();

    // Write encrypted parquet
    let mut options = TableParquetOptions::default();
    options.crypto.file_encryption = Some((&encrypt).into());
    parquet_df
        .write_parquet(
            tempfile_str.as_str(),
            DataFrameWriteOptions::new().with_single_file_output(true),
            Some(options),
        )
        .await?;

    // Read encrypted parquet
    let ctx: SessionContext = SessionContext::new();
    let read_options =
        ParquetReadOptions::default().file_decryption_properties((&decrypt).into());

    let encrypted_parquet_df = ctx.read_parquet(tempfile_str, read_options).await?;

    // Show information from the dataframe
    println!("\n\n===============================================================================");
    println!("Encrypted Parquet DataFrame:");
    query_dataframe(&encrypted_parquet_df).await?;

    Ok(())
}

// Show information from the dataframe
async fn query_dataframe(df: &DataFrame) -> Result<(), DataFusionError> {
    // Show its schema using 'describe'
    println!("Schema:");
    df.clone().describe().await?.show().await?;

    // Select three columns and filter the results
    // so that only rows where id > 5 are returned
    println!("\nSelected rows and columns:");
    df.clone()
        .select_columns(&["id", "bool_col", "timestamp_col"])?
        .filter(col("id").gt(lit(5)))?
        .show()
        .await?;

    Ok(())
}

// Setup encryption and decryption properties
fn setup_encryption(
    parquet_df: &DataFrame,
) -> Result<(FileEncryptionProperties, FileDecryptionProperties), DataFusionError> {
    let schema = parquet_df.schema();
    let footer_key = b"0123456789012345".to_vec(); // 128-bit key (16 bytes)
    let column_key = b"1234567890123450".to_vec(); // 128-bit key (16 bytes)

    let mut encrypt = FileEncryptionProperties::builder(footer_key.clone());
    let mut decrypt = FileDecryptionProperties::builder(footer_key.clone());

    for field in schema.fields().iter() {
        encrypt = encrypt.with_column_key(field.name().as_str(), column_key.clone());
        decrypt = decrypt.with_column_key(field.name().as_str(), column_key.clone());
    }

    let encrypt = encrypt.build()?;
    let decrypt = decrypt.build()?;
    Ok((encrypt, decrypt))
}
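One behavior implied by the key setup above (an assumption about defaults, not something this example tests): FileEncryptionProperties::builder appears to encrypt the footer as well as the columns, so reading the file back without decryption properties should fail outright at schema inference. A hypothetical check that could be added to main() before the decrypted read:

// Hypothetical check (not in the example): without decryption properties,
// opening the encrypted file should fail, since the footer itself is
// encrypted and the schema cannot be read.
let res = ctx
    .read_parquet(tempfile_str.as_str(), ParquetReadOptions::default())
    .await;
assert!(res.is_err(), "reading without keys should fail");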

datafusion/common/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -58,6 +58,7 @@ base64 = "0.22.1"
 chrono = { workspace = true }
 half = { workspace = true }
 hashbrown = { workspace = true }
+hex = "0.4.3"
 indexmap = { workspace = true }
 libc = "0.2.174"
 log = { workspace = true }
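Why hex in datafusion-common? The commit does not say explicitly, but the config-based encryption settings it adds hold keys as strings, and an assumption consistent with that is hex-encoded key material decoded via hex::decode. A sketch under that assumption (the key_from_hex helper is hypothetical):

// Sketch (assumption: config strings carry hex-encoded AES keys).
fn key_from_hex(hex_key: &str) -> Result<Vec<u8>, hex::FromHexError> {
    hex::decode(hex_key)
}

fn main() {
    // "30313233343536373839303132333435" is b"0123456789012345" in hex:
    // a 16-byte (128-bit) key like the ones in the example above.
    let key = key_from_hex("30313233343536373839303132333435").unwrap();
    assert_eq!(key, b"0123456789012345");
}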
