Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.variant.Variant;
import org.apache.parquet.variant.VariantValueWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -181,9 +184,58 @@ public void write(T record) {
}

private void writeRecord(GroupType schema, Schema avroSchema, Object record) {
recordConsumer.startGroup();
writeRecordFields(schema, avroSchema, record);
recordConsumer.endGroup();
if (schema.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) {
writeVariantFields(schema, avroSchema, record);
} else {
recordConsumer.startGroup();
writeRecordFields(schema, avroSchema, record);
recordConsumer.endGroup();
}
}

private void writeVariantFields(GroupType schema, Schema avroSchema, Object record) {
List<Type> fields = schema.getFields();
List<Schema.Field> avroFields = avroSchema.getFields();
boolean binarySchema = true;
ByteBuffer metadata = null;
ByteBuffer value = null;
// Extract the value and metadata binary.
for (int index = 0; index < avroFields.size(); index++) {
Schema.Field avroField = avroFields.get(index);
Schema fieldSchema = AvroSchemaConverter.getNonNull(avroField.schema());
if (!fieldSchema.getType().equals(Schema.Type.BYTES)) {
binarySchema = false;
break;
}
Type fieldType = fields.get(index);
if (fieldType.getName() == "value") {
Object valueObj = model.getField(record, avroField.name(), index);
if (valueObj instanceof byte[]) {
value = ByteBuffer.wrap((byte[]) valueObj);
} else {
value = (ByteBuffer) valueObj;
}
} else if (fieldType.getName() == "metadata") {
Object metadataObj = model.getField(record, avroField.name(), index);
if (metadataObj instanceof byte[]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the type passed in here not known? I think that the representation used for metadata and value should use (and validate) an expected type. Probably ByteBuffer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just copying the code used for writing BYTES: https://github.com/apache/parquet-java/blob/master/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java#L357. But I can enforce that it's always ByteBuffer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this was only done for metadata and not for value. Since it matches bytes, I don't really mind allowing both, but it does make this longer than necessary.

I'd also recommend using Preconditions.checkArgument rather than using a separate if that throws a generic RuntimeException.

metadata = ByteBuffer.wrap((byte[]) metadataObj);
} else {
metadata = (ByteBuffer) metadataObj;
}
} else {
binarySchema = false;
break;
}
}

if (binarySchema) {
VariantValueWriter.write(recordConsumer, schema, new Variant(value, metadata));
} else {
// If the schema was something other than value and metaadata, treat the value as a non-variant record.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: metaa

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this should be a failure. writeVariantFields is called when there is a variant logical type annotation. If the record is not actually a variant then it is better to fail quickly instead of trying to recover by treating it like a record and then failing for some unknown reason later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. My thought was that someone might want to write data that has already been shredded - i.e. annotate the Parquet type as Variant, but directly write using an Avro record that already matches the shredded schema. But if there's a real use case for that, and no other workaround, it can always be added later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @rdblue, I realized that this was needed in order to allow the tests in TestReadVariant.java to work, because those tests are doing what I described in this comment: writing already-shredded data with the logical annotation on the top-level struct.

I pushed a commit to explicitly check for the cases of having exactly three fields in the schema, or having two fields with typed_value, which is also valid, but otherwise leave in the stricter checking that was added based on this comment. Let me know if you think this is okay, or if you prefer a different approach.

recordConsumer.startGroup();
writeRecordFields(schema, avroSchema, record);
recordConsumer.endGroup();
}
}

private void writeRecordFields(GroupType schema, Schema avroSchema, Object record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.variant.Variant;
import org.junit.Assert;
import org.junit.rules.TemporaryFolder;

Expand Down Expand Up @@ -129,4 +130,36 @@ public static Configuration conf(String name, boolean value) {
conf.setBoolean(name, value);
return conf;
}

/**
* Assert that to Variant values are logically equivalent.
* E.g. object fields may be ordered differently in the binary.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean values? The fields themselves must have the same order because the ID list must be in lexicographical order with respect to the keys.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's what I meant. Is values in an object may be ordered differently better?

*/
static void assertEquivalent(Variant expected, Variant actual) {
Assert.assertEquals(expected.getType(), actual.getType());
switch (expected.getType()) {
case STRING:
// Short strings may use the compact or extended representation.
Assert.assertEquals(expected.getString(), actual.getString());
break;
case ARRAY:
Assert.assertEquals(expected.numArrayElements(), actual.numArrayElements());
for (int i = 0; i < expected.numArrayElements(); ++i) {
assertEquivalent(expected.getElementAtIndex(i), actual.getElementAtIndex(i));
}
break;
case OBJECT:
Assert.assertEquals(expected.numObjectElements(), actual.numObjectElements());
for (int i = 0; i < expected.numObjectElements(); ++i) {
Variant.ObjectField expectedField = expected.getFieldAtIndex(i);
Variant.ObjectField actualField = actual.getFieldAtIndex(i);
Assert.assertEquals(expectedField.key, actualField.key);
assertEquivalent(expectedField.value, actualField.value);
}
break;
default:
// All other types have a single representation, and must be bit-for-bit identical.
Assert.assertEquals(expected.getValueBuffer(), actual.getValueBuffer());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2111,36 +2111,8 @@ void assertThrows(Callable callable, Class<? extends Exception> exception, Strin
void assertEquivalent(ByteBuffer expectedMetadata, ByteBuffer expectedValue, GenericRecord actual) {
assertEquals(expectedMetadata, (ByteBuffer) actual.get("metadata"));
assertEquals(expectedMetadata, (ByteBuffer) actual.get("metadata"));
assertEquivalent(
AvroTestUtil.assertEquivalent(
new Variant(expectedValue, expectedMetadata),
new Variant(((ByteBuffer) actual.get("value")), expectedMetadata));
}

void assertEquivalent(Variant expected, Variant actual) {
assertEquals(expected.getType(), actual.getType());
switch (expected.getType()) {
case STRING:
// Short strings may use the compact or extended representation.
assertEquals(expected.getString(), actual.getString());
break;
case ARRAY:
assertEquals(expected.numArrayElements(), actual.numArrayElements());
for (int i = 0; i < expected.numArrayElements(); ++i) {
assertEquivalent(expected.getElementAtIndex(i), actual.getElementAtIndex(i));
}
break;
case OBJECT:
assertEquals(expected.numObjectElements(), actual.numObjectElements());
for (int i = 0; i < expected.numObjectElements(); ++i) {
Variant.ObjectField expectedField = expected.getFieldAtIndex(i);
Variant.ObjectField actualField = actual.getFieldAtIndex(i);
assertEquals(expectedField.key, actualField.key);
assertEquivalent(expectedField.value, actualField.value);
}
break;
default:
// All other types have a single representation, and must be bit-for-bit identical.
assertEquals(expected.getValueBuffer(), actual.getValueBuffer());
}
}
}
Loading