Skip to content

Commit 0fea3e1

Browse files
authored
GH-3338: Support encrypted files for Parquet CLI commands (#3339)
1 parent 9e231dc commit 0fea3e1

File tree

5 files changed

+275
-29
lines changed

5 files changed

+275
-29
lines changed

parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,22 @@
4949
import org.apache.hadoop.fs.FileSystem;
5050
import org.apache.hadoop.fs.LocalFileSystem;
5151
import org.apache.hadoop.fs.Path;
52+
import org.apache.parquet.HadoopReadOptions;
5253
import org.apache.parquet.avro.AvroParquetReader;
5354
import org.apache.parquet.avro.AvroReadSupport;
5455
import org.apache.parquet.cli.json.AvroJsonReader;
5556
import org.apache.parquet.cli.util.Formats;
5657
import org.apache.parquet.cli.util.GetClassLoader;
5758
import org.apache.parquet.cli.util.Schemas;
5859
import org.apache.parquet.cli.util.SeekableFSDataInputStream;
60+
import org.apache.parquet.crypto.DecryptionKeyRetriever;
61+
import org.apache.parquet.crypto.FileDecryptionProperties;
5962
import org.apache.parquet.example.data.Group;
6063
import org.apache.parquet.hadoop.ParquetFileReader;
6164
import org.apache.parquet.hadoop.ParquetReader;
6265
import org.apache.parquet.hadoop.example.GroupReadSupport;
66+
import org.apache.parquet.hadoop.util.HadoopInputFile;
67+
import org.apache.parquet.io.InputFile;
6368
import org.slf4j.Logger;
6469

6570
public abstract class BaseCommand implements Command, Configurable {
@@ -360,6 +365,97 @@ private static List<URL> urls(List<String> jars, List<String> dirs) throws Malfo
360365
return urls;
361366
}
362367

368+
protected ParquetFileReader createParquetFileReader(String source) throws IOException {
369+
InputFile in = HadoopInputFile.fromPath(qualifiedPath(source), getConf());
370+
371+
HadoopReadOptions.Builder optionsBuilder = HadoopReadOptions.builder(getConf());
372+
FileDecryptionProperties decryptionProperties = createFileDecryptionProperties();
373+
if (decryptionProperties != null) {
374+
optionsBuilder.withDecryption(decryptionProperties);
375+
}
376+
377+
return ParquetFileReader.open(in, optionsBuilder.build());
378+
}
379+
380+
protected FileDecryptionProperties createFileDecryptionProperties() {
381+
Configuration conf = getConf();
382+
String footerKeyHex = conf.get("parquet.encryption.footer.key");
383+
String columnKeysHex = conf.get("parquet.encryption.column.keys");
384+
385+
if (footerKeyHex == null && columnKeysHex == null) {
386+
return null;
387+
}
388+
389+
ConfigurableKeyRetriever keyRetriever = new ConfigurableKeyRetriever();
390+
FileDecryptionProperties.Builder builder =
391+
FileDecryptionProperties.builder().withPlaintextFilesAllowed();
392+
393+
byte[] footerKey = hexToBytes(footerKeyHex);
394+
builder.withFooterKey(footerKey);
395+
396+
parseAndSetColumnKeys(columnKeysHex, keyRetriever);
397+
builder.withKeyRetriever(keyRetriever);
398+
399+
return builder.build();
400+
}
401+
402+
private void parseAndSetColumnKeys(String columnKeysStr, ConfigurableKeyRetriever keyRetriever) {
403+
String[] keyToColumns = columnKeysStr.split(";");
404+
for (int i = 0; i < keyToColumns.length; ++i) {
405+
final String curKeyToColumns = keyToColumns[i].trim();
406+
if (curKeyToColumns.isEmpty()) {
407+
continue;
408+
}
409+
410+
String[] parts = curKeyToColumns.split(":");
411+
if (parts.length != 2) {
412+
console.warn(
413+
"Incorrect key to columns mapping in parquet.encryption.column.keys: [{}]", curKeyToColumns);
414+
continue;
415+
}
416+
417+
String columnKeyId = parts[0].trim();
418+
String columnNamesStr = parts[1].trim();
419+
String[] columnNames = columnNamesStr.split(",");
420+
421+
byte[] columnKeyBytes = hexToBytes(columnKeyId);
422+
423+
for (int j = 0; j < columnNames.length; ++j) {
424+
final String columnName = columnNames[j].trim();
425+
keyRetriever.putKey(columnName, columnKeyBytes);
426+
console.debug("Added decryption key for column: {}", columnName);
427+
}
428+
}
429+
}
430+
431+
private byte[] hexToBytes(String hex) {
432+
433+
if (hex.startsWith("0x") || hex.startsWith("0X")) {
434+
hex = hex.substring(2);
435+
}
436+
437+
int len = hex.length();
438+
byte[] data = new byte[len / 2];
439+
for (int i = 0; i < len; i += 2) {
440+
data[i / 2] = (byte) ((Character.digit(hex.charAt(i), 16) << 4) + Character.digit(hex.charAt(i + 1), 16));
441+
}
442+
return data;
443+
}
444+
445+
private static class ConfigurableKeyRetriever implements DecryptionKeyRetriever {
446+
private final Map<String, byte[]> keyMap = new java.util.HashMap<>();
447+
448+
public void putKey(String keyId, byte[] keyBytes) {
449+
keyMap.put(keyId, keyBytes);
450+
}
451+
452+
@Override
453+
public byte[] getKey(byte[] keyMetaData) {
454+
String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
455+
return keyMap.get(keyId);
456+
}
457+
}
458+
363459
protected <D> Iterable<D> openDataFile(final String source, Schema projection) throws IOException {
364460
Formats.Format format = Formats.detectFormat(open(source));
365461
switch (format) {

parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.apache.parquet.column.Encoding;
4444
import org.apache.parquet.column.EncodingStats;
4545
import org.apache.parquet.column.statistics.Statistics;
46-
import org.apache.parquet.format.converter.ParquetMetadataConverter;
4746
import org.apache.parquet.hadoop.ParquetFileReader;
4847
import org.apache.parquet.hadoop.metadata.BlockMetaData;
4948
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -70,32 +69,33 @@ public int run() throws IOException {
7069
Preconditions.checkArgument(targets.size() == 1, "Cannot process multiple Parquet files.");
7170

7271
String source = targets.get(0);
73-
ParquetMetadata footer =
74-
ParquetFileReader.readFooter(getConf(), qualifiedPath(source), ParquetMetadataConverter.NO_FILTER);
75-
76-
console.info("\nFile path: {}", source);
77-
console.info("Created by: {}", footer.getFileMetaData().getCreatedBy());
78-
79-
Map<String, String> kv = footer.getFileMetaData().getKeyValueMetaData();
80-
if (kv != null && !kv.isEmpty()) {
81-
console.info("Properties:");
82-
String format = " %" + maxSize(kv.keySet()) + "s: %s";
83-
for (Map.Entry<String, String> entry : kv.entrySet()) {
84-
console.info(String.format(format, entry.getKey(), entry.getValue()));
72+
try (ParquetFileReader reader = createParquetFileReader(source)) {
73+
ParquetMetadata footer = reader.getFooter();
74+
75+
console.info("\nFile path: {}", source);
76+
console.info("Created by: {}", footer.getFileMetaData().getCreatedBy());
77+
78+
Map<String, String> kv = footer.getFileMetaData().getKeyValueMetaData();
79+
if (kv != null && !kv.isEmpty()) {
80+
console.info("Properties:");
81+
String format = " %" + maxSize(kv.keySet()) + "s: %s";
82+
for (Map.Entry<String, String> entry : kv.entrySet()) {
83+
console.info(String.format(format, entry.getKey(), entry.getValue()));
84+
}
85+
} else {
86+
console.info("Properties: (none)");
8587
}
86-
} else {
87-
console.info("Properties: (none)");
88-
}
8988

90-
MessageType schema = footer.getFileMetaData().getSchema();
91-
console.info("Schema:\n{}", schema);
89+
MessageType schema = footer.getFileMetaData().getSchema();
90+
console.info("Schema:\n{}", schema);
9291

93-
List<BlockMetaData> rowGroups = footer.getBlocks();
94-
for (int index = 0, n = rowGroups.size(); index < n; index += 1) {
95-
printRowGroup(console, index, rowGroups.get(index), schema);
96-
}
92+
List<BlockMetaData> rowGroups = footer.getBlocks();
93+
for (int index = 0, n = rowGroups.size(); index < n; index += 1) {
94+
printRowGroup(console, index, rowGroups.get(index), schema);
95+
}
9796

98-
console.info("");
97+
console.info("");
98+
}
9999

100100
return 0;
101101
}

parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowBloomFilterCommand.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@
3434
import org.apache.parquet.hadoop.ParquetFileReader;
3535
import org.apache.parquet.hadoop.metadata.BlockMetaData;
3636
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
37-
import org.apache.parquet.hadoop.util.HadoopInputFile;
38-
import org.apache.parquet.io.InputFile;
3937
import org.apache.parquet.io.api.Binary;
4038
import org.apache.parquet.schema.MessageType;
4139
import org.apache.parquet.schema.PrimitiveType;
@@ -68,9 +66,7 @@ public ShowBloomFilterCommand(Logger console) {
6866
public int run() throws IOException {
6967
Preconditions.checkArgument(file != null, "A Parquet file is required.");
7068

71-
InputFile in = HadoopInputFile.fromPath(qualifiedPath(file), getConf());
72-
73-
try (ParquetFileReader reader = ParquetFileReader.open(in)) {
69+
try (ParquetFileReader reader = createParquetFileReader(file)) {
7470
MessageType schema = reader.getFileMetaData().getSchema();
7571
PrimitiveType type = Util.primitive(columnPath, schema);
7672

parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public int run() throws IOException {
9696
}
9797

9898
String source = targets.get(0);
99-
try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) {
99+
try (ParquetFileReader reader = createParquetFileReader(source)) {
100100
MessageType schema = reader.getFileMetaData().getSchema();
101101
Map<ColumnDescriptor, PrimitiveType> columns = Maps.newLinkedHashMap();
102102
if (this.columns == null || this.columns.isEmpty()) {

parquet-cli/src/test/java/org/apache/parquet/cli/commands/ShowBloomFilterCommandTest.java

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,31 @@
1919

2020
package org.apache.parquet.cli.commands;
2121

22+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
23+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
24+
2225
import java.io.File;
2326
import java.io.IOException;
27+
import java.nio.charset.StandardCharsets;
2428
import java.util.Arrays;
29+
import java.util.HashMap;
30+
import java.util.Map;
2531
import org.apache.hadoop.conf.Configuration;
32+
import org.apache.hadoop.fs.Path;
33+
import org.apache.parquet.crypto.ColumnEncryptionProperties;
34+
import org.apache.parquet.crypto.FileEncryptionProperties;
35+
import org.apache.parquet.crypto.ParquetCipher;
36+
import org.apache.parquet.example.data.Group;
37+
import org.apache.parquet.example.data.simple.SimpleGroup;
38+
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
39+
import org.apache.parquet.hadoop.ParquetWriter;
40+
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
41+
import org.apache.parquet.hadoop.example.GroupWriteSupport;
42+
import org.apache.parquet.hadoop.metadata.ColumnPath;
43+
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
44+
import org.apache.parquet.io.api.Binary;
45+
import org.apache.parquet.schema.MessageType;
46+
import org.apache.parquet.schema.Types;
2647
import org.junit.Assert;
2748
import org.junit.Test;
2849

@@ -37,4 +58,137 @@ public void testShowBloomFilterCommand() throws IOException {
3758
command.setConf(new Configuration());
3859
Assert.assertEquals(0, command.run());
3960
}
61+
62+
@Test
63+
public void testEncryptedFileWithBloomFilter() throws IOException {
64+
File encryptedFile = createEncryptedFileWithBloomFilter();
65+
66+
ShowBloomFilterCommand command = new ShowBloomFilterCommand(createLogger());
67+
command.file = encryptedFile.getAbsolutePath();
68+
command.columnPath = "name";
69+
command.testValues = Arrays.asList(new String[] {"test_value_1", "non_existent_value"});
70+
71+
Configuration conf = new Configuration();
72+
conf.set("parquet.encryption.footer.key", "0102030405060708090a0b0c0d0e0f10");
73+
conf.set(
74+
"parquet.encryption.column.keys",
75+
"02030405060708090a0b0c0d0e0f1011:name,email;0405060708090a0b0c0d0e0f10111213:phone");
76+
command.setConf(conf);
77+
78+
Assert.assertEquals(0, command.run());
79+
80+
ShowBloomFilterCommand emailCommand = new ShowBloomFilterCommand(createLogger());
81+
emailCommand.file = encryptedFile.getAbsolutePath();
82+
emailCommand.columnPath = "email";
83+
emailCommand.testValues = Arrays.asList(new String[] {"[email protected]", "[email protected]"});
84+
emailCommand.setConf(conf);
85+
86+
Assert.assertEquals(0, emailCommand.run());
87+
88+
ShowBloomFilterCommand phoneCommand = new ShowBloomFilterCommand(createLogger());
89+
phoneCommand.file = encryptedFile.getAbsolutePath();
90+
phoneCommand.columnPath = "phone";
91+
phoneCommand.testValues = Arrays.asList(new String[] {"555-0001", "555-9999"});
92+
phoneCommand.setConf(conf);
93+
94+
Assert.assertEquals(0, phoneCommand.run());
95+
96+
encryptedFile.delete();
97+
}
98+
99+
private File createEncryptedFileWithBloomFilter() throws IOException {
100+
MessageType schema = Types.buildMessage()
101+
.required(INT32)
102+
.named("id")
103+
.required(BINARY)
104+
.named("name")
105+
.required(BINARY)
106+
.named("email")
107+
.required(BINARY)
108+
.named("phone")
109+
.named("test_schema");
110+
111+
File tempFile = new File(getTempFolder(), "encrypted_bloom_test.parquet");
112+
tempFile.deleteOnExit();
113+
114+
Configuration conf = new Configuration();
115+
GroupWriteSupport.setSchema(schema, conf);
116+
117+
String[] encryptColumns = {"name", "email", "phone"};
118+
FileEncryptionProperties encryptionProperties =
119+
createFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, true);
120+
121+
SimpleGroupFactory factory = new SimpleGroupFactory(schema);
122+
String[] nameValues = {"test_value_1", "test_value_2", "another_test", "bloom_filter_test", "final_value"};
123+
String[] emailValues = {
124+
125+
};
126+
String[] phoneValues = {"555-0001", "555-0002", "555-0003", "555-0004", "555-0005"};
127+
128+
try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path(tempFile.toURI()))
129+
.withConf(conf)
130+
.withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
131+
.withEncryption(encryptionProperties)
132+
.withBloomFilterEnabled("name", true)
133+
.withBloomFilterEnabled("email", true)
134+
.withBloomFilterEnabled("phone", true)
135+
.withPageSize(1024)
136+
.withRowGroupSize(4096)
137+
.build()) {
138+
139+
for (int i = 0; i < nameValues.length; i++) {
140+
SimpleGroup group = (SimpleGroup) factory.newGroup();
141+
group.add("id", i + 1);
142+
group.add("name", Binary.fromString(nameValues[i]));
143+
group.add("email", Binary.fromString(emailValues[i]));
144+
group.add("phone", Binary.fromString(phoneValues[i]));
145+
writer.write(group);
146+
}
147+
}
148+
149+
return tempFile;
150+
}
151+
152+
private FileEncryptionProperties createFileEncryptionProperties(
153+
String[] encryptColumns, ParquetCipher cipher, boolean footerEncryption) {
154+
155+
byte[] footerKey = {
156+
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10
157+
};
158+
159+
byte[] sharedKey = new byte[] {
160+
0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11
161+
};
162+
byte[] phoneKey = new byte[] {
163+
0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11, 0x12, 0x13
164+
};
165+
166+
Map<String, byte[]> columnKeys = new HashMap<>();
167+
columnKeys.put("name", sharedKey);
168+
columnKeys.put("email", sharedKey);
169+
columnKeys.put("phone", phoneKey);
170+
171+
Map<ColumnPath, ColumnEncryptionProperties> columnPropertyMap = new HashMap<>();
172+
for (String columnPath : encryptColumns) {
173+
ColumnPath column = ColumnPath.fromDotString(columnPath);
174+
byte[] columnKey = columnKeys.get(columnPath);
175+
176+
ColumnEncryptionProperties columnProps = ColumnEncryptionProperties.builder(column)
177+
.withKey(columnKey)
178+
.withKeyMetaData(columnPath.getBytes(StandardCharsets.UTF_8))
179+
.build();
180+
columnPropertyMap.put(column, columnProps);
181+
}
182+
183+
FileEncryptionProperties.Builder builder = FileEncryptionProperties.builder(footerKey)
184+
.withFooterKeyMetadata("footkey".getBytes(StandardCharsets.UTF_8))
185+
.withAlgorithm(cipher)
186+
.withEncryptedColumns(columnPropertyMap);
187+
188+
if (!footerEncryption) {
189+
builder.withPlaintextFooter();
190+
}
191+
192+
return builder.build();
193+
}
40194
}

0 commit comments

Comments
 (0)