20 changes: 20 additions & 0 deletions parquet-cli/pom.xml
@@ -85,6 +85,26 @@
<artifactId>parquet-avro</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-protobuf</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<!-- CatCommandTest (for TestProtobuf) -->
Contributor:
I think I was wrong with my previous comment. We do not actually use protobuf in prod, right? We check only for the related key in the footer and use the example binding to get the values. So test scope seems legit.
(If you want to keep this comment, maybe apply it to all three dependencies, with some separation?)

@ArnavBalyan (Member Author), Sep 5, 2025:

Updated. Yeah, just noticed protobuf is only pulled in to write the test file.

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-protobuf</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
Comment on lines +97 to +108
Contributor:
By adding these dependencies in test scope only, wouldn't they be missing at command-line execution?
There are two ways we can use the CLI: one bundle contains the "normal"-scoped dependencies for the Hadoop env, and the other includes the "provided"-scoped ones as well for standalone use. I don't think these deps will be added to either one.

@ArnavBalyan (Member Author), Sep 4, 2025:
That's a great catch, thanks! Moved to ${deps.scope}, which can be enabled via the profile being used.
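
For context, the ${deps.scope} pattern the reply refers to looks roughly like the sketch below. This is a hedged illustration, not the exact pom in this PR: the profile id and the default scope value are assumptions; only the <scope>${deps.scope}</scope> indirection comes from the discussion above.

    <!-- Sketch: property-driven scope, overridable per profile (names assumed) -->
    <!-- in <properties>: default build relies on the Hadoop env to provide the jars -->
    <properties>
      <deps.scope>provided</deps.scope>
    </properties>

    <!-- in <profiles>: a hypothetical standalone profile bundles the deps instead -->
    <profiles>
      <profile>
        <id>standalone</id>
        <properties>
          <deps.scope>compile</deps.scope>
        </properties>
      </profile>
    </profiles>

    <!-- in <dependencies>: the dependency picks up whichever scope is active -->
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-protobuf</artifactId>
      <version>${project.version}</version>
      <scope>${deps.scope}</scope>
    </dependency>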

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-format-structures</artifactId>
51 changes: 51 additions & 0 deletions parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
@@ -35,6 +35,7 @@
import java.security.AccessController;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
@@ -55,14 +56,56 @@
import org.apache.parquet.cli.util.GetClassLoader;
import org.apache.parquet.cli.util.Schemas;
import org.apache.parquet.cli.util.SeekableFSDataInputStream;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.slf4j.Logger;

public abstract class BaseCommand implements Command, Configurable {

private static final String RESOURCE_URI_SCHEME = "resource";
private static final String STDIN_AS_SOURCE = "stdin";

/**
* Note for dev: for legacy reasons, parquet-cli used the Avro schema reader, which
* breaks for files generated through proto. This logic auto-detects such cases
* and routes the request to the simple (example) reader instead of Avro.
*/
private boolean isProtobufStyleSchema(String source) throws IOException {
try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) {
Map<String, String> metadata = reader.getFooter().getFileMetaData().getKeyValueMetaData();
return metadata != null && metadata.containsKey("parquet.proto.class");
}
}

// Utility to expose a ParquetReader as a single-use Iterable (iterators share the underlying reader)
private static <T> Iterable<T> asIterable(final ParquetReader<T> reader) {
return () -> new Iterator<T>() {
private T next = advance();

private T advance() {
try {
return reader.read();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public boolean hasNext() {
return next != null;
}

@Override
public T next() {
T current = next;
next = advance();
return current;
}
};
}

protected final Logger console;

private Configuration conf = null;
@@ -320,6 +363,14 @@ protected <D> Iterable<D> openDataFile(final String source, Schema projection) t
Formats.Format format = Formats.detectFormat(open(source));
switch (format) {
case PARQUET:
boolean isProtobufStyle = isProtobufStyleSchema(source);
if (isProtobufStyle) {
final ParquetReader<Group> grp = ParquetReader.<Group>builder(
new GroupReadSupport(), qualifiedPath(source))
.withConf(getConf())
.build();
return (Iterable<D>) asIterable(grp);
}
Configuration conf = new Configuration(getConf());
// TODO: add these to the reader builder
AvroReadSupport.setRequestedProjection(conf, projection);
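Taken together, the two hunks above implement the flow discussed in the review thread: peek at the footer for the proto marker key, and fall back to the example (Group) binding when it is present. A minimal standalone sketch, assuming a local file path and using only the API calls that appear in the diff:

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.example.GroupReadSupport;

    public class ProtoDetectSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]); // e.g. a file written by ProtoParquetWriter

        // Step 1: peek at the footer; proto-written files carry this key.
        boolean isProto;
        try (ParquetFileReader reader = ParquetFileReader.open(conf, path)) {
          Map<String, String> meta = reader.getFooter().getFileMetaData().getKeyValueMetaData();
          isProto = meta != null && meta.containsKey("parquet.proto.class");
        }

        // Step 2: if proto, read through the example (Group) binding instead of Avro.
        if (isProto) {
          try (ParquetReader<Group> groups =
              ParquetReader.<Group>builder(new GroupReadSupport(), path).withConf(conf).build()) {
            Group g;
            while ((g = groups.read()) != null) {
              System.out.println(g); // Group#toString prints fields line by line
            }
          }
        }
      }
    }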
parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
@@ -18,10 +18,14 @@
*/
package org.apache.parquet.cli.commands;

import com.google.protobuf.Message;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.proto.ProtoParquetWriter;
import org.apache.parquet.proto.test.TestProtobuf;
import org.junit.Assert;
import org.junit.Test;

@@ -63,4 +67,42 @@ public void testCatCommandWithInvalidColumn() throws IOException {
command.setConf(new Configuration());
command.run();
}

@Test
public void testCatCommandProtoParquetAutoDetected() throws Exception {
File protoFile = new File(getTempFolder(), "proto_someevent.parquet");
writeProtoParquet(protoFile);

CatCommand cmd = new CatCommand(createLogger(), 0);
cmd.sourceFiles = Arrays.asList(protoFile.getAbsolutePath());
cmd.setConf(new Configuration());

int result = cmd.run();
Assert.assertEquals(0, result);
}

@Test
public void testCatCommandProtoParquetSucceedsWithAutoDetection() throws Exception {
Contributor:
Aren't the two tests the same? Maybe we should test the new config as well. (AFAIK the avro reader fails with INT96 values by default.)

Member Author:
Updated: added the test with the config; it seems the behaviour is the same for INT96 values. Added a test that sets the config and verified the simple reader is used on the other side. This can hopefully be better tested with the e2e test, where debug statements can be logged and verified.

File protoFile = new File(getTempFolder(), "proto_someevent.parquet");
writeProtoParquet(protoFile);

CatCommand cmd = new CatCommand(createLogger(), 0);
cmd.sourceFiles = Arrays.asList(protoFile.getAbsolutePath());
cmd.setConf(new Configuration());

int result = cmd.run();
Assert.assertEquals(0, result);
}

private static void writeProtoParquet(File file) throws Exception {
TestProtobuf.RepeatedIntMessage.Builder b = TestProtobuf.RepeatedIntMessage.newBuilder()
.addRepeatedInt(1)
.addRepeatedInt(2)
.addRepeatedInt(3);

try (ProtoParquetWriter<Message> w =
new ProtoParquetWriter<>(new Path(file.getAbsolutePath()), TestProtobuf.RepeatedIntMessage.class)) {
w.write(b.build());
}
}
}
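
The config-toggle test mentioned in the thread above is not shown in this excerpt. As a rough sketch of its shape, assuming the same test harness (getTempFolder, createLogger) and using parquet.cli.simple-reader as a purely hypothetical property name:

    // Sketch only: "parquet.cli.simple-reader" is a hypothetical key, not the
    // actual config added in this PR (the real key isn't visible in this excerpt).
    @Test
    public void testCatCommandProtoParquetWithExplicitConfig() throws Exception {
      File protoFile = new File(getTempFolder(), "proto_someevent.parquet");
      writeProtoParquet(protoFile);

      CatCommand cmd = new CatCommand(createLogger(), 0);
      cmd.sourceFiles = Arrays.asList(protoFile.getAbsolutePath());

      Configuration conf = new Configuration();
      conf.set("parquet.cli.simple-reader", "true"); // hypothetical toggle
      cmd.setConf(conf);

      Assert.assertEquals(0, cmd.run());
    }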