Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public static HoodieLogBlock.HoodieLogBlockType getLogBlockType(HoodieWriteConfi
}
HoodieFileFormat baseFileFormat = getBaseFileFormat(writeConfig, tableConfig);
switch (getBaseFileFormat(writeConfig, tableConfig)) {
case LANCE:
case PARQUET:
case ORC:
return HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.SparkFileFormatInternalRecordContext;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.model.HoodieInternalRow;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaType;
Expand Down Expand Up @@ -327,7 +328,15 @@ public Option<HoodieAvroIndexedRecord> toIndexedRecord(HoodieSchema recordSchema

/**
 * Serializes the Spark row held by this record into Avro binary form.
 *
 * <p>The row in {@code data} is converted to an Avro {@link GenericRecord} using the struct type
 * looked up for {@code recordSchema} and the Avro schema derived from it, and the resulting
 * record is then encoded with {@code HoodieAvroUtils.avroToBytesStream}.
 *
 * @param recordSchema schema of the row; source of both the Spark struct type and the Avro schema
 * @param props        not consulted by this implementation
 * @return stream containing the Avro-encoded record
 * @throws IOException if this record carries no data (failures in the encoding helper
 *                     presumably surface here as well - confirm against HoodieAvroUtils)
 */
@Override
public ByteArrayOutputStream getAvroBytes(HoodieSchema recordSchema, Properties props) throws IOException {
  // Guard first: there is nothing to convert when the record has no row attached.
  if (data == null) {
    throw new IOException("Cannot convert null data to Avro bytes");
  }
  // Convert Spark InternalRow to Avro GenericRecord.
  StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
  // NOTE(review): the trailing `false` is presumably the "nullable" flag of the converter -
  // confirm against AvroConversionUtils.createInternalRowToAvroConverter.
  GenericRecord avroRecord = AvroConversionUtils
      .createInternalRowToAvroConverter(structType, recordSchema.toAvroSchema(), false)
      .apply(data);
  return HoodieAvroUtils.avroToBytesStream(avroRecord);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@

package org.apache.hudi.io.storage;

import com.lancedb.lance.spark.arrow.LanceArrowWriter;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.io.lance.HoodieBaseLanceWriter;
import org.apache.hudi.io.storage.row.HoodieInternalRowFileWriter;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;

import com.lancedb.lance.spark.arrow.LanceArrowWriter;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.LanceArrowUtils;
Expand Down Expand Up @@ -113,26 +114,26 @@ public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws IOExcept
if (populateMetaFields) {
UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
updateRecordMetadata(row, recordKey, key.getPartitionPath(), getWrittenRecordCount());
super.write(row);
super.write(row.copy());
} else {
super.write(row);
super.write(row.copy());
}
}

/**
 * Writes one row to the underlying writer.
 *
 * <p>The row is handed over exactly once, as a copy. Passing both the original reference and a
 * copy would emit every record twice.
 *
 * @param recordKey record key of the row; not used by this method
 * @param row       row to write
 * @throws IOException propagated from the underlying {@code write} call
 */
@Override
public void writeRow(String recordKey, InternalRow row) throws IOException {
  // NOTE(review): the copy presumably protects against callers that reuse a mutable
  // InternalRow instance while the writer still holds it - confirm against the base writer.
  super.write(row.copy());
}

/**
 * Writes one row to the underlying writer.
 *
 * <p>The row is handed over exactly once, as a copy. Passing both the original reference and a
 * copy would emit every record twice.
 *
 * @param key record key of the row; currently unused
 * @param row row to write
 * @throws IOException propagated from the underlying {@code write} call
 */
@Override
public void writeRow(UTF8String key, InternalRow row) throws IOException {
  // Key reserved for future bloom filter support (https://github.com/apache/hudi/issues/17664)
  // NOTE(review): the copy presumably protects against callers that reuse a mutable
  // InternalRow instance while the writer still holds it - confirm against the base writer.
  super.write(row.copy());
}

/**
 * Writes one row to the underlying writer.
 *
 * <p>The row is handed over exactly once, as a copy. Passing both the original reference and a
 * copy would emit every record twice.
 *
 * @param row row to write
 * @throws IOException propagated from the underlying {@code write} call
 */
@Override
public void writeRow(InternalRow row) throws IOException {
  // NOTE(review): the copy presumably protects against callers that reuse a mutable
  // InternalRow instance while the writer still holds it - confirm against the base writer.
  super.write(row.copy());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,9 +516,6 @@ class HoodieSparkSqlWriterInternal {
// scalastyle:on

val writeConfig = client.getConfig
if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.")
}
instantTime = client.startCommit(commitActionType)
// if table has undergone upgrade, we need to reload table config
tableMetaClient.reloadTableConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import org.apache.hadoop.conf.Configuration
import org.apache.parquet.schema.MessageType
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
import org.apache.spark.sql.execution.datasources.{PartitionedFile, SparkColumnarFileReader}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.LanceArrowUtils

import java.io.IOException
Expand Down Expand Up @@ -84,9 +84,20 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
// Open Lance file reader
val lanceReader = LanceFileReader.open(filePath, allocator)

// Extract column names from required schema for projection
val columnNames: java.util.List[String] = if (requiredSchema.nonEmpty) {
requiredSchema.fields.map(_.name).toList.asJava
// Get schema from Lance file
val arrowSchema = lanceReader.schema()
val fileSchema = LanceArrowUtils.fromArrowSchema(arrowSchema)

// Read columns that currently exist in the file, as requested col may not be present
val fileFieldNames = fileSchema.fieldNames.toSet
val (existingFields, missingFields) = if (requiredSchema.nonEmpty) {
requiredSchema.fields.partition(f => fileFieldNames.contains(f.name))
} else {
(Array.empty[StructField], Array.empty[StructField])
}

val columnNames: java.util.List[String] = if (existingFields.nonEmpty) {
existingFields.map(_.name).toList.asJava
} else {
// If only partition columns requested, read minimal data
null
Expand All @@ -95,12 +106,13 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
// Read data with column projection (filters not supported yet)
val arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE)

val schemaForIterator = if (requiredSchema.nonEmpty) {
requiredSchema
// Create schema for the data we're actually reading
val readSchema = StructType(existingFields)

val schemaForIterator = if (readSchema.nonEmpty) {
readSchema
} else {
// Only compute schema from Lance file when requiredSchema is empty
val arrowSchema = lanceReader.schema()
LanceArrowUtils.fromArrowSchema(arrowSchema)
fileSchema
}

// Create iterator using shared LanceRecordIterator
Expand All @@ -117,17 +129,57 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
_.addTaskCompletionListener[Unit](_ => lanceIterator.close())
)

// Need to convert to scala iterator for proper reading
val iter = lanceIterator.asScala

// Need to convert to scala iterator for row merging
val baseIter = lanceIterator.asScala
// Add NULL padding for missing columns for schema evolution
val iterWithNulls = if (missingFields.nonEmpty) {
// Create a row with NULLs for missing columns
val nullRow = new GenericInternalRow(missingFields.length)
for (i <- missingFields.indices) {
nullRow.setNullAt(i)
}

// Reorder columns to match the requiredSchema order
val fieldIndexMap = requiredSchema.fields.zipWithIndex.map { case (field, idx) =>
field.name -> idx
}.toMap

val existingFieldIndices = existingFields.map(f => fieldIndexMap(f.name))
val missingFieldIndices = missingFields.map(f => fieldIndexMap(f.name))

baseIter.map { row =>
// Create result row with correct ordering
val resultRow = new GenericInternalRow(requiredSchema.length)

// Fill in existing columns
existingFieldIndices.zipWithIndex.foreach { case (targetIdx, sourceIdx) =>
if (row.isNullAt(sourceIdx)) {
resultRow.setNullAt(targetIdx)
} else {
resultRow.update(targetIdx, row.get(sourceIdx, existingFields(sourceIdx).dataType))
}
}

// Fill in missing columns with NULL
missingFieldIndices.foreach { targetIdx =>
resultRow.setNullAt(targetIdx)
}

resultRow.asInstanceOf[InternalRow]
}
} else {
baseIter.asInstanceOf[Iterator[InternalRow]]
}

// Handle partition columns
if (partitionSchema.length == 0) {
// No partition columns - return rows directly
iter.asInstanceOf[Iterator[InternalRow]]
iterWithNulls
} else {
// Append partition values to each row using JoinedRow
val joinedRow = new JoinedRow()
iter.map(row => joinedRow(row, file.partitionValues))
iterWithNulls.map(row => joinedRow(row, file.partitionValues))
}

} catch {
Expand Down
Loading
Loading