Skip to content

Commit fda3319

Browse files
committed
Variant: Refactor readers to use a parent handler.
1 parent 5ed5536 commit fda3319

File tree

13 files changed: 731 additions and 699 deletions.

parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.parquet.schema.GroupType;
3333
import org.apache.parquet.schema.PrimitiveStringifier;
3434
import org.apache.parquet.schema.PrimitiveType;
35-
import org.apache.parquet.variant.VariantColumnConverter;
3635

3736
public class AvroConverters {
3837

@@ -341,27 +340,4 @@ public String convert(Binary binary) {
341340
return stringifier.stringify(binary);
342341
}
343342
}
344-
345-
static final class FieldVariantConverter<T> extends VariantColumnConverter {
346-
private final ParentValueContainer parent;
347-
private final Schema avroSchema;
348-
private final GenericData model;
349-
350-
public FieldVariantConverter(
351-
ParentValueContainer parent, GroupType schema, Schema avroSchema, GenericData model) {
352-
super(schema);
353-
this.avroSchema = avroSchema;
354-
this.model = model;
355-
this.parent = parent;
356-
}
357-
358-
@Override
359-
@SuppressWarnings("unchecked")
360-
public void addVariant(ByteBuffer value, ByteBuffer metadata) {
361-
T currentRecord = (T) model.newRecord(null, avroSchema);
362-
model.setField(currentRecord, "metadata", 0, metadata);
363-
model.setField(currentRecord, "value", 1, value);
364-
parent.add(currentRecord);
365-
}
366-
}
367343
}

parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ private static Converter newConverter(Schema schema, Type type, GenericData mode
170170
return new MapConverter(parent, type.asGroupType(), schema, model);
171171
case RECORD:
172172
if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) {
173-
return new AvroConverters.FieldVariantConverter(parent, type.asGroupType(), schema, model);
173+
return new AvroVariantConverter(parent, type.asGroupType(), schema, model);
174174
} else {
175175
return new AvroIndexedRecordConverter<>(parent, type.asGroupType(), schema, model);
176176
}

parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ private static Converter newConverter(
396396
return newStringConverter(schema, model, parent, validator);
397397
case RECORD:
398398
if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) {
399-
return new AvroConverters.FieldVariantConverter(parent, type.asGroupType(), schema, model);
399+
return new AvroVariantConverter(parent, type.asGroupType(), schema, model);
400400
} else {
401401
return new AvroRecordConverter(parent, type.asGroupType(), schema, model, validator);
402402
}
@@ -1004,7 +1004,7 @@ static boolean isElementType(Type repeatedType, Schema elementSchema) {
10041004
} else if (elementSchema != null && elementSchema.getType() == Schema.Type.RECORD) {
10051005
Schema schemaFromRepeated = CONVERTER.convert(repeatedType.asGroupType());
10061006
if (checkReaderWriterCompatibility(elementSchema, schemaFromRepeated)
1007-
.getType()
1007+
.getType()
10081008
== COMPATIBLE) {
10091009
return true;
10101010
}
parquet-avro/src/main/java/org/apache/parquet/avro/AvroVariantConverter.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.avro;
20+
21+
import java.nio.ByteBuffer;
22+
import java.util.function.Consumer;
23+
import org.apache.avro.Schema;
24+
import org.apache.avro.generic.GenericData;
25+
import org.apache.parquet.Preconditions;
26+
import org.apache.parquet.io.api.Converter;
27+
import org.apache.parquet.io.api.GroupConverter;
28+
import org.apache.parquet.schema.GroupType;
29+
import org.apache.parquet.variant.ImmutableMetadata;
30+
import org.apache.parquet.variant.VariantBuilder;
31+
import org.apache.parquet.variant.VariantConverters;
32+
33+
/**
34+
* Converter for Variant values.
35+
*/
36+
class AvroVariantConverter extends GroupConverter implements VariantConverters.ParentConverter<VariantBuilder> {
37+
private final ParentValueContainer parent;
38+
private final Schema avroSchema;
39+
private final GenericData model;
40+
private final int metadataPos;
41+
private final int valuePos;
42+
private final GroupConverter wrappedConverter;
43+
44+
private VariantBuilder builder = null;
45+
private ImmutableMetadata metadata = null;
46+
47+
AvroVariantConverter(ParentValueContainer parent, GroupType variantGroup, Schema avroSchema, GenericData model) {
48+
this.parent = parent;
49+
this.avroSchema = avroSchema;
50+
this.metadataPos = avroSchema.getField("metadata").pos();
51+
this.valuePos = avroSchema.getField("value").pos();
52+
this.model = model;
53+
this.wrappedConverter = VariantConverters.newVariantConverter(variantGroup, this::setMetadata, this);
54+
}
55+
56+
@Override
57+
public void build(Consumer<VariantBuilder> consumer) {
58+
Preconditions.checkState(builder != null, "Cannot build variant: builder has not been initialized");
59+
consumer.accept(builder);
60+
}
61+
62+
@Override
63+
public Converter getConverter(int fieldIndex) {
64+
return wrappedConverter.getConverter(fieldIndex);
65+
}
66+
67+
@Override
68+
public void start() {
69+
wrappedConverter.start();
70+
}
71+
72+
@Override
73+
public void end() {
74+
wrappedConverter.end();
75+
76+
Preconditions.checkState(metadata != null, "Cannot build variant: missing metadata");
77+
78+
builder.appendNullIfEmpty();
79+
80+
Object record = model.newRecord(null, avroSchema);
81+
model.setField(record, "metadata", metadataPos, metadata.getEncodedBuffer());
82+
model.setField(record, "value", valuePos, builder.encodedValue());
83+
parent.add(record);
84+
85+
this.builder = null;
86+
}
87+
88+
void setMetadata(ByteBuffer metadataBuffer) {
89+
// If the metadata hasn't changed, we don't need to rebuild the map.
90+
if (metadata == null || metadata.getEncodedBuffer() != metadataBuffer) {
91+
this.metadata = new ImmutableMetadata(metadataBuffer);
92+
}
93+
94+
this.builder = new VariantBuilder(metadata);
95+
}
96+
}

0 commit comments

Comments
 (0)