Skip to content

Commit 1818380

Browse files
authored
GH-3286: Add support for Parquet-Protobuf in Parquet-cli (#3287)
1 parent a1ad19a commit 1818380

File tree

4 files changed

+121
-1
lines changed

4 files changed

+121
-1
lines changed

parquet-cli/pom.xml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,28 @@
8585
<artifactId>parquet-avro</artifactId>
8686
<version>${project.version}</version>
8787
</dependency>
88+
89+
<!-- Protobuf dependencies for CLI Tests -->
90+
<dependency>
91+
<groupId>org.apache.parquet</groupId>
92+
<artifactId>parquet-protobuf</artifactId>
93+
<version>${project.version}</version>
94+
<classifier>tests</classifier>
95+
<scope>test</scope>
96+
</dependency>
97+
<dependency>
98+
<groupId>com.google.protobuf</groupId>
99+
<artifactId>protobuf-java</artifactId>
100+
<version>3.25.6</version>
101+
<scope>test</scope>
102+
</dependency>
103+
<dependency>
104+
<groupId>org.apache.parquet</groupId>
105+
<artifactId>parquet-protobuf</artifactId>
106+
<version>${project.version}</version>
107+
<scope>test</scope>
108+
</dependency>
109+
88110
<dependency>
89111
<groupId>org.apache.parquet</groupId>
90112
<artifactId>parquet-format-structures</artifactId>

parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.security.AccessController;
3636
import java.util.Iterator;
3737
import java.util.List;
38+
import java.util.Map;
3839
import java.util.NoSuchElementException;
3940
import org.apache.avro.Schema;
4041
import org.apache.avro.file.DataFileReader;
@@ -55,13 +56,56 @@
5556
import org.apache.parquet.cli.util.GetClassLoader;
5657
import org.apache.parquet.cli.util.Schemas;
5758
import org.apache.parquet.cli.util.SeekableFSDataInputStream;
59+
import org.apache.parquet.example.data.Group;
60+
import org.apache.parquet.hadoop.ParquetFileReader;
5861
import org.apache.parquet.hadoop.ParquetReader;
62+
import org.apache.parquet.hadoop.example.GroupReadSupport;
5963
import org.slf4j.Logger;
6064

6165
public abstract class BaseCommand implements Command, Configurable {
6266

6367
private static final String RESOURCE_URI_SCHEME = "resource";
6468
private static final String STDIN_AS_SOURCE = "stdin";
69+
public static final String PARQUET_CLI_ENABLE_GROUP_READER = "parquet.enable.simple-reader";
70+
71+
/**
72+
* Note for dev: Due to legancy reasons, parquet-cli used the avro schema reader which
73+
* breaks for files generated through proto. This logic is in place to auto-detect such cases
74+
* and route the request to simple reader instead of avro.
75+
*/
76+
private boolean isProtobufStyleSchema(String source) throws IOException {
77+
try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) {
78+
Map<String, String> metadata = reader.getFooter().getFileMetaData().getKeyValueMetaData();
79+
return metadata != null && metadata.containsKey("parquet.proto.class");
80+
}
81+
}
82+
83+
// Util to convert ParquetReader to Iterable
84+
private static <T> Iterable<T> asIterable(final ParquetReader<T> reader) {
85+
return () -> new Iterator<T>() {
86+
private T next = advance();
87+
88+
private T advance() {
89+
try {
90+
return reader.read();
91+
} catch (IOException e) {
92+
throw new RuntimeException(e);
93+
}
94+
}
95+
96+
@Override
97+
public boolean hasNext() {
98+
return next != null;
99+
}
100+
101+
@Override
102+
public T next() {
103+
T current = next;
104+
next = advance();
105+
return current;
106+
}
107+
};
108+
}
65109

66110
protected final Logger console;
67111

@@ -320,6 +364,15 @@ protected <D> Iterable<D> openDataFile(final String source, Schema projection) t
320364
Formats.Format format = Formats.detectFormat(open(source));
321365
switch (format) {
322366
case PARQUET:
367+
boolean isProtobufStyle = isProtobufStyleSchema(source);
368+
boolean useGroupReader = getConf().getBoolean(PARQUET_CLI_ENABLE_GROUP_READER, false);
369+
if (isProtobufStyle || useGroupReader) {
370+
final ParquetReader<Group> grp = ParquetReader.<Group>builder(
371+
new GroupReadSupport(), qualifiedPath(source))
372+
.withConf(getConf())
373+
.build();
374+
return (Iterable<D>) asIterable(grp);
375+
}
323376
Configuration conf = new Configuration(getConf());
324377
// TODO: add these to the reader builder
325378
AvroReadSupport.setRequestedProjection(conf, projection);

parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ public static Schema fromParquet(Configuration conf, URI location) throws IOExce
8282
if (schemaString != null) {
8383
return new Schema.Parser().parse(schemaString);
8484
} else {
85-
return new AvroSchemaConverter().convert(footer.getFileMetaData().getSchema());
85+
return new AvroSchemaConverter(conf)
86+
.convert(footer.getFileMetaData().getSchema());
8687
}
8788
}
8889

parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@
1818
*/
1919
package org.apache.parquet.cli.commands;
2020

21+
import com.google.protobuf.Message;
2122
import java.io.File;
2223
import java.io.IOException;
2324
import java.util.Arrays;
2425
import org.apache.hadoop.conf.Configuration;
26+
import org.apache.hadoop.fs.Path;
27+
import org.apache.parquet.proto.ProtoParquetWriter;
28+
import org.apache.parquet.proto.test.TestProtobuf;
2529
import org.junit.Assert;
2630
import org.junit.Test;
2731

@@ -63,4 +67,44 @@ public void testCatCommandWithInvalidColumn() throws IOException {
6367
command.setConf(new Configuration());
6468
command.run();
6569
}
70+
71+
@Test
72+
public void testCatCommandProtoParquetAutoDetected() throws Exception {
73+
File protoFile = new File(getTempFolder(), "proto_someevent.parquet");
74+
writeProtoParquet(protoFile);
75+
76+
CatCommand cmd = new CatCommand(createLogger(), 0);
77+
cmd.sourceFiles = Arrays.asList(protoFile.getAbsolutePath());
78+
cmd.setConf(new Configuration());
79+
80+
int result = cmd.run();
81+
Assert.assertEquals(0, result);
82+
}
83+
84+
@Test
85+
public void testCatCommandWithSimpleReaderConfig() throws Exception {
86+
File regularFile = parquetFile();
87+
88+
Configuration conf = new Configuration();
89+
conf.setBoolean("parquet.enable.simple-reader", true);
90+
91+
CatCommand cmd = new CatCommand(createLogger(), 5);
92+
cmd.sourceFiles = Arrays.asList(regularFile.getAbsolutePath());
93+
cmd.setConf(conf);
94+
95+
int result = cmd.run();
96+
Assert.assertEquals(0, result);
97+
}
98+
99+
private static void writeProtoParquet(File file) throws Exception {
100+
TestProtobuf.RepeatedIntMessage.Builder b = TestProtobuf.RepeatedIntMessage.newBuilder()
101+
.addRepeatedInt(1)
102+
.addRepeatedInt(2)
103+
.addRepeatedInt(3);
104+
105+
try (ProtoParquetWriter<Message> w =
106+
new ProtoParquetWriter<>(new Path(file.getAbsolutePath()), TestProtobuf.RepeatedIntMessage.class)) {
107+
w.write(b.build());
108+
}
109+
}
66110
}

0 commit comments

Comments
 (0)