
Commit 46aa743

Update the test to pass avro read schema
1 parent ee34713 commit 46aa743

File tree: 3 files changed (+30 −19 lines)


parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
Lines changed: 5 additions & 2 deletions

@@ -470,8 +470,11 @@ public Optional<Schema> visit(
       @Override
       public Optional<Schema> visit(
           LogicalTypeAnnotation.VariantLogicalTypeAnnotation variantLogicalType) {
-        return of(
-            convertFields(parquetGroupType.getName(), parquetGroupType.getFields(), names));
+        String name = parquetGroupType.getName();
+        List<Schema.Field> fields = new ArrayList<>();
+        fields.add(new Schema.Field("metadata", Schema.create(Schema.Type.BYTES)));
+        fields.add(new Schema.Field("value", Schema.create(Schema.Type.BYTES)));
+        return of(Schema.createRecord(name, null, namespace(name, names), false, fields));
       }
     })
     .orElseThrow(
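
For context, the converted Avro shape for a Parquet VARIANT group is just a record with two bytes fields. Below is a minimal standalone sketch (not part of the commit; the record name and namespace are illustrative) built with the same Avro Schema API the converter uses:

import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;

public class VariantAvroSchemaSketch {
  // Mirrors the converter change above: a record holding the encoded
  // Variant metadata dictionary and the encoded value as raw bytes.
  static Schema variantSchema(String name, String namespace) {
    List<Schema.Field> fields = new ArrayList<>();
    fields.add(new Schema.Field("metadata", Schema.create(Schema.Type.BYTES)));
    fields.add(new Schema.Field("value", Schema.create(Schema.Type.BYTES)));
    return Schema.createRecord(name, null, namespace, false, fields);
  }

  public static void main(String[] args) {
    // Prints the JSON form, e.g.
    // {"type":"record","name":"var","namespace":"example","fields":[
    //   {"name":"metadata","type":"bytes"},{"name":"value","type":"bytes"}]}
    System.out.println(variantSchema("var", "example"));
  }
}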

parquet-avro/src/main/java/org/apache/parquet/avro/AvroVariantConverter.java
Lines changed: 9 additions & 16 deletions

@@ -21,7 +21,6 @@
 import java.nio.ByteBuffer;
 import java.util.function.Consumer;
 import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.io.api.Converter;
@@ -35,27 +34,21 @@
  * Converter for Variant values.
  */
 class AvroVariantConverter extends GroupConverter implements VariantConverters.ParentConverter<VariantBuilder> {
-  private static final Schema VARIANT_SCHEMA = SchemaBuilder.record("VariantRecord")
-      .fields()
-      .name("metadata")
-      .type()
-      .bytesType()
-      .noDefault()
-      .name("value")
-      .type()
-      .bytesType()
-      .noDefault()
-      .endRecord();
-
   private final ParentValueContainer parent;
+  private final Schema avroSchema;
   private final GenericData model;
+  private final int metadataPos;
+  private final int valuePos;
   private final GroupConverter wrappedConverter;

   private VariantBuilder builder = null;
   private ImmutableMetadata metadata = null;

   AvroVariantConverter(ParentValueContainer parent, GroupType variantGroup, Schema avroSchema, GenericData model) {
     this.parent = parent;
+    this.avroSchema = avroSchema;
+    this.metadataPos = avroSchema.getField("metadata").pos();
+    this.valuePos = avroSchema.getField("value").pos();
     this.model = model;
     this.wrappedConverter = VariantConverters.newVariantConverter(variantGroup, this::setMetadata, this);
   }
@@ -84,9 +77,9 @@ public void end() {

     builder.appendNullIfEmpty();

-    Object record = model.newRecord(null, VARIANT_SCHEMA);
-    model.setField(record, "metadata", 0, metadata.getEncodedBuffer());
-    model.setField(record, "value", 1, builder.encodedValue());
+    Object record = model.newRecord(null, avroSchema);
+    model.setField(record, "metadata", metadataPos, metadata.getEncodedBuffer());
+    model.setField(record, "value", valuePos, builder.encodedValue());
     parent.add(record);

     this.builder = null;
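
The converter no longer hard-codes field positions 0 and 1 because the record schema is now supplied by the caller (ultimately the Avro read schema), and GenericData.setField writes by position. A minimal sketch of that lookup pattern, assuming a schema with "metadata" and "value" fields as above (not part of the commit):

import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class VariantRecordSketch {
  // Builds the two-field variant record against whatever field order the
  // supplied schema uses, instead of assuming metadata=0 and value=1.
  static Object buildVariantRecord(Schema avroSchema, ByteBuffer metadataBuf, ByteBuffer valueBuf) {
    GenericData model = GenericData.get();
    int metadataPos = avroSchema.getField("metadata").pos();
    int valuePos = avroSchema.getField("value").pos();
    Object record = model.newRecord(null, avroSchema);
    model.setField(record, "metadata", metadataPos, metadataBuf);
    model.setField(record, "value", valuePos, valueBuf);
    return record;
  }
}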

parquet-avro/src/test/java/org/apache/parquet/variant/TestVariantReadsFromFile.java
Lines changed: 16 additions & 1 deletion

@@ -32,12 +32,18 @@
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.stream.Stream;
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.LocalInputFile;
+import org.apache.parquet.schema.MessageType;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -186,8 +192,17 @@ private GenericRecord readParquetRecord(String parquetFile) throws IOException {
   private List<GenericRecord> readParquet(String parquetFile) throws IOException {
     org.apache.parquet.io.InputFile inputFile = new LocalInputFile(Paths.get(CASE_LOCATION + "/" + parquetFile));
     List<GenericRecord> records = Lists.newArrayList();
+
+    // Set Avro read schema converted from the Parquet schema
+    Configuration conf = new Configuration(false);
+    try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile)) {
+      final MessageType fileSchema = fileReader.getFileMetaData().getSchema();
+      Schema avroReadSchema = new AvroSchemaConverter().convert(fileSchema);
+      AvroReadSupport.setAvroReadSchema(conf, avroReadSchema);
+    }
+
     try (org.apache.parquet.hadoop.ParquetReader<GenericRecord> reader =
-        AvroParquetReader.<GenericRecord>builder(inputFile).build()) {
+        AvroParquetReader.<GenericRecord>builder(inputFile).withConf(conf).build()) {
       GenericRecord record;
       while ((record = reader.read()) != null) {
         records.add(record);
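
Pulled out of the test, the reading pattern looks roughly like the sketch below (standalone, with an illustrative path and class name, and no Guava Lists helper; not a drop-in for the test itself): open the file once to grab its Parquet schema, convert that to Avro, register it as the read schema, and then build the reader with the same Configuration.

import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.LocalInputFile;
import org.apache.parquet.schema.MessageType;

public class ReadWithFileSchemaSketch {
  static List<GenericRecord> read(String path) throws IOException {
    InputFile inputFile = new LocalInputFile(Paths.get(path));

    // Derive the Avro read schema from the Parquet file schema and register it.
    Configuration conf = new Configuration(false);
    try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile)) {
      MessageType fileSchema = fileReader.getFileMetaData().getSchema();
      Schema avroReadSchema = new AvroSchemaConverter().convert(fileSchema);
      AvroReadSupport.setAvroReadSchema(conf, avroReadSchema);
    }

    // Build the reader with that Configuration so the converted schema is used.
    List<GenericRecord> records = new ArrayList<>();
    try (ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(inputFile).withConf(conf).build()) {
      GenericRecord record;
      while ((record = reader.read()) != null) {
        records.add(record);
      }
    }
    return records;
  }
}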
