Skip to content

Commit 4dc1a4f

Browse files
adamreevealamb
andauthored
Make EncryptionFactory async (#17342)
Co-authored-by: Andrew Lamb <[email protected]>
1 parent e8d56e1 commit 4dc1a4f

File tree

7 files changed

+96
-40
lines changed

7 files changed

+96
-40
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-examples/examples/parquet_encrypted_with_kms.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
1919
use arrow_schema::SchemaRef;
20+
use async_trait::async_trait;
2021
use base64::Engine;
2122
use datafusion::common::extensions_options;
2223
use datafusion::config::{EncryptionFactoryOptions, TableParquetOptions};
@@ -211,6 +212,7 @@ struct TestEncryptionFactory {}
211212

212213
/// `EncryptionFactory` is a DataFusion trait for types that generate
213214
/// file encryption and decryption properties.
215+
#[async_trait]
214216
impl EncryptionFactory for TestEncryptionFactory {
215217
/// Generate file encryption properties to use when writing a Parquet file.
216218
/// The `schema` is provided so that it may be used to dynamically configure
@@ -219,7 +221,7 @@ impl EncryptionFactory for TestEncryptionFactory {
219221
/// but other implementations may want to use this to compute an
220222
/// AAD prefix for the file, or to allow use of external key material
221223
/// (where key metadata is stored in a JSON file alongside Parquet files).
222-
fn get_file_encryption_properties(
224+
async fn get_file_encryption_properties(
223225
&self,
224226
options: &EncryptionFactoryOptions,
225227
schema: &SchemaRef,
@@ -262,7 +264,7 @@ impl EncryptionFactory for TestEncryptionFactory {
262264
/// Generate file decryption properties to use when reading a Parquet file.
263265
/// Rather than provide the AES keys directly for decryption, we set a `KeyRetriever`
264266
/// that can determine the keys using the encryption metadata.
265-
fn get_file_decryption_properties(
267+
async fn get_file_decryption_properties(
266268
&self,
267269
_options: &EncryptionFactoryOptions,
268270
_file_path: &Path,

datafusion/core/tests/parquet/encryption.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use arrow::array::{ArrayRef, Int32Array, StringArray};
2121
use arrow::record_batch::RecordBatch;
2222
use arrow_schema::{DataType, SchemaRef};
23+
use async_trait::async_trait;
2324
use datafusion::dataframe::DataFrameWriteOptions;
2425
use datafusion::datasource::listing::ListingOptions;
2526
use datafusion::prelude::{ParquetReadOptions, SessionContext};
@@ -175,7 +176,9 @@ async fn round_trip_parquet_with_encryption_factory() {
175176
// Crypto factory should have generated one key per partition file
176177
assert_eq!(encryption_factory.encryption_keys.lock().unwrap().len(), 3);
177178

178-
verify_table_encrypted(tmpdir.path(), &encryption_factory).unwrap();
179+
verify_table_encrypted(tmpdir.path(), &encryption_factory)
180+
.await
181+
.unwrap();
179182

180183
// Registering table without decryption properties should fail
181184
let table_path = format!("file://{}/", tmpdir.path().to_str().unwrap());
@@ -255,7 +258,7 @@ async fn round_trip_parquet_with_encryption_factory() {
255258
assert_batches_sorted_eq!(expected, &table);
256259
}
257260

258-
fn verify_table_encrypted(
261+
async fn verify_table_encrypted(
259262
table_path: &Path,
260263
encryption_factory: &Arc<MockEncryptionFactory>,
261264
) -> datafusion_common::Result<()> {
@@ -267,7 +270,7 @@ fn verify_table_encrypted(
267270
if path.is_dir() {
268271
directories.push(path);
269272
} else {
270-
verify_file_encrypted(&path, encryption_factory)?;
273+
verify_file_encrypted(&path, encryption_factory).await?;
271274
files_visited += 1;
272275
}
273276
}
@@ -276,7 +279,7 @@ fn verify_table_encrypted(
276279
Ok(())
277280
}
278281

279-
fn verify_file_encrypted(
282+
async fn verify_file_encrypted(
280283
file_path: &Path,
281284
encryption_factory: &Arc<MockEncryptionFactory>,
282285
) -> datafusion_common::Result<()> {
@@ -296,7 +299,8 @@ fn verify_file_encrypted(
296299

297300
let object_path = object_store::path::Path::from(file_path_str);
298301
let decryption_properties = encryption_factory
299-
.get_file_decryption_properties(&options, &object_path)?
302+
.get_file_decryption_properties(&options, &object_path)
303+
.await?
300304
.unwrap();
301305

302306
let reader_options =
@@ -325,8 +329,9 @@ struct MockEncryptionFactory {
325329
pub counter: AtomicU8,
326330
}
327331

332+
#[async_trait]
328333
impl EncryptionFactory for MockEncryptionFactory {
329-
fn get_file_encryption_properties(
334+
async fn get_file_encryption_properties(
330335
&self,
331336
config: &EncryptionFactoryOptions,
332337
_schema: &SchemaRef,
@@ -344,7 +349,7 @@ impl EncryptionFactory for MockEncryptionFactory {
344349
Ok(Some(encryption_properties))
345350
}
346351

347-
fn get_file_decryption_properties(
352+
async fn get_file_decryption_properties(
348353
&self,
349354
config: &EncryptionFactoryOptions,
350355
file_path: &object_store::path::Path,

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ fn clear_metadata(
302302
}
303303

304304
#[cfg(feature = "parquet_encryption")]
305-
fn get_file_decryption_properties(
305+
async fn get_file_decryption_properties(
306306
state: &dyn Session,
307307
options: &TableParquetOptions,
308308
file_path: &Path,
@@ -314,10 +314,12 @@ fn get_file_decryption_properties(
314314
Some(factory_id) => {
315315
let factory =
316316
state.runtime_env().parquet_encryption_factory(factory_id)?;
317-
factory.get_file_decryption_properties(
318-
&options.crypto.factory_options,
319-
file_path,
320-
)?
317+
factory
318+
.get_file_decryption_properties(
319+
&options.crypto.factory_options,
320+
file_path,
321+
)
322+
.await?
321323
}
322324
None => None,
323325
},
@@ -326,7 +328,7 @@ fn get_file_decryption_properties(
326328
}
327329

328330
#[cfg(not(feature = "parquet_encryption"))]
329-
fn get_file_decryption_properties(
331+
async fn get_file_decryption_properties(
330332
_state: &dyn Session,
331333
_options: &TableParquetOptions,
332334
_file_path: &Path,
@@ -379,7 +381,8 @@ impl FileFormat for ParquetFormat {
379381
state,
380382
&self.options,
381383
&object.location,
382-
)?;
384+
)
385+
.await?;
383386
let result = DFParquetMetadata::new(store.as_ref(), object)
384387
.with_metadata_size_hint(self.metadata_size_hint())
385388
.with_decryption_properties(file_decryption_properties.as_ref())
@@ -437,7 +440,8 @@ impl FileFormat for ParquetFormat {
437440
object: &ObjectMeta,
438441
) -> Result<Statistics> {
439442
let file_decryption_properties =
440-
get_file_decryption_properties(state, &self.options, &object.location)?;
443+
get_file_decryption_properties(state, &self.options, &object.location)
444+
.await?;
441445
let file_metadata_cache =
442446
state.runtime_env().cache_manager.get_file_metadata_cache();
443447
DFParquetMetadata::new(store, object)
@@ -1119,7 +1123,7 @@ impl ParquetSink {
11191123

11201124
/// Create writer properties based upon configuration settings,
11211125
/// including partitioning and the inclusion of arrow schema metadata.
1122-
fn create_writer_props(
1126+
async fn create_writer_props(
11231127
&self,
11241128
runtime: &Arc<RuntimeEnv>,
11251129
path: &Path,
@@ -1147,7 +1151,8 @@ impl ParquetSink {
11471151
&parquet_opts,
11481152
schema,
11491153
path,
1150-
)?;
1154+
)
1155+
.await?;
11511156
Ok(builder.build())
11521157
}
11531158

@@ -1188,7 +1193,7 @@ impl ParquetSink {
11881193
}
11891194

11901195
#[cfg(feature = "parquet_encryption")]
1191-
fn set_writer_encryption_properties(
1196+
async fn set_writer_encryption_properties(
11921197
builder: WriterPropertiesBuilder,
11931198
runtime: &Arc<RuntimeEnv>,
11941199
parquet_opts: &TableParquetOptions,
@@ -1208,7 +1213,8 @@ fn set_writer_encryption_properties(
12081213
&parquet_opts.crypto.factory_options,
12091214
schema,
12101215
path,
1211-
)?;
1216+
)
1217+
.await?;
12121218
if let Some(file_encryption_properties) = file_encryption_properties {
12131219
return Ok(
12141220
builder.with_file_encryption_properties(file_encryption_properties)
@@ -1219,7 +1225,7 @@ fn set_writer_encryption_properties(
12191225
}
12201226

12211227
#[cfg(not(feature = "parquet_encryption"))]
1222-
fn set_writer_encryption_properties(
1228+
async fn set_writer_encryption_properties(
12231229
builder: WriterPropertiesBuilder,
12241230
_runtime: &Arc<RuntimeEnv>,
12251231
_parquet_opts: &TableParquetOptions,
@@ -1269,7 +1275,7 @@ impl FileSink for ParquetSink {
12691275
};
12701276

12711277
while let Some((path, mut rx)) = file_stream_rx.recv().await {
1272-
let parquet_props = self.create_writer_props(&runtime, &path)?;
1278+
let parquet_props = self.create_writer_props(&runtime, &path).await?;
12731279
if !allow_single_file_parallelism {
12741280
let mut writer = self
12751281
.create_async_arrow_writer(

datafusion/datasource-parquet/src/opener.rs

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ impl FileOpener for ParquetOpener {
112112
fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture> {
113113
let file_range = file_meta.range.clone();
114114
let extensions = file_meta.extensions.clone();
115-
let file_location = file_meta.location();
115+
let file_location = file_meta.location().clone();
116116
let file_name = file_location.to_string();
117117
let file_metrics =
118118
ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);
@@ -152,16 +152,18 @@ impl FileOpener for ParquetOpener {
152152
let mut predicate_file_schema = Arc::clone(&self.logical_file_schema);
153153

154154
let mut enable_page_index = self.enable_page_index;
155-
let file_decryption_properties =
156-
self.get_file_decryption_properties(file_location)?;
157-
158-
// For now, page index does not work with encrypted files. See:
159-
// https://github.com/apache/arrow-rs/issues/7629
160-
if file_decryption_properties.is_some() {
161-
enable_page_index = false;
162-
}
155+
let encryption_context = self.get_encryption_context();
163156

164157
Ok(Box::pin(async move {
158+
let file_decryption_properties = encryption_context
159+
.get_file_decryption_properties(&file_location)
160+
.await?;
161+
// For now, page index does not work with encrypted files. See:
162+
// https://github.com/apache/arrow-rs/issues/7629
163+
if file_decryption_properties.is_some() {
164+
enable_page_index = false;
165+
}
166+
165167
// Prune this file using the file level statistics and partition values.
166168
// Since dynamic filters may have been updated since planning it is possible that we are able
167169
// to prune files now that we couldn't prune at planning time.
@@ -508,9 +510,30 @@ where
508510
}
509511
}
510512

513+
#[derive(Default)]
514+
struct EncryptionContext {
515+
#[cfg(feature = "parquet_encryption")]
516+
file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
517+
#[cfg(feature = "parquet_encryption")]
518+
encryption_factory: Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
519+
}
520+
511521
#[cfg(feature = "parquet_encryption")]
512-
impl ParquetOpener {
513-
fn get_file_decryption_properties(
522+
impl EncryptionContext {
523+
fn new(
524+
file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
525+
encryption_factory: Option<(
526+
Arc<dyn EncryptionFactory>,
527+
EncryptionFactoryOptions,
528+
)>,
529+
) -> Self {
530+
Self {
531+
file_decryption_properties,
532+
encryption_factory,
533+
}
534+
}
535+
536+
async fn get_file_decryption_properties(
514537
&self,
515538
file_location: &object_store::path::Path,
516539
) -> Result<Option<Arc<FileDecryptionProperties>>> {
@@ -520,7 +543,8 @@ impl ParquetOpener {
520543
}
521544
None => match &self.encryption_factory {
522545
Some((encryption_factory, encryption_config)) => Ok(encryption_factory
523-
.get_file_decryption_properties(encryption_config, file_location)?
546+
.get_file_decryption_properties(encryption_config, file_location)
547+
.await?
524548
.map(Arc::new)),
525549
None => Ok(None),
526550
},
@@ -529,12 +553,27 @@ impl ParquetOpener {
529553
}
530554

531555
#[cfg(not(feature = "parquet_encryption"))]
532-
impl ParquetOpener {
533-
fn get_file_decryption_properties(
556+
impl EncryptionContext {
557+
async fn get_file_decryption_properties(
534558
&self,
535559
_file_location: &object_store::path::Path,
536560
) -> Result<Option<Arc<FileDecryptionProperties>>> {
537-
Ok(self.file_decryption_properties.clone())
561+
Ok(None)
562+
}
563+
}
564+
565+
impl ParquetOpener {
566+
#[cfg(feature = "parquet_encryption")]
567+
fn get_encryption_context(&self) -> EncryptionContext {
568+
EncryptionContext::new(
569+
self.file_decryption_properties.clone(),
570+
self.encryption_factory.clone(),
571+
)
572+
}
573+
574+
#[cfg(not(feature = "parquet_encryption"))]
575+
fn get_encryption_context(&self) -> EncryptionContext {
576+
EncryptionContext::default()
538577
}
539578
}
540579

datafusion/execution/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ parquet_encryption = [
4444

4545
[dependencies]
4646
arrow = { workspace = true }
47+
async-trait = { workspace = true }
4748
dashmap = { workspace = true }
4849
datafusion-common = { workspace = true, default-features = true }
4950
datafusion-expr = { workspace = true }

datafusion/execution/src/parquet_encryption.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use arrow::datatypes::SchemaRef;
19+
use async_trait::async_trait;
1920
use dashmap::DashMap;
2021
use datafusion_common::config::EncryptionFactoryOptions;
2122
use datafusion_common::error::Result;
@@ -32,17 +33,18 @@ use std::sync::Arc;
3233
/// For example usage, see the [`parquet_encrypted_with_kms` example].
3334
///
3435
/// [`parquet_encrypted_with_kms` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_encrypted_with_kms.rs
36+
#[async_trait]
3537
pub trait EncryptionFactory: Send + Sync + std::fmt::Debug + 'static {
3638
/// Generate file encryption properties to use when writing a Parquet file.
37-
fn get_file_encryption_properties(
39+
async fn get_file_encryption_properties(
3840
&self,
3941
config: &EncryptionFactoryOptions,
4042
schema: &SchemaRef,
4143
file_path: &Path,
4244
) -> Result<Option<FileEncryptionProperties>>;
4345

4446
/// Generate file decryption properties to use when reading a Parquet file.
45-
fn get_file_decryption_properties(
47+
async fn get_file_decryption_properties(
4648
&self,
4749
config: &EncryptionFactoryOptions,
4850
file_path: &Path,

0 commit comments

Comments
 (0)