feat: Ensure MOR table works, with lance base files and avro logs file (#17768)
* Ensure MOR table works, with lance base files and avro logs file
* fix style
* minor
* version downgrade in lance spark and lance core due to arrow issue for MOR
* retrigger ci
* cleanup
* add compaction validation
* refactor test to reduce code duplication, add clustering validation, fix writer bug

Co-authored-by: Timothy Brown <tim@onehouse.ai>
1 parent c4eb637 · commit d292847

File tree: 5 files changed (+238, -290 lines)

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java

Lines changed: 1 addition & 0 deletions
@@ -108,6 +108,7 @@ public static HoodieLogBlock.HoodieLogBlockType getLogBlockType(HoodieWriteConfi
     }
     HoodieFileFormat baseFileFormat = getBaseFileFormat(writeConfig, tableConfig);
     switch (getBaseFileFormat(writeConfig, tableConfig)) {
+      case LANCE:
       case PARQUET:
       case ORC:
         return HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK;
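
For context, the effect of this one-line change is that Lance base files follow the same path as Parquet and ORC base files when a log block type is chosen for MOR delta commits. Below is a minimal, illustrative Java sketch of that mapping; the simplified enums are stand-ins for Hudi's actual HoodieFileFormat and HoodieLogBlockType classes, not their real definitions.

// Illustrative stand-ins for Hudi's HoodieFileFormat and HoodieLogBlockType enums.
enum BaseFileFormat { PARQUET, ORC, LANCE }

enum LogBlockType { AVRO_DATA_BLOCK }

final class LogBlockTypeResolver {
  // Lance base files fall through to the same branch as Parquet and ORC,
  // so MOR delta commits on a Lance table are written as Avro data blocks.
  static LogBlockType resolve(BaseFileFormat format) {
    switch (format) {
      case LANCE:
      case PARQUET:
      case ORC:
        return LogBlockType.AVRO_DATA_BLOCK;
      default:
        throw new IllegalArgumentException("Unsupported base file format: " + format);
    }
  }
}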

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java

Lines changed: 10 additions & 1 deletion
@@ -22,6 +22,7 @@
 import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.SparkFileFormatInternalRecordContext;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.model.HoodieInternalRow;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaType;

@@ -327,7 +328,15 @@ public Option<HoodieAvroIndexedRecord> toIndexedRecord(HoodieSchema recordSchema

   @Override
   public ByteArrayOutputStream getAvroBytes(HoodieSchema recordSchema, Properties props) throws IOException {
-    throw new UnsupportedOperationException();
+    // Convert Spark InternalRow to Avro GenericRecord
+    if (data == null) {
+      throw new IOException("Cannot convert null data to Avro bytes");
+    }
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    GenericRecord avroRecord = AvroConversionUtils
+        .createInternalRowToAvroConverter(structType, recordSchema.toAvroSchema(), false)
+        .apply(data);
+    return HoodieAvroUtils.avroToBytesStream(avroRecord);
   }

   @Override
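
The previously unsupported getAvroBytes now converts the record's InternalRow into an Avro GenericRecord via the cached StructType and an InternalRow-to-Avro converter, then serializes it through HoodieAvroUtils.avroToBytesStream, which is what lets Spark-native records land in Avro log blocks. As a rough sketch of the final serialization step, the snippet below shows how a GenericRecord is typically written to a ByteArrayOutputStream with plain Avro APIs; this is an assumption about what avroToBytesStream roughly does, not its actual implementation, and the schema and field names are invented for illustration.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class AvroBytesSketch {
  // Serialize a GenericRecord into a ByteArrayOutputStream using the Avro binary encoder.
  static ByteArrayOutputStream toAvroBytes(GenericRecord record) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
    encoder.flush();
    return out;
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical two-field schema, standing in for a converted Spark row.
    Schema schema = SchemaBuilder.record("Row").fields()
        .requiredString("key")
        .requiredLong("ts")
        .endRecord();
    GenericRecord record = new GenericData.Record(schema);
    record.put("key", "k1");
    record.put("ts", 42L);
    System.out.println("serialized bytes: " + toAvroBytes(record).size());
  }
}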

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java

Lines changed: 8 additions & 7 deletions
@@ -18,15 +18,16 @@

 package org.apache.hudi.io.storage;

-import com.lancedb.lance.spark.arrow.LanceArrowWriter;
-import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.io.lance.HoodieBaseLanceWriter;
 import org.apache.hudi.io.storage.row.HoodieInternalRowFileWriter;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
+
+import com.lancedb.lance.spark.arrow.LanceArrowWriter;
+import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.LanceArrowUtils;

@@ -113,26 +114,26 @@ public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws IOExcept
     if (populateMetaFields) {
       UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
       updateRecordMetadata(row, recordKey, key.getPartitionPath(), getWrittenRecordCount());
-      super.write(row);
+      super.write(row.copy());
     } else {
-      super.write(row);
+      super.write(row.copy());
     }
   }

   @Override
   public void writeRow(String recordKey, InternalRow row) throws IOException {
-    super.write(row);
+    super.write(row.copy());
   }

   @Override
   public void writeRow(UTF8String key, InternalRow row) throws IOException {
     // Key reserved for future bloom filter support (https://github.com/apache/hudi/issues/17664)
-    super.write(row);
+    super.write(row.copy());
   }

   @Override
   public void writeRow(InternalRow row) throws IOException {
-    super.write(row);
+    super.write(row.copy());
   }

   @Override
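
This is the writer bug fix mentioned in the commit message: every write path now hands the Lance writer a defensive copy of the InternalRow. Spark readers commonly reuse a single mutable InternalRow instance across records, so if the writer retains rows beyond the write() call (presumably until a batch is flushed to Arrow), buffering the shared instance would leave every buffered entry reflecting only the last row's values. A small sketch of that failure mode and the fix is below; buffering rows in a plain list is an assumed stand-in for the Lance writer's internal buffering, not its actual code.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;

public class RowBufferingSketch {
  public static void main(String[] args) {
    // One mutable row instance, reused across records as Spark readers typically do.
    GenericInternalRow reused = new GenericInternalRow(1);

    List<InternalRow> withoutCopy = new ArrayList<>();
    List<InternalRow> withCopy = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      reused.setInt(0, i);
      withoutCopy.add(reused);      // every element points at the same mutable row
      withCopy.add(reused.copy());  // snapshot of the row's current values
    }

    // withoutCopy prints "2 2 2"; withCopy prints "0 1 2".
    withoutCopy.forEach(r -> System.out.print(r.getInt(0) + " "));
    System.out.println();
    withCopy.forEach(r -> System.out.print(r.getInt(0) + " "));
  }
}

Copying on every write trades a per-row allocation for correctness, which only matters for writers that hold on to rows after write() returns.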

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala

Lines changed: 0 additions & 3 deletions
@@ -516,9 +516,6 @@ class HoodieSparkSqlWriterInternal {
     // scalastyle:on

     val writeConfig = client.getConfig
-    if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
-      throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.")
-    }
    instantTime = client.startCommit(commitActionType)
     // if table has undergone upgrade, we need to reload table config
     tableMetaClient.reloadTableConfig()
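
The removed guard used to reject MOR writes whenever the Spark-native record merger was combined with any non-Parquet log block format; with getAvroBytes now implemented for HoodieSparkRecord, Avro log blocks are a valid target for that path, which the Lance MOR flow relies on. As a rough, hedged sketch of what a write against such a table might look like from Spark, see below: the option keys are standard Hudi write options, but the "LANCE" base file format value, the input path, and the field names are illustrative assumptions, not something this commit confirms.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class LanceMorWriteSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("lance-mor-sketch")
        .master("local[2]")
        .getOrCreate();

    // Hypothetical input with "key" and "ts" columns.
    Dataset<Row> df = spark.read().json("/tmp/input.json");

    // "LANCE" as a value for hoodie.table.base.file.format is an assumption based on
    // the enum case added by this change; the remaining options are standard Hudi knobs.
    df.write().format("hudi")
        .option("hoodie.table.name", "lance_mor_tbl")
        .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
        .option("hoodie.datasource.write.recordkey.field", "key")
        .option("hoodie.datasource.write.precombine.field", "ts")
        .option("hoodie.table.base.file.format", "LANCE")
        .mode(SaveMode.Append)
        .save("/tmp/lance_mor_tbl");
  }
}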

0 commit comments
