
Commit 88e0a16

refactor(schema): Optimize schema usage.
1 parent 8cc8fa2 commit 88e0a16

5 files changed: 20 additions, 22 deletions
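All five changes follow the same pattern: thread Hudi's HoodieSchema wrapper through the call chain and unwrap it to an Avro Schema only at Avro-facing boundaries, instead of converting back and forth with HoodieSchema.fromAvroSchema(...) and .toAvroSchema() at every call site. A minimal sketch of that wrapper idea, using a hypothetical SchemaHandle class in place of the real HoodieSchema (method names mirror those visible in the diff, but this is an illustration, not Hudi's implementation):

import org.apache.avro.Schema;

// Hypothetical stand-in for HoodieSchema: holds one parsed Avro Schema and hands the
// same wrapper to every caller, so call sites stop re-wrapping/unwrapping per use.
final class SchemaHandle {
  private final Schema avroSchema;

  private SchemaHandle(Schema avroSchema) {
    this.avroSchema = avroSchema;
  }

  static SchemaHandle fromAvroSchema(Schema schema) {
    return new SchemaHandle(schema);
  }

  static SchemaHandle parse(String schemaJson) {
    return new SchemaHandle(new Schema.Parser().parse(schemaJson));
  }

  // Unwrap only where an Avro API is actually required.
  Schema toAvroSchema() {
    return avroSchema;
  }

  String getFullName() {
    return avroSchema.getFullName();
  }
}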

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java

Lines changed: 4 additions & 4 deletions
@@ -200,17 +200,17 @@ private Option<Function<HoodieRecord, HoodieRecord>> composeSchemaEvolutionTrans
         .collect(Collectors.toList());
     InternalSchema mergedSchema = new InternalSchemaMerger(writeInternalSchema, querySchema,
         true, false, false).mergeSchema();
-    Schema newWriterSchema = InternalSchemaConverter.convert(mergedSchema, writerSchema.getFullName()).getAvroSchema();
+    HoodieSchema newWriterSchema = InternalSchemaConverter.convert(mergedSchema, writerSchema.getFullName());
     Schema writeSchemaFromFile = InternalSchemaConverter.convert(writeInternalSchema, newWriterSchema.getFullName()).getAvroSchema();
-    boolean needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
-        || SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+    boolean needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size() || SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema.toAvroSchema(),
+        writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
     if (needToReWriteRecord) {
       Map<String, String> renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
       return Option.of(record -> {
         return record.rewriteRecordWithNewSchema(
             recordSchema,
             writeConfig.getProps(),
-            HoodieSchema.fromAvroSchema(newWriterSchema), renameCols);
+            newWriterSchema, renameCols);
       });
     } else {
       return Option.empty();
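The rewrite decision above still goes through Avro's own compatibility checker, which is why newWriterSchema.toAvroSchema() is unwrapped at exactly that call. A self-contained example of the Avro API being used (the two schemas below are made up for illustration):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaCompatibility;

public class SchemaCompatibilityExample {
  public static void main(String[] args) {
    // Schema the existing file was written with.
    Schema writeSchemaFromFile = SchemaBuilder.record("trip").fields()
        .requiredString("uuid")
        .requiredDouble("fare")
        .endRecord();

    // Evolved writer schema that adds a nullable column with a null default.
    Schema newWriterSchema = SchemaBuilder.record("trip").fields()
        .requiredString("uuid")
        .requiredDouble("fare")
        .optionalString("currency")
        .endRecord();

    // Same check HoodieMergeHelper performs: can data written with the file schema
    // be read with the new writer schema?
    SchemaCompatibility.SchemaCompatibilityType type =
        SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile)
            .getType();

    System.out.println(type); // COMPATIBLE: the added column falls back to its default
  }
}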

hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java

Lines changed: 9 additions & 10 deletions
@@ -48,7 +48,6 @@
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;

-import org.apache.avro.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -628,13 +627,13 @@ private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpec
         ? Option.empty()
         : Option.of(Pair.of(recordKeyField, partitionPathFieldOpt.orElse(null)));

-    Pair<ClosableIterator<HoodieRecord>, Schema> recordsIteratorSchemaPair =
+    Pair<ClosableIterator<HoodieRecord>, HoodieSchema> recordsIteratorSchemaPair =
         getRecordsIterator(dataBlock, keySpecOpt);

     try (ClosableIterator<HoodieRecord> recordIterator = recordsIteratorSchemaPair.getLeft()) {
       while (recordIterator.hasNext()) {
         HoodieRecord completedRecord = recordIterator.next()
-            .wrapIntoHoodieRecordPayloadWithParams(HoodieSchema.fromAvroSchema(recordsIteratorSchemaPair.getRight()),
+            .wrapIntoHoodieRecordPayloadWithParams(recordsIteratorSchemaPair.getRight(),
                 hoodieTableMetaClient.getTableConfig().getProps(),
                 recordKeyPartitionPathFieldPair,
                 this.withOperationField,
@@ -799,7 +798,7 @@ public List<String> getValidBlockInstants() {
     return validBlockInstants;
   }

-  private Pair<ClosableIterator<HoodieRecord>, Schema> getRecordsIterator(
+  private Pair<ClosableIterator<HoodieRecord>, HoodieSchema> getRecordsIterator(
       HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException {
     ClosableIterator<HoodieRecord> blockRecordsIterator;
     if (keySpecOpt.isPresent()) {
@@ -810,7 +809,7 @@ private Pair<ClosableIterator<HoodieRecord>, Schema> getRecordsIterator(
       blockRecordsIterator = (ClosableIterator) dataBlock.getRecordIterator(recordType);
     }

-    Option<Pair<Function<HoodieRecord, HoodieRecord>, Schema>> schemaEvolutionTransformerOpt =
+    Option<Pair<Function<HoodieRecord, HoodieRecord>, HoodieSchema>> schemaEvolutionTransformerOpt =
         composeEvolvedSchemaTransformer(dataBlock);

     // In case when schema has been evolved original persisted records will have to be
@@ -819,8 +818,8 @@ private Pair<ClosableIterator<HoodieRecord>, Schema> getRecordsIterator(
         schemaEvolutionTransformerOpt.map(Pair::getLeft)
             .orElse(Function.identity());

-    Schema schema = schemaEvolutionTransformerOpt.map(Pair::getRight)
-        .orElseGet(() -> dataBlock.getSchema().toAvroSchema());
+    HoodieSchema schema = schemaEvolutionTransformerOpt.map(Pair::getRight)
+        .orElseGet(() -> dataBlock.getSchema());

     return Pair.of(new CloseableMappingIterator<>(blockRecordsIterator, transformer), schema);
   }
@@ -834,7 +833,7 @@ private Pair<ClosableIterator<HoodieRecord>, Schema> getRecordsIterator(
    * @param dataBlock current processed block
    * @return final read schema.
    */
-  private Option<Pair<Function<HoodieRecord, HoodieRecord>, Schema>> composeEvolvedSchemaTransformer(
+  private Option<Pair<Function<HoodieRecord, HoodieRecord>, HoodieSchema>> composeEvolvedSchemaTransformer(
       HoodieDataBlock dataBlock) {
     if (internalSchema.isEmptySchema()) {
       return Option.empty();
@@ -844,13 +843,13 @@ private Option<Pair<Function<HoodieRecord, HoodieRecord>, Schema>> composeEvolve
     InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime, hoodieTableMetaClient);
     InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema,
         true, false).mergeSchema();
-    Schema mergedAvroSchema = InternalSchemaConverter.convert(mergedInternalSchema, readerSchema.getFullName()).toAvroSchema();
+    HoodieSchema mergedAvroSchema = InternalSchemaConverter.convert(mergedInternalSchema, readerSchema.getFullName());

     return Option.of(Pair.of((record) -> {
       return record.rewriteRecordWithNewSchema(
           dataBlock.getSchema(),
           this.hoodieTableMetaClient.getTableConfig().getProps(),
-          HoodieSchema.fromAvroSchema(mergedAvroSchema),
+          mergedAvroSchema,
           Collections.emptyMap());
     }, mergedAvroSchema));
   }
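getRecordsIterator now pairs the record iterator with the HoodieSchema the block was actually decoded with, and the schema-evolution transformer is applied lazily through a mapping iterator. A stripped-down sketch of that lazy-mapping pattern (the MappingIterator class here is hypothetical and only illustrates what Hudi's CloseableMappingIterator provides):

import java.util.Iterator;
import java.util.function.Function;

// Hypothetical closeable iterator that applies a transform to each element on the fly,
// so per-record schema-evolution rewriting happens only while records are scanned.
final class MappingIterator<I, O> implements Iterator<O>, AutoCloseable {
  private final Iterator<I> source;
  private final Function<I, O> transform;
  private final AutoCloseable sourceHandle;

  MappingIterator(Iterator<I> source, Function<I, O> transform, AutoCloseable sourceHandle) {
    this.source = source;
    this.transform = transform;   // Function.identity() when no schema evolution is needed
    this.sourceHandle = sourceHandle;
  }

  @Override
  public boolean hasNext() {
    return source.hasNext();
  }

  @Override
  public O next() {
    return transform.apply(source.next());
  }

  @Override
  public void close() throws Exception {
    sourceHandle.close(); // release the underlying block reader
  }
}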

hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java

Lines changed: 3 additions & 3 deletions
@@ -18,14 +18,14 @@

 package org.apache.hudi.common.table.log.block;

-import org.apache.hudi.avro.AvroSchemaCache;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.fs.SizeAwareDataInputStream;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -104,7 +104,7 @@ public HoodieLogBlockType getBlockType() {

   @Override
   protected ByteArrayOutputStream serializeRecords(List<HoodieRecord> records, HoodieStorage storage) throws IOException {
-    Schema schema = AvroSchemaCache.intern(new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)));
+    HoodieSchema schema = HoodieSchemaCache.intern(HoodieSchema.parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)));
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try (DataOutputStream output = new DataOutputStream(baos)) {
       // 1. Write out the log block version
@@ -119,7 +119,7 @@ protected ByteArrayOutputStream serializeRecords(List<HoodieRecord> records, Hoo
         try {
           // Encode the record into bytes
          // Spark Record not support write avro log
-          ByteArrayOutputStream data = s.getAvroBytes(HoodieSchema.fromAvroSchema(schema), props);
+          ByteArrayOutputStream data = s.getAvroBytes(schema, props);
           // Write the record size
           output.writeInt(data.size());
           // Write the content
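AvroSchemaCache.intern is replaced with HoodieSchemaCache.intern, so the interned unit is now the HoodieSchema wrapper rather than the raw Avro Schema. Interning matters here because the same schema JSON is parsed out of many log block headers; keeping one canonical instance per distinct schema avoids holding duplicate parsed copies. A hypothetical interner showing the idea (not Hudi's actual implementation, which would likely also bound or weakly reference its entries):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.avro.Schema;

// Hypothetical schema interner: equal schemas parsed from different log block headers
// resolve to one shared canonical instance.
final class SchemaInterner {
  private static final Map<Schema, Schema> CACHE = new ConcurrentHashMap<>();

  static Schema intern(Schema schema) {
    Schema existing = CACHE.putIfAbsent(schema, schema);
    return existing != null ? existing : schema;
  }

  private SchemaInterner() {
  }
}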

hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java

Lines changed: 4 additions & 4 deletions
@@ -713,20 +713,20 @@ protected void validateOutputFromFileGroupReader(StorageConfiguration<?> storage
                                                  String[] orderingFields) throws Exception {
     HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, tablePath);
     HoodieSchema schema = new TableSchemaResolver(metaClient).getTableSchema();
-    expectedHoodieRecords = getExpectedHoodieRecordsWithOrderingValue(expectedHoodieRecords, metaClient, schema.toAvroSchema());
-    expectedHoodieUnmergedRecords = getExpectedHoodieRecordsWithOrderingValue(expectedHoodieUnmergedRecords, metaClient, schema.toAvroSchema());
+    expectedHoodieRecords = getExpectedHoodieRecordsWithOrderingValue(expectedHoodieRecords, metaClient, schema);
+    expectedHoodieUnmergedRecords = getExpectedHoodieRecordsWithOrderingValue(expectedHoodieUnmergedRecords, metaClient, schema);
     List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords = convertHoodieRecords(expectedHoodieRecords, schema.toAvroSchema(), orderingFields);
     List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords = convertHoodieRecords(expectedHoodieUnmergedRecords, schema.toAvroSchema(), orderingFields);
     validateOutputFromFileGroupReaderWithExistingRecords(
         storageConf, tablePath, containsBaseFile, expectedLogFileNum, recordMergeMode,
         expectedRecords, expectedUnmergedRecords);
   }

-  private static List<HoodieRecord> getExpectedHoodieRecordsWithOrderingValue(List<HoodieRecord> expectedHoodieRecords, HoodieTableMetaClient metaClient, Schema avroSchema) {
+  private static List<HoodieRecord> getExpectedHoodieRecordsWithOrderingValue(List<HoodieRecord> expectedHoodieRecords, HoodieTableMetaClient metaClient, HoodieSchema schema) {
     return expectedHoodieRecords.stream().map(rec -> {
       List<String> orderingFields = metaClient.getTableConfig().getOrderingFields();
       HoodieAvroIndexedRecord avroRecord = ((HoodieAvroIndexedRecord) rec);
-      Comparable orderingValue = OrderingValues.create(orderingFields, field -> (Comparable) avroRecord.getColumnValueAsJava(HoodieSchema.fromAvroSchema(avroSchema), field, new TypedProperties()));
+      Comparable orderingValue = OrderingValues.create(orderingFields, field -> (Comparable) avroRecord.getColumnValueAsJava(schema, field, new TypedProperties()));
       return new HoodieAvroIndexedRecord(rec.getKey(), avroRecord.getData(), orderingValue);
     }).collect(Collectors.toList());
   }
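The test helper now hands the resolved HoodieSchema straight to getColumnValueAsJava instead of rebuilding it from an Avro Schema per record. The ordering value it derives is just a Comparable read from the record's ordering field; a small, Avro-only illustration of that extraction (the schema and field names are made up):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class OrderingValueExample {
  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("trip").fields()
        .requiredString("uuid")
        .requiredLong("ts")
        .endRecord();

    GenericRecord record = new GenericData.Record(schema);
    record.put("uuid", "id-1");
    record.put("ts", 42L);

    // The ordering (precombine) value is simply the column value read back as a Comparable.
    Comparable<?> orderingValue = (Comparable<?>) record.get("ts");
    System.out.println(orderingValue); // 42
  }
}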

hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java

Lines changed: 0 additions & 1 deletion
@@ -427,7 +427,6 @@ public ByteArrayOutputStream serializeRecordsToLogBlock(HoodieStorage storage,
     config.setValue("hoodie.avro.schema", writerSchema.toString());
     HoodieRecord.HoodieRecordType recordType = records.iterator().next().getRecordType();
     try (HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter(
-        //TODO boundary to revisit in follow up to use HoodieSchema directly
         HoodieFileFormat.PARQUET, outputStream, storage, config, writerSchema, recordType)) {
       for (HoodieRecord<?> record : records) {
         String recordKey = record.getRecordKey(readerSchema, keyFieldName);
