From dbd8bb86fc7da7c4e3cf31744a61d0b56030cd5a Mon Sep 17 00:00:00 2001 From: Aihua Xu Date: Sat, 26 Jul 2025 19:07:33 -0700 Subject: [PATCH 1/4] Test Variant read from files --- parquet-avro/pom.xml | 12 + .../parquet/avro/AvroRecordConverter.java | 2 +- .../parquet/avro/AvroVariantConverter.java | 27 +- .../org/apache/parquet/variant/JsonUtil.java | 76 +++++ .../variant/TestVariantReadsFromFile.java | 282 ++++++++++++++++++ 5 files changed, 386 insertions(+), 13 deletions(-) create mode 100644 parquet-avro/src/test/java/org/apache/parquet/variant/JsonUtil.java create mode 100644 parquet-avro/src/test/java/org/apache/parquet/variant/TestVariantReadsFromFile.java diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml index 27cabb757f..d659104e91 100644 --- a/parquet-avro/pom.xml +++ b/parquet-avro/pom.xml @@ -133,6 +133,18 @@ ${powermock.version} test + + org.assertj + assertj-core + 3.25.3 + test + + + org.junit.jupiter + junit-jupiter-params + 5.12.2 + test + diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java index 340dc77220..eb80f8468b 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java @@ -395,7 +395,7 @@ private static Converter newConverter( } return newStringConverter(schema, model, parent, validator); case RECORD: - if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) { + if (type.getName().equals("var") || type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) { return new AvroVariantConverter(parent, type.asGroupType(), schema, model); } else { return new AvroRecordConverter(parent, type.asGroupType(), schema, model, validator); diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroVariantConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroVariantConverter.java index 765d87e71c..56e8f99696 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroVariantConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroVariantConverter.java @@ -27,6 +27,7 @@ import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.schema.GroupType; import org.apache.parquet.variant.ImmutableMetadata; +import org.apache.parquet.variant.Variant; import org.apache.parquet.variant.VariantBuilder; import org.apache.parquet.variant.VariantConverters; @@ -35,10 +36,10 @@ */ class AvroVariantConverter extends GroupConverter implements VariantConverters.ParentConverter { private final ParentValueContainer parent; - private final Schema avroSchema; - private final GenericData model; - private final int metadataPos; - private final int valuePos; +// private final Schema avroSchema; +// private final GenericData model; +// private final int metadataPos; +// private final int valuePos; private final GroupConverter wrappedConverter; private VariantBuilder builder = null; @@ -46,10 +47,10 @@ class AvroVariantConverter extends GroupConverter implements VariantConverters.P AvroVariantConverter(ParentValueContainer parent, GroupType variantGroup, Schema avroSchema, GenericData model) { this.parent = parent; - this.avroSchema = avroSchema; - this.metadataPos = avroSchema.getField("metadata").pos(); - this.valuePos = avroSchema.getField("value").pos(); - this.model = model; +// this.avroSchema = avroSchema; +// this.metadataPos = avroSchema.getField("metadata").pos(); +// this.valuePos = avroSchema.getField("value").pos(); +// this.model = model; this.wrappedConverter = VariantConverters.newVariantConverter(variantGroup, this::setMetadata, this); } @@ -77,10 +78,12 @@ public void end() { builder.appendNullIfEmpty(); - Object record = model.newRecord(null, avroSchema); - model.setField(record, "metadata", metadataPos, metadata.getEncodedBuffer()); - model.setField(record, "value", valuePos, builder.encodedValue()); - parent.add(record); + Variant variant = builder.build(); + parent.add(variant); +// Object record = model.newRecord(null, avroSchema); +// model.setField(record, "metadata", metadataPos, metadata.getEncodedBuffer()); +// model.setField(record, "value", valuePos, builder.encodedValue()); +// parent.add(record); this.builder = null; } diff --git a/parquet-avro/src/test/java/org/apache/parquet/variant/JsonUtil.java b/parquet-avro/src/test/java/org/apache/parquet/variant/JsonUtil.java new file mode 100644 index 0000000000..9b0d0b2fec --- /dev/null +++ b/parquet-avro/src/test/java/org/apache/parquet/variant/JsonUtil.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.variant; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonFactoryBuilder; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.parquet.Preconditions; + + +public class JsonUtil { + + private JsonUtil() {} + + private static final JsonFactory FACTORY = + new JsonFactoryBuilder() + .configure(JsonFactory.Feature.INTERN_FIELD_NAMES, false) + .configure(JsonFactory.Feature.FAIL_ON_SYMBOL_HASH_OVERFLOW, false) + .build(); + private static final ObjectMapper MAPPER = new ObjectMapper(FACTORY); + + public static ObjectMapper mapper() { + return MAPPER; + } + + public static int getInt(String property, JsonNode node) { + Preconditions.checkArgument(node.has(property), "Cannot parse missing int: %s", property); + JsonNode pNode = node.get(property); + Preconditions.checkArgument( + pNode != null && !pNode.isNull() && pNode.isIntegralNumber() && pNode.canConvertToInt(), + "Cannot parse to an integer value: %s: %s", + property, + pNode); + return pNode.asInt(); + } + + public static String getString(String property, JsonNode node) { + Preconditions.checkArgument(node.has(property), "Cannot parse missing string: %s", property); + JsonNode pNode = node.get(property); + Preconditions.checkArgument( + pNode != null && !pNode.isNull() && pNode.isTextual(), + "Cannot parse to a string value: %s: %s", + property, + pNode); + return pNode.asText(); + } + + public static String getStringOrNull(String property, JsonNode node) { + if (!node.has(property)) { + return null; + } + JsonNode pNode = node.get(property); + if (pNode != null && pNode.isNull()) { + return null; + } + return getString(property, node); + } + +} diff --git a/parquet-avro/src/test/java/org/apache/parquet/variant/TestVariantReadsFromFile.java b/parquet-avro/src/test/java/org/apache/parquet/variant/TestVariantReadsFromFile.java new file mode 100644 index 0000000000..fae17687af --- /dev/null +++ b/parquet-avro/src/test/java/org/apache/parquet/variant/TestVariantReadsFromFile.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.variant; + + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Streams; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.file.Paths; +import java.util.List; +import java.util.stream.Stream; +import org.apache.parquet.Preconditions; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.LocalInputFile; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.apache.avro.generic.GenericRecord; +import org.apache.parquet.avro.AvroParquetReader; + + +public class TestVariantReadsFromFile { + // Set this location to generated variant test cases + private static final String CASE_LOCATION = null; + + private static Stream cases() throws IOException { + if (CASE_LOCATION == null) { + return Stream.of(JsonUtil.mapper().readValue("{\"case_number\": -1}", JsonNode.class)); + } + + InputFile caseJsonInput = new LocalInputFile(Paths.get(CASE_LOCATION + "/cases.json")); + JsonNode cases = JsonUtil.mapper().readValue(caseJsonInput.newStream(), JsonNode.class); + Preconditions.checkArgument( + cases != null && cases.isArray(), "Invalid case JSON, not an array: %s", caseJsonInput); + + return Streams.stream(cases); + } + + private static Stream errorCases() throws IOException { + return cases() + .filter(caseNode -> caseNode.has("error_message") || !caseNode.has("parquet_file")) + .map( + caseNode -> { + int caseNumber = JsonUtil.getInt("case_number", caseNode); + String testName = JsonUtil.getStringOrNull("test", caseNode); + String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode); + String errorMessage = JsonUtil.getStringOrNull("error_message", caseNode); + return Arguments.of(caseNumber, testName, parquetFile, errorMessage); + }); + } + + private static Stream singleVariantCases() throws IOException { + return cases() + .filter(caseNode -> caseNode.has("variant_file") || !caseNode.has("parquet_file")) + .map( + caseNode -> { + int caseNumber = JsonUtil.getInt("case_number", caseNode); + String testName = JsonUtil.getStringOrNull("test", caseNode); + String variant = JsonUtil.getStringOrNull("variant", caseNode); + String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode); + String variantFile = JsonUtil.getStringOrNull("variant_file", caseNode); + return Arguments.of(caseNumber, testName, variant, parquetFile, variantFile); + }); + } + + private static Stream multiVariantCases() throws IOException { + return cases() + .filter(caseNode -> caseNode.has("variant_files") || !caseNode.has("parquet_file")) + .map( + caseNode -> { + int caseNumber = JsonUtil.getInt("case_number", caseNode); + String testName = JsonUtil.getStringOrNull("test", caseNode); + String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode); + List variantFiles = + caseNode.has("variant_files") + ? Lists.newArrayList( + Iterables.transform( + caseNode.get("variant_files"), + node -> node == null || node.isNull() ? null : node.asText())) + : null; + String variants = JsonUtil.getStringOrNull("variants", caseNode); + return Arguments.of(caseNumber, testName, variants, parquetFile, variantFiles); + }); + } + + @ParameterizedTest + @MethodSource("errorCases") + public void testError(int caseNumber, String testName, String parquetFile, String errorMessage) { + if (parquetFile == null) { + return; + } + + Assertions.assertThatThrownBy(() -> readParquet(parquetFile)) + .as("Test case %s: %s", caseNumber, testName); + //.hasMessage(errorMessage); + } + + @ParameterizedTest + @MethodSource("singleVariantCases") + public void testSingleVariant( + int caseNumber, String testName, String variant, String parquetFile, String variantFile) + throws IOException { + if (parquetFile == null) { + return; + } + + Variant expected = readVariant(variantFile); + + GenericRecord record = readParquetRecord(parquetFile); + Assertions.assertThat(record.get("var")).isInstanceOf(Variant.class); + Variant actual = (Variant) record.get("var"); + assertEqual(expected, actual); + } + + @ParameterizedTest + @MethodSource("multiVariantCases") + public void testMultiVariant( + int caseNumber, + String testName, + String variants, + String parquetFile, + List variantFiles) + throws IOException { + if (parquetFile == null) { + return; + } + + List records = readParquet(parquetFile); + + for (int i = 0; i < records.size(); i += 1) { + GenericRecord record = records.get(i); + String variantFile = variantFiles.get(i); + + if (variantFile != null) { + Variant expected = readVariant(variantFile); + Assertions.assertThat(record.get("var")).isInstanceOf(Variant.class); + Variant actual = (Variant) record.get("var"); + assertEqual(expected, actual); + } else { + Assertions.assertThat(record.get("var")).isNull(); + } + } + } + + private static byte[] readAllBytes(InputStream in) throws IOException { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + byte[] data = new byte[4096]; + int nRead; + while ((nRead = in.read(data, 0, data.length)) != -1) { + buffer.write(data, 0, nRead); + } + return buffer.toByteArray(); + } + + private Variant readVariant(String variantFile) throws IOException { + try (InputStream in = new LocalInputFile(Paths.get(CASE_LOCATION + "/" + variantFile)).newStream()) { + byte[] variantBytes = readAllBytes(in); + ByteBuffer variantBuffer = ByteBuffer.wrap(variantBytes); + + byte header = variantBytes[0]; + int offsetSize = 1 + ((header & 0b11000000) >> 6); + int dictSize = VariantUtil.readUnsigned(variantBuffer, 1, offsetSize); + int offsetListOffset = 1 + offsetSize; + int dataOffset = offsetListOffset + ((1 + dictSize) * offsetSize); + int endOffset = dataOffset + VariantUtil.readUnsigned(variantBuffer, offsetListOffset + (offsetSize * dictSize), offsetSize); + + return new Variant(VariantUtil.slice(variantBuffer, endOffset), variantBuffer); + } + } + + private GenericRecord readParquetRecord(String parquetFile) throws IOException { + return Iterables.getOnlyElement(readParquet(parquetFile)); + } + + private List readParquet(String parquetFile) throws IOException { + org.apache.parquet.io.InputFile inputFile = new LocalInputFile(Paths.get(CASE_LOCATION + "/" + parquetFile)); + List records = Lists.newArrayList(); + try (org.apache.parquet.hadoop.ParquetReader reader = AvroParquetReader.builder(inputFile).build()) { + GenericRecord record; + while ((record = reader.read()) != null) { + records.add(record); + } + } + return records; + } + + private static void assertEqual(Variant expected, Variant actual) { + assertThat(actual).isNotNull(); + assertThat(expected).isNotNull(); + assertThat(actual.getType()).isEqualTo(expected.getType()); + + switch (expected.getType()) { + case NULL: + // nothing to compare + break; + case BOOLEAN: + assertThat(actual.getBoolean()).isEqualTo(expected.getBoolean()); + break; + case BYTE: + assertThat(actual.getByte()).isEqualTo(expected.getByte()); + break; + case SHORT: + assertThat(actual.getShort()).isEqualTo(expected.getShort()); + break; + case INT: + case DATE: + assertThat(actual.getInt()).isEqualTo(expected.getInt()); + break; + case LONG: + case TIMESTAMP_TZ: + case TIMESTAMP_NTZ: + case TIME: + case TIMESTAMP_NANOS_TZ: + case TIMESTAMP_NANOS_NTZ: + assertThat(actual.getLong()).isEqualTo(expected.getLong()); + break; + case FLOAT: + assertThat(actual.getFloat()).isEqualTo(expected.getFloat()); + break; + case DOUBLE: + assertThat(actual.getDouble()).isEqualTo(expected.getDouble()); + break; + case DECIMAL4: + case DECIMAL8: + case DECIMAL16: + assertThat(actual.getDecimal()).isEqualTo(expected.getDecimal()); + break; + case STRING: + assertThat(actual.getString()).isEqualTo(expected.getString()); + break; + case BINARY: + assertThat(actual.getBinary()).isEqualTo(expected.getBinary()); + break; + case UUID: + assertThat(actual.getUUID()).isEqualTo(expected.getUUID()); + break; + case OBJECT: + assertThat(actual.numObjectElements()).isEqualTo(expected.numObjectElements()); + for (int i = 0; i < expected.numObjectElements(); ++i) { + Variant.ObjectField expectedField = expected.getFieldAtIndex(i); + Variant.ObjectField actualField = actual.getFieldAtIndex(i); + + assertThat(actualField.key).isEqualTo(expectedField.key); + assertEqual(actualField.value, actualField.value); + } + break; + case ARRAY: + assertThat(actual.numArrayElements()).isEqualTo(expected.numArrayElements()); + for (int i = 0; i < expected.numArrayElements(); ++i) { + assertEqual(expected.getElementAtIndex(i),actual.getElementAtIndex(i)); + } + break; + default: + throw new UnsupportedOperationException("Unknown Variant type: " + expected.getType()); + } + } +} From d07879a380bba42b5af12de77579d11bec5c2aa8 Mon Sep 17 00:00:00 2001 From: Aihua Xu Date: Tue, 29 Jul 2025 18:37:33 -0700 Subject: [PATCH 2/4] Compare record with Variant --- .../parquet/avro/AvroRecordConverter.java | 4 +- .../parquet/avro/AvroVariantConverter.java | 34 +++--- .../org/apache/parquet/variant/JsonUtil.java | 11 +- .../variant/TestVariantReadsFromFile.java | 114 +++++++++--------- 4 files changed, 81 insertions(+), 82 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java index eb80f8468b..4030e383c1 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java @@ -395,7 +395,9 @@ private static Converter newConverter( } return newStringConverter(schema, model, parent, validator); case RECORD: - if (type.getName().equals("var") || type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) { + if (type.getName().equals("var") + || type.getLogicalTypeAnnotation() + instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) { return new AvroVariantConverter(parent, type.asGroupType(), schema, model); } else { return new AvroRecordConverter(parent, type.asGroupType(), schema, model, validator); diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroVariantConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroVariantConverter.java index 56e8f99696..4e820122b5 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroVariantConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroVariantConverter.java @@ -21,13 +21,13 @@ import java.nio.ByteBuffer; import java.util.function.Consumer; import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; import org.apache.parquet.Preconditions; import org.apache.parquet.io.api.Converter; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.schema.GroupType; import org.apache.parquet.variant.ImmutableMetadata; -import org.apache.parquet.variant.Variant; import org.apache.parquet.variant.VariantBuilder; import org.apache.parquet.variant.VariantConverters; @@ -35,11 +35,20 @@ * Converter for Variant values. */ class AvroVariantConverter extends GroupConverter implements VariantConverters.ParentConverter { + private static final Schema VARIANT_SCHEMA = SchemaBuilder.record("VariantRecord") + .fields() + .name("metadata") + .type() + .bytesType() + .noDefault() + .name("value") + .type() + .bytesType() + .noDefault() + .endRecord(); + private final ParentValueContainer parent; -// private final Schema avroSchema; -// private final GenericData model; -// private final int metadataPos; -// private final int valuePos; + private final GenericData model; private final GroupConverter wrappedConverter; private VariantBuilder builder = null; @@ -47,10 +56,7 @@ class AvroVariantConverter extends GroupConverter implements VariantConverters.P AvroVariantConverter(ParentValueContainer parent, GroupType variantGroup, Schema avroSchema, GenericData model) { this.parent = parent; -// this.avroSchema = avroSchema; -// this.metadataPos = avroSchema.getField("metadata").pos(); -// this.valuePos = avroSchema.getField("value").pos(); -// this.model = model; + this.model = model; this.wrappedConverter = VariantConverters.newVariantConverter(variantGroup, this::setMetadata, this); } @@ -78,12 +84,10 @@ public void end() { builder.appendNullIfEmpty(); - Variant variant = builder.build(); - parent.add(variant); -// Object record = model.newRecord(null, avroSchema); -// model.setField(record, "metadata", metadataPos, metadata.getEncodedBuffer()); -// model.setField(record, "value", valuePos, builder.encodedValue()); -// parent.add(record); + Object record = model.newRecord(null, VARIANT_SCHEMA); + model.setField(record, "metadata", 0, metadata.getEncodedBuffer()); + model.setField(record, "value", 1, builder.encodedValue()); + parent.add(record); this.builder = null; } diff --git a/parquet-avro/src/test/java/org/apache/parquet/variant/JsonUtil.java b/parquet-avro/src/test/java/org/apache/parquet/variant/JsonUtil.java index 9b0d0b2fec..887324867a 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/variant/JsonUtil.java +++ b/parquet-avro/src/test/java/org/apache/parquet/variant/JsonUtil.java @@ -24,16 +24,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.parquet.Preconditions; - public class JsonUtil { private JsonUtil() {} - private static final JsonFactory FACTORY = - new JsonFactoryBuilder() - .configure(JsonFactory.Feature.INTERN_FIELD_NAMES, false) - .configure(JsonFactory.Feature.FAIL_ON_SYMBOL_HASH_OVERFLOW, false) - .build(); + private static final JsonFactory FACTORY = new JsonFactoryBuilder() + .configure(JsonFactory.Feature.INTERN_FIELD_NAMES, false) + .configure(JsonFactory.Feature.FAIL_ON_SYMBOL_HASH_OVERFLOW, false) + .build(); private static final ObjectMapper MAPPER = new ObjectMapper(FACTORY); public static ObjectMapper mapper() { @@ -72,5 +70,4 @@ public static String getStringOrNull(String property, JsonNode node) { } return getString(property, node); } - } diff --git a/parquet-avro/src/test/java/org/apache/parquet/variant/TestVariantReadsFromFile.java b/parquet-avro/src/test/java/org/apache/parquet/variant/TestVariantReadsFromFile.java index fae17687af..3324fd7c92 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/variant/TestVariantReadsFromFile.java +++ b/parquet-avro/src/test/java/org/apache/parquet/variant/TestVariantReadsFromFile.java @@ -19,7 +19,6 @@ package org.apache.parquet.variant; - import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import com.fasterxml.jackson.databind.JsonNode; @@ -33,19 +32,18 @@ import java.nio.file.Paths; import java.util.List; import java.util.stream.Stream; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.apache.parquet.Preconditions; +import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.io.InputFile; import org.apache.parquet.io.LocalInputFile; import org.assertj.core.api.Assertions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.apache.avro.generic.GenericRecord; -import org.apache.parquet.avro.AvroParquetReader; - public class TestVariantReadsFromFile { - // Set this location to generated variant test cases private static final String CASE_LOCATION = null; private static Stream cases() throws IOException { @@ -62,50 +60,42 @@ private static Stream cases() throws IOException { } private static Stream errorCases() throws IOException { - return cases() - .filter(caseNode -> caseNode.has("error_message") || !caseNode.has("parquet_file")) - .map( - caseNode -> { - int caseNumber = JsonUtil.getInt("case_number", caseNode); - String testName = JsonUtil.getStringOrNull("test", caseNode); - String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode); - String errorMessage = JsonUtil.getStringOrNull("error_message", caseNode); - return Arguments.of(caseNumber, testName, parquetFile, errorMessage); - }); + return cases().filter(caseNode -> caseNode.has("error_message") || !caseNode.has("parquet_file")) + .map(caseNode -> { + int caseNumber = JsonUtil.getInt("case_number", caseNode); + String testName = JsonUtil.getStringOrNull("test", caseNode); + String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode); + String errorMessage = JsonUtil.getStringOrNull("error_message", caseNode); + return Arguments.of(caseNumber, testName, parquetFile, errorMessage); + }); } private static Stream singleVariantCases() throws IOException { - return cases() - .filter(caseNode -> caseNode.has("variant_file") || !caseNode.has("parquet_file")) - .map( - caseNode -> { - int caseNumber = JsonUtil.getInt("case_number", caseNode); - String testName = JsonUtil.getStringOrNull("test", caseNode); - String variant = JsonUtil.getStringOrNull("variant", caseNode); - String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode); - String variantFile = JsonUtil.getStringOrNull("variant_file", caseNode); - return Arguments.of(caseNumber, testName, variant, parquetFile, variantFile); - }); + return cases().filter(caseNode -> caseNode.has("variant_file") || !caseNode.has("parquet_file")) + .map(caseNode -> { + int caseNumber = JsonUtil.getInt("case_number", caseNode); + String testName = JsonUtil.getStringOrNull("test", caseNode); + String variant = JsonUtil.getStringOrNull("variant", caseNode); + String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode); + String variantFile = JsonUtil.getStringOrNull("variant_file", caseNode); + return Arguments.of(caseNumber, testName, variant, parquetFile, variantFile); + }); } private static Stream multiVariantCases() throws IOException { - return cases() - .filter(caseNode -> caseNode.has("variant_files") || !caseNode.has("parquet_file")) - .map( - caseNode -> { - int caseNumber = JsonUtil.getInt("case_number", caseNode); - String testName = JsonUtil.getStringOrNull("test", caseNode); - String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode); - List variantFiles = - caseNode.has("variant_files") - ? Lists.newArrayList( - Iterables.transform( - caseNode.get("variant_files"), - node -> node == null || node.isNull() ? null : node.asText())) - : null; - String variants = JsonUtil.getStringOrNull("variants", caseNode); - return Arguments.of(caseNumber, testName, variants, parquetFile, variantFiles); - }); + return cases().filter(caseNode -> caseNode.has("variant_files") || !caseNode.has("parquet_file")) + .map(caseNode -> { + int caseNumber = JsonUtil.getInt("case_number", caseNode); + String testName = JsonUtil.getStringOrNull("test", caseNode); + String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode); + List variantFiles = caseNode.has("variant_files") + ? Lists.newArrayList(Iterables.transform( + caseNode.get("variant_files"), + node -> node == null || node.isNull() ? null : node.asText())) + : null; + String variants = JsonUtil.getStringOrNull("variants", caseNode); + return Arguments.of(caseNumber, testName, variants, parquetFile, variantFiles); + }); } @ParameterizedTest @@ -115,9 +105,8 @@ public void testError(int caseNumber, String testName, String parquetFile, Strin return; } - Assertions.assertThatThrownBy(() -> readParquet(parquetFile)) - .as("Test case %s: %s", caseNumber, testName); - //.hasMessage(errorMessage); + Assertions.assertThatThrownBy(() -> readParquet(parquetFile)).as("Test case %s: %s", caseNumber, testName); + // .hasMessage(errorMessage); } @ParameterizedTest @@ -132,19 +121,15 @@ public void testSingleVariant( Variant expected = readVariant(variantFile); GenericRecord record = readParquetRecord(parquetFile); - Assertions.assertThat(record.get("var")).isInstanceOf(Variant.class); - Variant actual = (Variant) record.get("var"); + Assertions.assertThat(record.get("var")).isInstanceOf(GenericData.Record.class); + GenericData.Record actual = (GenericData.Record) record.get("var"); assertEqual(expected, actual); } @ParameterizedTest @MethodSource("multiVariantCases") public void testMultiVariant( - int caseNumber, - String testName, - String variants, - String parquetFile, - List variantFiles) + int caseNumber, String testName, String variants, String parquetFile, List variantFiles) throws IOException { if (parquetFile == null) { return; @@ -158,8 +143,8 @@ public void testMultiVariant( if (variantFile != null) { Variant expected = readVariant(variantFile); - Assertions.assertThat(record.get("var")).isInstanceOf(Variant.class); - Variant actual = (Variant) record.get("var"); + Assertions.assertThat(record.get("var")).isInstanceOf(GenericData.Record.class); + GenericData.Record actual = (GenericData.Record) record.get("var"); assertEqual(expected, actual); } else { Assertions.assertThat(record.get("var")).isNull(); @@ -187,7 +172,8 @@ private Variant readVariant(String variantFile) throws IOException { int dictSize = VariantUtil.readUnsigned(variantBuffer, 1, offsetSize); int offsetListOffset = 1 + offsetSize; int dataOffset = offsetListOffset + ((1 + dictSize) * offsetSize); - int endOffset = dataOffset + VariantUtil.readUnsigned(variantBuffer, offsetListOffset + (offsetSize * dictSize), offsetSize); + int endOffset = dataOffset + + VariantUtil.readUnsigned(variantBuffer, offsetListOffset + (offsetSize * dictSize), offsetSize); return new Variant(VariantUtil.slice(variantBuffer, endOffset), variantBuffer); } @@ -200,7 +186,8 @@ private GenericRecord readParquetRecord(String parquetFile) throws IOException { private List readParquet(String parquetFile) throws IOException { org.apache.parquet.io.InputFile inputFile = new LocalInputFile(Paths.get(CASE_LOCATION + "/" + parquetFile)); List records = Lists.newArrayList(); - try (org.apache.parquet.hadoop.ParquetReader reader = AvroParquetReader.builder(inputFile).build()) { + try (org.apache.parquet.hadoop.ParquetReader reader = + AvroParquetReader.builder(inputFile).build()) { GenericRecord record; while ((record = reader.read()) != null) { records.add(record); @@ -209,9 +196,18 @@ private List readParquet(String parquetFile) throws IOException { return records; } + private static void assertEqual(Variant expected, GenericData.Record actualRecord) { + assertThat(actualRecord).isNotNull(); + assertThat(expected).isNotNull(); + Variant actual = new Variant((ByteBuffer) actualRecord.get("value"), (ByteBuffer) actualRecord.get("metadata")); + + assertEqual(expected, actual); + } + private static void assertEqual(Variant expected, Variant actual) { assertThat(actual).isNotNull(); assertThat(expected).isNotNull(); + assertThat(actual.getType()).isEqualTo(expected.getType()); switch (expected.getType()) { @@ -266,13 +262,13 @@ private static void assertEqual(Variant expected, Variant actual) { Variant.ObjectField actualField = actual.getFieldAtIndex(i); assertThat(actualField.key).isEqualTo(expectedField.key); - assertEqual(actualField.value, actualField.value); + assertEqual(expectedField.value, actualField.value); } break; case ARRAY: assertThat(actual.numArrayElements()).isEqualTo(expected.numArrayElements()); for (int i = 0; i < expected.numArrayElements(); ++i) { - assertEqual(expected.getElementAtIndex(i),actual.getElementAtIndex(i)); + assertEqual(expected.getElementAtIndex(i), actual.getElementAtIndex(i)); } break; default: From ee34713e4d906d61f95d2b09145945638b2e2296 Mon Sep 17 00:00:00 2001 From: Aihua Xu Date: Tue, 29 Jul 2025 22:25:46 -0700 Subject: [PATCH 3/4] Update against the test files with variant logical type --- parquet-avro/pom.xml | 12 ++++++++++++ .../org/apache/parquet/avro/AvroRecordConverter.java | 4 +--- .../org/apache/parquet/avro/AvroSchemaConverter.java | 7 ++----- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml index d659104e91..156f98e364 100644 --- a/parquet-avro/pom.xml +++ b/parquet-avro/pom.xml @@ -145,6 +145,18 @@ 5.12.2 test + + com.fasterxml.jackson.core + jackson-databind + ${jackson-databind.version} + test + + + com.fasterxml.jackson.core + jackson-core + ${jackson-databind.version} + test + diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java index 4030e383c1..340dc77220 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java @@ -395,9 +395,7 @@ private static Converter newConverter( } return newStringConverter(schema, model, parent, validator); case RECORD: - if (type.getName().equals("var") - || type.getLogicalTypeAnnotation() - instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) { + if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) { return new AvroVariantConverter(parent, type.asGroupType(), schema, model); } else { return new AvroRecordConverter(parent, type.asGroupType(), schema, model, validator); diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 8e5a58df86..7ae55d7fb9 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -470,11 +470,8 @@ public Optional visit( @Override public Optional visit( LogicalTypeAnnotation.VariantLogicalTypeAnnotation variantLogicalType) { - String name = parquetGroupType.getName(); - List fields = new ArrayList<>(); - fields.add(new Schema.Field("metadata", Schema.create(Schema.Type.BYTES))); - fields.add(new Schema.Field("value", Schema.create(Schema.Type.BYTES))); - return of(Schema.createRecord(name, null, namespace(name, names), false, fields)); + return of( + convertFields(parquetGroupType.getName(), parquetGroupType.getFields(), names)); } }) .orElseThrow( From 46aa74303e4c1e1c2a93cdf7c64f71829b52f27e Mon Sep 17 00:00:00 2001 From: Aihua Xu Date: Mon, 11 Aug 2025 17:17:25 -0700 Subject: [PATCH 4/4] Update the test to pass avro read schema --- .../parquet/avro/AvroSchemaConverter.java | 7 ++++-- .../parquet/avro/AvroVariantConverter.java | 25 +++++++------------ .../variant/TestVariantReadsFromFile.java | 17 ++++++++++++- 3 files changed, 30 insertions(+), 19 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 7ae55d7fb9..8e5a58df86 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -470,8 +470,11 @@ public Optional visit( @Override public Optional visit( LogicalTypeAnnotation.VariantLogicalTypeAnnotation variantLogicalType) { - return of( - convertFields(parquetGroupType.getName(), parquetGroupType.getFields(), names)); + String name = parquetGroupType.getName(); + List fields = new ArrayList<>(); + fields.add(new Schema.Field("metadata", Schema.create(Schema.Type.BYTES))); + fields.add(new Schema.Field("value", Schema.create(Schema.Type.BYTES))); + return of(Schema.createRecord(name, null, namespace(name, names), false, fields)); } }) .orElseThrow( diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroVariantConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroVariantConverter.java index 4e820122b5..765d87e71c 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroVariantConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroVariantConverter.java @@ -21,7 +21,6 @@ import java.nio.ByteBuffer; import java.util.function.Consumer; import org.apache.avro.Schema; -import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; import org.apache.parquet.Preconditions; import org.apache.parquet.io.api.Converter; @@ -35,20 +34,11 @@ * Converter for Variant values. */ class AvroVariantConverter extends GroupConverter implements VariantConverters.ParentConverter { - private static final Schema VARIANT_SCHEMA = SchemaBuilder.record("VariantRecord") - .fields() - .name("metadata") - .type() - .bytesType() - .noDefault() - .name("value") - .type() - .bytesType() - .noDefault() - .endRecord(); - private final ParentValueContainer parent; + private final Schema avroSchema; private final GenericData model; + private final int metadataPos; + private final int valuePos; private final GroupConverter wrappedConverter; private VariantBuilder builder = null; @@ -56,6 +46,9 @@ class AvroVariantConverter extends GroupConverter implements VariantConverters.P AvroVariantConverter(ParentValueContainer parent, GroupType variantGroup, Schema avroSchema, GenericData model) { this.parent = parent; + this.avroSchema = avroSchema; + this.metadataPos = avroSchema.getField("metadata").pos(); + this.valuePos = avroSchema.getField("value").pos(); this.model = model; this.wrappedConverter = VariantConverters.newVariantConverter(variantGroup, this::setMetadata, this); } @@ -84,9 +77,9 @@ public void end() { builder.appendNullIfEmpty(); - Object record = model.newRecord(null, VARIANT_SCHEMA); - model.setField(record, "metadata", 0, metadata.getEncodedBuffer()); - model.setField(record, "value", 1, builder.encodedValue()); + Object record = model.newRecord(null, avroSchema); + model.setField(record, "metadata", metadataPos, metadata.getEncodedBuffer()); + model.setField(record, "value", valuePos, builder.encodedValue()); parent.add(record); this.builder = null; diff --git a/parquet-avro/src/test/java/org/apache/parquet/variant/TestVariantReadsFromFile.java b/parquet-avro/src/test/java/org/apache/parquet/variant/TestVariantReadsFromFile.java index 3324fd7c92..c00c989037 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/variant/TestVariantReadsFromFile.java +++ b/parquet-avro/src/test/java/org/apache/parquet/variant/TestVariantReadsFromFile.java @@ -32,12 +32,18 @@ import java.nio.file.Paths; import java.util.List; import java.util.stream.Stream; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.Preconditions; import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.io.InputFile; import org.apache.parquet.io.LocalInputFile; +import org.apache.parquet.schema.MessageType; import org.assertj.core.api.Assertions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -186,8 +192,17 @@ private GenericRecord readParquetRecord(String parquetFile) throws IOException { private List readParquet(String parquetFile) throws IOException { org.apache.parquet.io.InputFile inputFile = new LocalInputFile(Paths.get(CASE_LOCATION + "/" + parquetFile)); List records = Lists.newArrayList(); + + // Set Avro read schema converted from the Parquet schema + Configuration conf = new Configuration(false); + try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile)) { + final MessageType fileSchema = fileReader.getFileMetaData().getSchema(); + Schema avroReadSchema = new AvroSchemaConverter().convert(fileSchema); + AvroReadSupport.setAvroReadSchema(conf, avroReadSchema); + } + try (org.apache.parquet.hadoop.ParquetReader reader = - AvroParquetReader.builder(inputFile).build()) { + AvroParquetReader.builder(inputFile).withConf(conf).build()) { GenericRecord record; while ((record = reader.read()) != null) { records.add(record);