Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdks/java/io/parquet/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ dependencies {
implementation "org.apache.parquet:parquet-column:$parquet_version"
implementation "org.apache.parquet:parquet-common:$parquet_version"
implementation "org.apache.parquet:parquet-hadoop:$parquet_version"
implementation "org.apache.parquet:parquet-protobuf:$parquet_version"
implementation library.java.avro
provided library.java.hadoop_client
permitUnusedDeclared library.java.hadoop_client
provided library.java.hadoop_common
testImplementation library.java.hadoop_client
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation project(path: ":sdks:java:extensions:avro")
testImplementation "org.apache.avro:avro-protobuf:1.10.2"
testImplementation library.java.junit
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.avro.AvroParquetReader;
Expand All @@ -73,6 +74,7 @@
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
Expand All @@ -89,6 +91,8 @@
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.proto.ProtoParquetReader;
import org.apache.parquet.proto.ProtoReadSupport;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -263,6 +267,7 @@ public static ReadFiles readFiles(Schema schema) {
return new AutoValue_ParquetIO_ReadFiles.Builder()
.setSchema(schema)
.setInferBeamSchema(false)
.setUseProtoReader(false) // NEW: default to false
.build();
}

Expand Down Expand Up @@ -616,6 +621,9 @@ public abstract static class ReadFiles

abstract Builder toBuilder();

// New: flag to indicate using ProtoParquetReader for protobuf data.
abstract boolean getUseProtoReader();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using an enum type parameter (AVRO / PROTO) that defaults to AVRO — this allows flexibility for future additions.


@AutoValue.Builder
abstract static class Builder {
abstract Builder setSchema(Schema schema);
Expand All @@ -630,6 +638,8 @@ abstract static class Builder {

abstract Builder setInferBeamSchema(boolean inferBeamSchema);

abstract Builder setUseProtoReader(boolean useProtoReader);

abstract ReadFiles build();
}

Expand Down Expand Up @@ -663,6 +673,11 @@ public ReadFiles withBeamSchemas(boolean inferBeamSchema) {
return toBuilder().setInferBeamSchema(inferBeamSchema).build();
}

/**
 * Returns a transform that reads the Parquet files with {@code ProtoParquetReader}, for files
 * that were written from protobuf messages. The flag defaults to {@code false} (Avro path).
 */
public ReadFiles withProtoReader() {
  Builder protoEnabled = toBuilder();
  protoEnabled.setUseProtoReader(true);
  return protoEnabled.build();
}

@Override
public PCollection<GenericRecord> expand(PCollection<ReadableFile> input) {
checkNotNull(getSchema(), "Schema can not be null");
Expand All @@ -673,7 +688,9 @@ public PCollection<GenericRecord> expand(PCollection<ReadableFile> input) {
getAvroDataModel(),
getProjectionSchema(),
GenericRecordPassthroughFn.create(),
getConfiguration())))
getConfiguration(),
getUseProtoReader() // New: pass the new flag here
)))
.setCoder(getCollectionCoder());
}

Expand All @@ -686,7 +703,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
DisplayData.item("inferBeamSchema", getInferBeamSchema())
.withLabel("Infer Beam Schema"))
.addIfNotNull(DisplayData.item("projectionSchema", String.valueOf(getProjectionSchema())))
.addIfNotNull(DisplayData.item("avroDataModel", String.valueOf(getAvroDataModel())));
.addIfNotNull(DisplayData.item("avroDataModel", String.valueOf(getAvroDataModel())))
.add(
DisplayData.item("useProtoReader", getUseProtoReader())
.withLabel("Use ProtoParquetReader"));
if (this.getConfiguration() != null) {
Configuration configuration = this.getConfiguration().get();
for (Entry<String, String> entry : configuration) {
Expand Down Expand Up @@ -718,16 +738,30 @@ static class SplitReadFn<T> extends DoFn<ReadableFile, T> {

private final SerializableFunction<GenericRecord, T> parseFn;

private final boolean useProtoReader; // New: new flag for protobuf

/**
 * Creates the splittable read function.
 *
 * @param model Avro data model used to materialize records; may be null (no model class kept).
 * @param requestSchema projection schema to request from the reader; may be null.
 * @param parseFn converts each {@code GenericRecord} to the output type; must not be null.
 * @param configuration serialized Hadoop configuration; may be null.
 * @param useProtoReader when true, records are read via {@code ProtoParquetReader} instead of
 *     the Avro-based reader.
 */
SplitReadFn(
    GenericData model,
    Schema requestSchema,
    SerializableFunction<GenericRecord, T> parseFn,
    @Nullable SerializableConfiguration configuration,
    boolean useProtoReader) {
  // Validate the only mandatory argument first; the remaining fields accept null.
  this.parseFn = checkNotNull(parseFn, "GenericRecord parse function can't be null");
  this.modelClass = (model == null) ? null : model.getClass();
  this.requestSchemaString = (requestSchema == null) ? null : requestSchema.toString();
  this.configuration = configuration;
  this.useProtoReader = useProtoReader;
}

// Backward-compatible constructor: delegates to the primary constructor with the proto
// reader disabled, so existing callers keep the original Avro read path unchanged.
SplitReadFn(
GenericData model,
Schema requestSchema,
SerializableFunction<GenericRecord, T> parseFn,
@Nullable SerializableConfiguration configuration) {
this(model, requestSchema, parseFn, configuration, false);
}

private ParquetFileReader getParquetFileReader(ReadableFile file) throws Exception {
Expand All @@ -746,6 +780,31 @@ public void processElement(
tracker.currentRestriction().getFrom(),
tracker.currentRestriction().getTo());
Configuration conf = getConfWithModelClass();

if (useProtoReader) {
// Use ProtoParquetReader to read protobuf data.
// Derive a Hadoop Path from the file metadata. Adjust as needed.
Path path = new Path(file.getMetadata().resourceId().toString());

// Get the configuration and set the property using the literal.
Configuration conf2 = getConfWithModelClass();
conf2.set("parquet.proto.ignore.unknown.fields", "TRUE");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of "ignore.unknown.fields" here? In general we should not set an unsafe configuration option on the user's behalf if they are not aware of it.


// Use the builder overload that takes a ReadSupport and a Path.
try (ParquetReader<GenericRecord> reader =
ProtoParquetReader.<GenericRecord>builder(new ProtoReadSupport(), path).build()) {
GenericRecord message;
while ((message = reader.read()) != null) {
// Cast through Object so that parseFn (which expects GenericRecord)
// can accept the DynamicMessage.
outputReceiver.output(parseFn.apply((GenericRecord) (Object) message));
}
}

return; // exit after using the proto path
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For readability, I would recommend structure the code, e.g.

getParquetFileReader() {
switch (FORMAT) {
case AVRO: return getParquetAvroReader()
case PROTO: return getParquetProtoReader()
}
}
instead of branching in place

}

// ELSE: existing logic using ParquetFileReader for Avro
GenericData model = null;
if (modelClass != null) {
model = (GenericData) modelClass.getMethod("get").invoke(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.protobuf.ProtobufData;
import org.apache.avro.reflect.ReflectData;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
Expand Down Expand Up @@ -137,6 +138,31 @@ public void testWriteAndReadWithProjection() {
readPipeline.run().waitUntilFinish();
}

@Test
public void testParquetProtobufReadError() {
  // ProtobufData derives Avro schemas from generated protobuf classes; a plain Avro
  // GenericData.Record exposes no getDescriptor() method, so schema derivation must fail.
  ProtobufData protoData = new ProtobufData() {};
  Exception thrown =
      assertThrows(RuntimeException.class, () -> protoData.getSchema(GenericData.Record.class));
  // String.valueOf guards against a null message, so a missing message yields a clean
  // assertion failure instead of a NullPointerException.
  assertTrue(
      "Error message should mention 'getDescriptor'",
      String.valueOf(thrown.getMessage())
          .contains("org.apache.avro.generic.GenericData$Record.getDescriptor"));
}

@Test
public void testReadFilesWithProtoReaderFlag() {
  // Opting in via withProtoReader() must flip the proto-reader flag on the transform.
  ParquetIO.ReadFiles transform = ParquetIO.readFiles(SCHEMA).withProtoReader();
  assertTrue("Proto reader flag should be enabled", transform.getUseProtoReader());
}

@Test
public void testReadFilesDisplayDataWithProtoReader() {
  // The proto-reader opt-in must surface in display data for pipeline introspection.
  DisplayData displayData = DisplayData.from(ParquetIO.readFiles(SCHEMA).withProtoReader());
  assertThat(displayData, hasDisplayItem("useProtoReader", true));
}

@Test
public void testBlockTracker() {
OffsetRange range = new OffsetRange(0, 1);
Expand Down
Loading