Skip to content

Commit 5112de2

Browse files
pfparsons and adamreeve authored
GH-31869: [Python][Parquet] Implement external key material features in Python (#48009)
### Rationale for this change

Enables external key material and rotation for individual parquet files in PyArrow. This change does not address any parquet dataset encryption functionality.

### What changes are included in this PR?

This PR enables external key material for parquet encryption from PyArrow:

- Optional parquet_file_path and FileSystem parameters to CryptoFactory - mirroring the interface for CryptoFactory in C++
1. Exposes the rotate_master_keys method of CryptoFactory
2. Adds Cython classes for FileKeyMaterialStore, FileSystemKeyMaterialStore, and KeyMaterial - but does not expose these from PyArrow encryption. I included these changes only so that a unit test may verify an external store without leaking the implementation details for the store into the test.

### Are these changes tested?

Yes - I've modified an existing test (previously marked pytest.xfail) to do a basic read/write test and verify creation of the external key material store and added a test for CryptoFactory.rotate_master_keys.

### Are there any user-facing changes?

1. Users may optionally supply a parquet file path and FileSystem to CryptoFactory methods that provide en/decryption_properties. Doing so in conjunction with setting EncryptionConfiguration.internal_key_material=False enables external key material from pyarrow.
2. PyArrow CryptoFactory now has a rotate_master_keys method exposing key rotation functionality from C++ CryptoFactory.

* GitHub Issue: #31869

Lead-authored-by: Patrick Parsons <[email protected]>
Co-authored-by: Adam Reeve <[email protected]>
Signed-off-by: Adam Reeve <[email protected]>
1 parent af7fafd commit 5112de2

File tree

10 files changed

+519
-45
lines changed

10 files changed

+519
-45
lines changed

cpp/src/parquet/encryption/file_system_key_material_store.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
#include "parquet/encryption/file_system_key_material_store.h"
2727
#include "parquet/encryption/key_material.h"
28-
#include "parquet/exception.h"
2928

3029
namespace parquet::encryption {
3130

cpp/src/parquet/encryption/file_system_key_material_store.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "arrow/filesystem/filesystem.h"
2525

2626
#include "parquet/encryption/file_key_material_store.h"
27+
#include "parquet/exception.h"
2728

2829
namespace parquet::encryption {
2930

@@ -59,6 +60,9 @@ class PARQUET_EXPORT FileSystemKeyMaterialStore : public FileKeyMaterialStore {
5960
LoadKeyMaterialMap();
6061
}
6162
auto found = key_material_map_.find(key_id_in_file);
63+
if (found == key_material_map_.end()) {
64+
throw ParquetException("Invalid key id");
65+
}
6266
return found->second;
6367
}
6468

python/pyarrow/_parquet_encryption.pxd

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@ cdef class KmsConnectionConfig(_Weakrefable):
4949
@staticmethod
5050
cdef wrap(const CKmsConnectionConfig& config)
5151

52+
cdef class KeyMaterial(_Weakrefable):
53+
cdef shared_ptr[CKeyMaterial] key_material
54+
55+
@staticmethod
56+
cdef inline KeyMaterial wrap(shared_ptr[CKeyMaterial] key_material)
57+
58+
cdef class FileSystemKeyMaterialStore(_Weakrefable):
59+
cdef shared_ptr[CFileSystemKeyMaterialStore] store
5260

5361
cdef shared_ptr[CCryptoFactory] pyarrow_unwrap_cryptofactory(object crypto_factory) except *
5462
cdef shared_ptr[CKmsConnectionConfig] pyarrow_unwrap_kmsconnectionconfig(object kmsconnectionconfig) except *

python/pyarrow/_parquet_encryption.pyx

Lines changed: 213 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ from cython.operator cimport dereference as deref
2525

2626
from pyarrow.includes.common cimport *
2727
from pyarrow.includes.libarrow cimport *
28+
from pyarrow.lib cimport check_status
2829
from pyarrow.lib cimport _Weakrefable
2930
from pyarrow.lib import tobytes, frombytes
30-
31+
from pyarrow._fs cimport FileSystem
32+
from pyarrow.fs import _resolve_filesystem_and_path
3133

3234
cdef ParquetCipher cipher_from_name(name):
3335
name = name.upper()
@@ -364,6 +366,13 @@ cdef void _cb_create_kms_client(
364366
out[0] = (<KmsClient> result).unwrap()
365367

366368

369+
cdef inline shared_ptr[CFileSystem] _unwrap_fs(filesystem: FileSystem | None):
370+
if isinstance(filesystem, FileSystem):
371+
return filesystem.unwrap()
372+
else:
373+
return <shared_ptr[CFileSystem]>nullptr
374+
375+
367376
cdef class CryptoFactory(_Weakrefable):
368377
""" A factory that produces the low-level FileEncryptionProperties and
369378
FileDecryptionProperties objects, from the high-level parameters."""
@@ -402,7 +411,9 @@ cdef class CryptoFactory(_Weakrefable):
402411

403412
def file_encryption_properties(self,
404413
KmsConnectionConfig kms_connection_config,
405-
EncryptionConfiguration encryption_config):
414+
EncryptionConfiguration encryption_config,
415+
parquet_file_path=None,
416+
FileSystem filesystem=None):
406417
"""Create file encryption properties.
407418
408419
Parameters
@@ -413,6 +424,17 @@ cdef class CryptoFactory(_Weakrefable):
413424
encryption_config : EncryptionConfiguration
414425
Configuration of the encryption, such as which columns to encrypt
415426
427+
parquet_file_path : str, pathlib.Path, or None, default None
428+
Path to the parquet file to be encrypted. Only required when the
429+
internal_key_material attribute of EncryptionConfiguration is set
430+
to False. Used to derive the path for storing key material
431+
specific to this parquet file.
432+
433+
filesystem : FileSystem or None, default None
434+
Used only when internal_key_material is set to False on
435+
EncryptionConfiguration. If None, the file system will be inferred
436+
based on parquet_file_path.
437+
416438
Returns
417439
-------
418440
file_encryption_properties : FileEncryptionProperties
@@ -421,19 +443,33 @@ cdef class CryptoFactory(_Weakrefable):
421443
cdef:
422444
CResult[shared_ptr[CFileEncryptionProperties]] \
423445
file_encryption_properties_result
446+
c_string c_parquet_file_path
447+
shared_ptr[CFileSystem] c_filesystem
448+
449+
filesystem, parquet_file_path = _resolve_filesystem_and_path(
450+
parquet_file_path, filesystem)
451+
if parquet_file_path is not None:
452+
c_parquet_file_path = tobytes(parquet_file_path)
453+
else:
454+
c_parquet_file_path = tobytes("")
455+
c_filesystem = _unwrap_fs(filesystem)
456+
424457
with nogil:
425458
file_encryption_properties_result = \
426459
self.factory.get().SafeGetFileEncryptionProperties(
427460
deref(kms_connection_config.unwrap().get()),
428-
deref(encryption_config.unwrap().get()))
461+
deref(encryption_config.unwrap().get()),
462+
c_parquet_file_path, c_filesystem)
429463
file_encryption_properties = GetResultValue(
430464
file_encryption_properties_result)
431465
return FileEncryptionProperties.wrap(file_encryption_properties)
432466

433467
def file_decryption_properties(
434468
self,
435469
KmsConnectionConfig kms_connection_config,
436-
DecryptionConfiguration decryption_config=None):
470+
DecryptionConfiguration decryption_config=None,
471+
parquet_file_path=None,
472+
FileSystem filesystem=None):
437473
"""Create file decryption properties.
438474
439475
Parameters
@@ -445,6 +481,15 @@ cdef class CryptoFactory(_Weakrefable):
445481
Configuration of the decryption, such as cache timeout.
446482
Can be None.
447483
484+
parquet_file_path : str, pathlib.Path, or None, default None
485+
Path to the parquet file to be decrypted. Only required when
486+
the parquet file uses external key material. Used to derive
487+
the path to the external key material file.
488+
489+
filesystem : FileSystem or None, default None
490+
Used only when the parquet file uses external key material. If
491+
None, the file system will be inferred based on parquet_file_path.
492+
448493
Returns
449494
-------
450495
file_decryption_properties : FileDecryptionProperties
@@ -454,6 +499,17 @@ cdef class CryptoFactory(_Weakrefable):
454499
CDecryptionConfiguration c_decryption_config
455500
CResult[shared_ptr[CFileDecryptionProperties]] \
456501
c_file_decryption_properties
502+
c_string c_parquet_file_path
503+
shared_ptr[CFileSystem] c_filesystem
504+
505+
filesystem, parquet_file_path = _resolve_filesystem_and_path(
506+
parquet_file_path, filesystem)
507+
if parquet_file_path is not None:
508+
c_parquet_file_path = tobytes(parquet_file_path)
509+
else:
510+
c_parquet_file_path = tobytes("")
511+
c_filesystem = _unwrap_fs(filesystem)
512+
457513
if decryption_config is None:
458514
c_decryption_config = CDecryptionConfiguration()
459515
else:
@@ -462,7 +518,7 @@ cdef class CryptoFactory(_Weakrefable):
462518
c_file_decryption_properties = \
463519
self.factory.get().SafeGetFileDecryptionProperties(
464520
deref(kms_connection_config.unwrap().get()),
465-
c_decryption_config)
521+
c_decryption_config, c_parquet_file_path, c_filesystem)
466522
file_decryption_properties = GetResultValue(
467523
c_file_decryption_properties)
468524
return FileDecryptionProperties.wrap(file_decryption_properties)
@@ -473,9 +529,161 @@ cdef class CryptoFactory(_Weakrefable):
473529
def remove_cache_entries_for_all_tokens(self):
474530
self.factory.get().RemoveCacheEntriesForAllTokens()
475531

532+
def rotate_master_keys(
533+
self,
534+
KmsConnectionConfig kms_connection_config,
535+
parquet_file_path,
536+
FileSystem filesystem=None,
537+
double_wrapping=True,
538+
cache_lifetime_seconds=600):
539+
""" Rotates master encryption keys for a Parquet file that uses
540+
external key material.
541+
542+
Parameters
543+
----------
544+
kms_connection_config : KmsConnectionConfig
545+
Configuration of connection to KMS
546+
547+
parquet_file_path : str or pathlib.Path
548+
Path to a parquet file using external key material.
549+
550+
filesystem : FileSystem or None, default None
551+
Used only when the parquet file uses external key material. If
552+
None, the file system will be inferred based on parquet_file_path.
553+
554+
double_wrapping : bool, default True
555+
In the single wrapping mode, encrypts data encryption keys with
556+
new master keys. In the double wrapping mode, generates new
557+
KEKs (key encryption keys) and uses these to encrypt the data keys,
558+
and encrypts the KEKs with the new master keys.
559+
560+
cache_lifetime_seconds : int or float, default 600
561+
During key rotation, KMS Client and Key Encryption Keys will be
562+
cached for this duration.
563+
"""
564+
cdef:
565+
c_string c_parquet_file_path
566+
shared_ptr[CFileSystem] c_filesystem
567+
568+
if parquet_file_path != "":
569+
filesystem, parquet_file_path = _resolve_filesystem_and_path(
570+
parquet_file_path, filesystem)
571+
572+
c_parquet_file_path = tobytes(parquet_file_path)
573+
c_filesystem = _unwrap_fs(filesystem)
574+
575+
status = self.factory.get().SafeRotateMasterKeys(
576+
deref(kms_connection_config.unwrap().get()),
577+
c_parquet_file_path,
578+
c_filesystem,
579+
double_wrapping,
580+
cache_lifetime_seconds)
581+
582+
check_status(status)
583+
476584
cdef inline shared_ptr[CPyCryptoFactory] unwrap(self):
477585
return self.factory
478586

587+
cdef class KeyMaterial(_Weakrefable):
588+
589+
@property
590+
def is_footer_key(self):
591+
return self.key_material.get().is_footer_key()
592+
593+
@property
594+
def is_double_wrapped(self):
595+
return self.key_material.get().is_double_wrapped()
596+
597+
@property
598+
def master_key_id(self):
599+
return frombytes(self.key_material.get().master_key_id())
600+
601+
@property
602+
def wrapped_dek(self):
603+
return frombytes(self.key_material.get().wrapped_dek())
604+
605+
@property
606+
def kek_id(self):
607+
return frombytes(self.key_material.get().kek_id())
608+
609+
@property
610+
def wrapped_kek(self):
611+
return frombytes(self.key_material.get().wrapped_kek())
612+
613+
@property
614+
def kms_instance_id(self):
615+
return frombytes(self.key_material.get().kms_instance_id())
616+
617+
@property
618+
def kms_instance_url(self):
619+
return frombytes(self.key_material.get().kms_instance_url())
620+
621+
@staticmethod
622+
cdef inline KeyMaterial wrap(shared_ptr[CKeyMaterial] key_material):
623+
wrapper = KeyMaterial()
624+
wrapper.key_material = key_material
625+
return wrapper
626+
627+
@staticmethod
628+
def parse(
629+
const c_string key_material_string):
630+
cdef:
631+
shared_ptr[CKeyMaterial] c_key_material
632+
c_key_material = make_shared[CKeyMaterial](move(
633+
CKeyMaterial.Parse(key_material_string)
634+
))
635+
return KeyMaterial.wrap(c_key_material)
636+
637+
cdef class FileSystemKeyMaterialStore(_Weakrefable):
638+
639+
def get_key_material(self, key_id):
640+
cdef:
641+
c_string c_key_id = tobytes(key_id)
642+
c_string c_key_material_string
643+
644+
c_key_material_string = self.store.get().GetKeyMaterial(c_key_id)
645+
if c_key_material_string.empty():
646+
raise KeyError("Invalid key id")
647+
return KeyMaterial.parse(c_key_material_string)
648+
649+
def get_key_id_set(self):
650+
return self.store.get().GetKeyIDSet()
651+
652+
@classmethod
653+
def for_file(cls, parquet_file_path,
654+
FileSystem filesystem=None):
655+
"""Creates a FileSystemKeyMaterialStore for a parquet file that
656+
was created with external key material.
657+
658+
Parameters
659+
----------
660+
parquet_file_path : str or pathlib.Path
661+
Path to a parquet file using external key material.
662+
663+
filesystem : FileSystem, default None
664+
FileSystem where the parquet file is located. If None,
665+
will be inferred based on parquet_file_path.
666+
667+
Returns
668+
-------
669+
FileSystemKeyMaterialStore
670+
A FileSystemKeyMaterialStore wrapping the external key material.
671+
"""
672+
cdef:
673+
c_string c_parquet_file_path
674+
shared_ptr[CFileSystem] c_filesystem
675+
shared_ptr[CFileSystemKeyMaterialStore] c_store
676+
FileSystemKeyMaterialStore store = cls()
677+
678+
filesystem, parquet_file_path = _resolve_filesystem_and_path(
679+
parquet_file_path, filesystem)
680+
c_parquet_file_path = tobytes(parquet_file_path)
681+
c_filesystem = _unwrap_fs(filesystem)
682+
683+
c_store = CFileSystemKeyMaterialStore.Make(
684+
c_parquet_file_path, c_filesystem, False)
685+
store.store = c_store
686+
return store
479687

480688
cdef shared_ptr[CCryptoFactory] pyarrow_unwrap_cryptofactory(object crypto_factory) except *:
481689
if isinstance(crypto_factory, CryptoFactory):

0 commit comments

Comments
 (0)