Merged
@@ -21,7 +21,6 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.HoodieLazyInsertIterable.HoodieInsertValueGenResult;
@@ -111,7 +110,7 @@ public void consume(HoodieInsertValueGenResult<HoodieRecord> genResult) {
record.getPartitionPath(), idPrefix, taskContextSupplier);
handles.put(partitionPath, handle);
}
-handle.write(record, HoodieSchema.fromAvroSchema(genResult.schema), config.getProps());
+handle.write(record, genResult.schema, config.getProps());
}

@Override
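A minimal sketch of the consumer's insert path after this change, assuming the fields visible in the diff context (the per-partition handles map, config, and a handle created lazily via the factory); the handle type and surrounding class details are assumptions, not the exact production code:

```java
// Hedged sketch: genResult.schema is already a HoodieSchema, so it flows straight
// into the write handle and the old HoodieSchema.fromAvroSchema(...) bridge disappears.
public void consume(HoodieInsertValueGenResult<HoodieRecord> genResult) {
  HoodieRecord record = genResult.getResult();
  String partitionPath = record.getPartitionPath();
  HoodieWriteHandle handle = handles.get(partitionPath); // assumed handle type; created lazily per partition
  handle.write(record, genResult.schema, config.getProps());
}
```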
@@ -22,14 +22,13 @@
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.ExecutorFactory;

-import org.apache.avro.Schema;

import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
@@ -76,9 +75,9 @@ public HoodieLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr, boolean are
// Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
public static class HoodieInsertValueGenResult<R extends HoodieRecord> {
private final R record;
-public final Schema schema;
+public final HoodieSchema schema;

-public HoodieInsertValueGenResult(R record, Schema schema) {
+public HoodieInsertValueGenResult(R record, HoodieSchema schema) {
this.record = record;
this.schema = schema;
}
@@ -92,12 +91,12 @@ public R getResult() {
* Transformer function to help transform a HoodieRecord. This transformer is used by BufferedIterator to offload some
* expensive operations of transformation to the reader thread.
*/
-public <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformer(Schema schema,
+public <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformer(HoodieSchema schema,
HoodieWriteConfig writeConfig) {
return getTransformerInternal(schema, writeConfig);
}

-public static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformerInternal(Schema schema,
+public static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformerInternal(HoodieSchema schema,
HoodieWriteConfig writeConfig) {
// NOTE: Whether record have to be cloned here is determined based on the executor type used
// for writing: executors relying on an inner queue, will be keeping references to the records
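For a caller that still starts from an Avro Schema, a hedged usage sketch of the new HoodieSchema-typed transformer. HoodieSchema.fromAvroSchema(...) is taken from the pre-change code of the first file and assumed to remain available; the helper method itself is purely illustrative:

```java
// Illustrative only: bridge an Avro writer schema into the HoodieSchema-typed API,
// then run the buffering transformer for a single record.
static <T> HoodieInsertValueGenResult<HoodieRecord> generateInsertValue(
    HoodieRecord<T> record, Schema avroWriteSchema, HoodieWriteConfig writeConfig) {
  HoodieSchema writeSchema = HoodieSchema.fromAvroSchema(avroWriteSchema);
  Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> transformer =
      HoodieLazyInsertIterable.getTransformerInternal(writeSchema, writeConfig);
  return transformer.apply(record); // the result's schema field is now a HoodieSchema
}
```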
@@ -433,10 +433,10 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecordWithEx
//record is inserted or updated
String partitionPath = inferPartitionPath(incoming, existing, writeSchemaWithMetaFields, keyGenerator, existingRecordContext, mergeResult);
HoodieRecord<R> result = existingRecordContext.constructHoodieRecord(mergeResult, partitionPath);
-HoodieRecord<R> withMeta = result.prependMetaFields(writeSchema.toAvroSchema(), writeSchemaWithMetaFields.toAvroSchema(),
+HoodieRecord<R> withMeta = result.prependMetaFields(writeSchema, writeSchemaWithMetaFields,
new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(partitionPath), properties);
-return Option.of(withMeta.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields.toAvroSchema(), properties, Option.empty(),
-config.allowOperationMetadataField(), Option.empty(), false, Option.of(writeSchema.toAvroSchema())));
+return Option.of(withMeta.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields, properties, Option.empty(),
+config.allowOperationMetadataField(), Option.empty(), false, Option.of(writeSchema)));
}

private static <R> String inferPartitionPath(HoodieRecord<R> incoming, HoodieRecord<R> existing, HoodieSchema recordSchema, BaseKeyGenerator keyGenerator,
@@ -488,11 +488,11 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(
return Option.of(existingRecordContext.constructHoodieRecord(mergeResult, partitionPath));
} else {
HoodieRecord<R> result = existingRecordContext.constructHoodieRecord(mergeResult, partitionPath);
-HoodieRecord<R> resultWithMetaFields = result.prependMetaFields(writeSchema.toAvroSchema(), writeSchemaWithMetaFields.toAvroSchema(),
+HoodieRecord<R> resultWithMetaFields = result.prependMetaFields(writeSchema, writeSchemaWithMetaFields,
new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(partitionPath), properties);
// the merged record needs to be converted back to the original payload
-return Option.of(resultWithMetaFields.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields.toAvroSchema(), properties, Option.empty(),
-config.allowOperationMetadataField(), Option.empty(), false, Option.of(writeSchema.toAvroSchema())));
+return Option.of(resultWithMetaFields.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields, properties, Option.empty(),
+config.allowOperationMetadataField(), Option.empty(), false, Option.of(writeSchema)));
}
}
}
@@ -94,7 +94,7 @@ protected void doWrite(HoodieRecord record, HoodieSchema schema, TypedProperties
Option<Map<String, String>> recordMetadata = getRecordMetadata(record, schema, props);
try {
if (!HoodieOperation.isDelete(record.getOperation()) && !record.isDelete(deleteContext, config.getProps())) {
-if (record.shouldIgnore(schema.toAvroSchema(), config.getProps())) {
+if (record.shouldIgnore(schema, config.getProps())) {
return;
}

@@ -153,7 +153,7 @@ protected void writeRecordToFile(HoodieRecord record, HoodieSchema schema) throw
fileWriter.write(record.getRecordKey(), populatedRecord, writeSchemaWithMetaFields, config.getProps());
} else {
// rewrite the record to include metadata fields in schema, and the values will be set later.
-record = record.prependMetaFields(schema.toAvroSchema(), writeSchemaWithMetaFields.toAvroSchema(), new MetadataValues(), config.getProps());
+record = record.prependMetaFields(schema, writeSchemaWithMetaFields, new MetadataValues(), config.getProps());
if (isSecondaryIndexStatsStreamingWritesEnabled) {
SecondaryIndexStreamingTracker.trackSecondaryIndexStats(record, writeStatus, writeSchemaWithMetaFields, secondaryIndexDefns, config);
}
@@ -163,7 +163,7 @@ record = record.prependMetaFields(schema.toAvroSchema(), writeSchemaWithMetaFiel

protected HoodieRecord<T> updateFileName(HoodieRecord<T> record, HoodieSchema schema, HoodieSchema targetSchema, String fileName, Properties prop) {
MetadataValues metadataValues = new MetadataValues().setFileName(fileName);
-return record.prependMetaFields(schema.toAvroSchema(), targetSchema.toAvroSchema(), metadataValues, prop);
+return record.prependMetaFields(schema, targetSchema, metadataValues, prop);
}

@Override
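The same substitution repeats through the write handles below: record-level APIs that previously received an unwrapped Avro schema now take the HoodieSchema itself. A schematic before/after of the pattern (schema and writeSchemaWithMetaFields stand in for the handle fields used in each case):

```java
// Before: every call unwrapped the Avro schema first.
if (record.shouldIgnore(schema.toAvroSchema(), config.getProps())) {
  return;
}
record = record.prependMetaFields(schema.toAvroSchema(), writeSchemaWithMetaFields.toAvroSchema(),
    new MetadataValues(), config.getProps());

// After: the HoodieSchema is passed through unchanged.
if (record.shouldIgnore(schema, config.getProps())) {
  return;
}
record = record.prependMetaFields(schema, writeSchemaWithMetaFields,
    new MetadataValues(), config.getProps());
```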
@@ -632,14 +632,14 @@ private void writeToBuffer(HoodieRecord<T> record) {

private void bufferInsertAndUpdate(HoodieSchema schema, HoodieRecord<T> hoodieRecord, boolean isUpdateRecord) throws IOException {
// Check if the record should be ignored (special case for [[ExpressionPayload]])
-if (hoodieRecord.shouldIgnore(schema.toAvroSchema(), recordProperties)) {
+if (hoodieRecord.shouldIgnore(schema, recordProperties)) {
return;
}

// Prepend meta-fields into the record
MetadataValues metadataValues = populateMetadataFields(hoodieRecord);
HoodieRecord populatedRecord =
-hoodieRecord.prependMetaFields(schema.toAvroSchema(), writeSchemaWithMetaFields.toAvroSchema(), metadataValues, recordProperties);
+hoodieRecord.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, recordProperties);

// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
@@ -661,7 +661,7 @@ private void bufferDelete(HoodieRecord<T> hoodieRecord) {
recordsDeleted++;

// store ordering value with Java type.
-final Comparable<?> orderingVal = hoodieRecord.getOrderingValueAsJava(writeSchema.toAvroSchema(), recordProperties, orderingFields);
+final Comparable<?> orderingVal = hoodieRecord.getOrderingValueAsJava(writeSchema, recordProperties, orderingFields);
long position = baseFileInstantTimeOfPositions.isPresent() ? hoodieRecord.getCurrentPosition() : -1L;
recordsToDeleteWithPositions.add(Pair.of(DeleteRecord.create(hoodieRecord.getKey(), orderingVal), position));
}
@@ -91,7 +91,7 @@ public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTa
@Override
public void write(HoodieRecord oldRecord) {
HoodieSchema oldSchema = config.populateMetaFields() ? writeSchemaWithMetaFields : writeSchema;
-String key = oldRecord.getRecordKey(oldSchema.toAvroSchema(), keyGeneratorOpt);
+String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
try {
// NOTE: We're enforcing preservation of the record metadata to keep existing semantic
writeToFile(new HoodieKey(key, partitionPath), oldRecord, oldSchema, config.getPayloadConfig().getProps(), true);
@@ -101,7 +101,7 @@ protected void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException {
HoodieRecord<T> savedRecord = newRecord.newInstance();
super.writeInsertRecord(newRecord);
if (!HoodieOperation.isDelete(newRecord.getOperation()) && !savedRecord.isDelete(deleteContext, config.getPayloadConfig().getProps())) {
-cdcLogger.put(newRecord, null, savedRecord.toIndexedRecord(schema.toAvroSchema(), config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData));
+cdcLogger.put(newRecord, null, savedRecord.toIndexedRecord(schema, config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData));
}
}

@@ -74,7 +74,7 @@ public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, Hoo
public void write(HoodieRecord oldRecord) {
HoodieSchema oldSchema = config.populateMetaFields() ? writeSchemaWithMetaFields : writeSchema;
HoodieSchema newSchema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
-String key = oldRecord.getRecordKey(oldSchema.toAvroSchema(), keyGeneratorOpt);
+String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);

// To maintain overall sorted order across updates and inserts, write any new inserts whose keys are less than
// the oldRecord's key.
@@ -343,7 +343,7 @@ public boolean preFileCreation(HoodieLogFile logFile) {

protected static Option<IndexedRecord> toAvroRecord(HoodieRecord record, HoodieSchema writerSchema, TypedProperties props) {
try {
-return record.toIndexedRecord(writerSchema.toAvroSchema(), props).map(HoodieAvroIndexedRecord::getData);
+return record.toIndexedRecord(writerSchema, props).map(HoodieAvroIndexedRecord::getData);
} catch (IOException e) {
log.error("Failed to convert to IndexedRecord", e);
return Option.empty();
@@ -353,13 +353,13 @@ protected static Option<IndexedRecord> toAvroRecord(HoodieRecord record, HoodieS
protected Option<Map<String, String>> getRecordMetadata(HoodieRecord record, HoodieSchema schema, Properties props) {
Option<Map<String, String>> recordMetadata = record.getMetadata();
if (isTrackingEventTimeWatermark) {
-Object eventTime = record.getColumnValueAsJava(schema.toAvroSchema(), eventTimeFieldName, props);
+Object eventTime = record.getColumnValueAsJava(schema, eventTimeFieldName, props);
if (eventTime != null) {
// Append event_time.
Option<HoodieSchemaField> field = HoodieSchemaUtils.findNestedField(schema, eventTimeFieldName);
// Field should definitely exist.
eventTime = record.convertColumnValueForLogicalType(
-field.get().schema().toAvroSchema(), eventTime, keepConsistentLogicalTimestamp);
+field.get().schema(), eventTime, keepConsistentLogicalTimestamp);
Map<String, String> metadata = recordMetadata.orElse(new HashMap<>());
metadata.put(METADATA_EVENT_TIME_KEY, String.valueOf(eventTime));
return Option.of(metadata);
@@ -261,7 +261,7 @@ protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> o
protected void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException {
HoodieSchema schema = getNewSchema();
// just skip the ignored record
-if (newRecord.shouldIgnore(schema.toAvroSchema(), config.getProps())) {
+if (newRecord.shouldIgnore(schema, config.getProps())) {
return;
}
writeInsertRecord(newRecord, schema, config.getProps());
@@ -359,7 +359,7 @@ public void write(HoodieRecord<T> oldRecord) {
HoodieSchema oldSchema = writeSchemaWithMetaFields;
HoodieSchema newSchema = getNewSchema();
boolean copyOldRecord = true;
-String key = oldRecord.getRecordKey(oldSchema.toAvroSchema(), keyGeneratorOpt);
+String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
TypedProperties props = config.getPayloadConfig().getProps();
if (keyToNewRecords.containsKey(key)) {
// If we have duplicate records that we are updating, then the hoodie record will be deflated after
@@ -372,7 +372,7 @@ public void write(HoodieRecord<T> oldRecord) {
BufferedRecord<T> mergeResult = recordMerger.merge(oldBufferedRecord, newBufferedRecord, readerContext.getRecordContext(), props);
HoodieSchema combineRecordSchema = readerContext.getRecordContext().getSchemaFromBufferRecord(mergeResult);
HoodieRecord combinedRecord = readerContext.getRecordContext().constructHoodieRecord(mergeResult);
-if (combinedRecord.shouldIgnore(combineRecordSchema.toAvroSchema(), props)) {
+if (combinedRecord.shouldIgnore(combineRecordSchema, props)) {
// If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
copyOldRecord = true;
} else if (writeUpdateRecord(newRecord, oldRecord, combinedRecord, combineRecordSchema)) {
@@ -409,13 +409,11 @@ protected void writeToFile(HoodieKey key, HoodieRecord<T> record, HoodieSchema s
if (shouldPreserveRecordMetadata) {
// NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
// file holding this record even in cases when overall metadata is preserved
-HoodieRecord populatedRecord = record.updateMetaField(schema.toAvroSchema(), HoodieRecord.FILENAME_META_FIELD_ORD, newFilePath.getName());
-//TODO boundary to follow up in later pr to use HoodieSchema directly
+HoodieRecord populatedRecord = record.updateMetaField(schema, HoodieRecord.FILENAME_META_FIELD_ORD, newFilePath.getName());
fileWriter.write(key.getRecordKey(), populatedRecord, writeSchemaWithMetaFields, props);
} else {
// rewrite the record to include metadata fields in schema, and the values will be set later.
-//TODO boundary to follow up in later pr to use HoodieSchema directly
-record = record.prependMetaFields(schema.toAvroSchema(), writeSchemaWithMetaFields.toAvroSchema(), new MetadataValues(), config.getProps());
+record = record.prependMetaFields(schema, writeSchemaWithMetaFields, new MetadataValues(), config.getProps());
fileWriter.writeWithMetadata(key, record, writeSchemaWithMetaFields, props);
}
}
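With the two removed TODOs now resolved, the merge handle's write path also keeps the HoodieSchema end to end. A condensed, hedged sketch of the two writeToFile branches as they read after the change (field names follow the surrounding class; details are simplified):

```java
// Hedged sketch of writeToFile after the change.
if (shouldPreserveRecordMetadata) {
  // Only the FILENAME meta field is rewritten to point at the new file; the schema stays wrapped.
  HoodieRecord populatedRecord =
      record.updateMetaField(schema, HoodieRecord.FILENAME_META_FIELD_ORD, newFilePath.getName());
  fileWriter.write(key.getRecordKey(), populatedRecord, writeSchemaWithMetaFields, props);
} else {
  // Meta fields are prepended into the schema; their values are filled in later.
  record = record.prependMetaFields(schema, writeSchemaWithMetaFields, new MetadataValues(), config.getProps());
  fileWriter.writeWithMetadata(key, record, writeSchemaWithMetaFields, props);
}
```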
@@ -140,7 +140,7 @@ static void trackSecondaryIndexStats(HoodieRecord record, WriteStatus writeStatu
List<HoodieIndexDefinition> secondaryIndexDefns, HoodieWriteConfig config) {
// Add secondary index records for all the inserted records (including null values)
secondaryIndexDefns.forEach(def -> {
-Object secondaryKey = record.getColumnValueAsJava(writeSchemaWithMetaFields.toAvroSchema(), def.getSourceFieldsKey(), config.getProps());
+Object secondaryKey = record.getColumnValueAsJava(writeSchemaWithMetaFields, def.getSourceFieldsKey(), config.getProps());
addSecondaryIndexStat(writeStatus, def.getIndexName(), record.getRecordKey(), secondaryKey, false);
});
}
@@ -177,7 +177,7 @@ static <T> void trackSecondaryIndexStats(@Nullable HoodieKey hoodieKey, HoodieRe
Object oldSecondaryKey = null;

if (hasOldValue) {
-oldSecondaryKey = oldRecord.getColumnValueAsJava(writeSchemaWithMetaFields.toAvroSchema(), secondaryIndexSourceField, config.getProps());
+oldSecondaryKey = oldRecord.getColumnValueAsJava(writeSchemaWithMetaFields, secondaryIndexSourceField, config.getProps());
}

// For new/combined record
@@ -186,7 +186,7 @@

if (!isDelete) {
HoodieSchema newSchema = newSchemaSupplier.get();
-newSecondaryKey = combinedRecord.getColumnValueAsJava(newSchema.toAvroSchema(), secondaryIndexSourceField, config.getProps());
+newSecondaryKey = combinedRecord.getColumnValueAsJava(newSchema, secondaryIndexSourceField, config.getProps());
hasNewValue = true;
}

@@ -210,7 +210,7 @@ static <T> void trackSecondaryIndexStats(@Nullable HoodieKey hoodieKey, HoodieRe
// 4. Old record does not exist, new record does not exist - do nothing
if (shouldUpdate) {
String recordKey = Option.ofNullable(hoodieKey).map(HoodieKey::getRecordKey)
-.or(() -> Option.ofNullable(oldRecord).map(rec -> rec.getRecordKey(writeSchemaWithMetaFields.toAvroSchema(), keyGeneratorOpt)))
+.or(() -> Option.ofNullable(oldRecord).map(rec -> rec.getRecordKey(writeSchemaWithMetaFields, keyGeneratorOpt)))
.orElseGet(combinedRecord::getRecordKey);

// Delete old secondary index entry if old record exists.
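Secondary-index tracking reads the index source field through the same HoodieSchema-typed accessor for inserted, old, and combined records. A condensed, hedged sketch of the insert-side extraction (the index definitions, write status, and helper come from the surrounding method):

```java
// Hedged sketch: pull each secondary-index source field from the inserted record via
// the HoodieSchema-typed column accessor, then record the index stat.
for (HoodieIndexDefinition def : secondaryIndexDefns) {
  Object secondaryKey =
      record.getColumnValueAsJava(writeSchemaWithMetaFields, def.getSourceFieldsKey(), config.getProps());
  addSecondaryIndexStat(writeStatus, def.getIndexName(), record.getRecordKey(), secondaryKey, false);
}
```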