Commits (24)
9ac2ff8
Ensure MOR table works, with lance base files and avro logs file
rahil-c Oct 23, 2025
a81c694
fix style
rahil-c Jan 2, 2026
c496518
minor
rahil-c Jan 2, 2026
ff1bcfd
version downgrade in lance spark and lance core due to arrow issue fo…
rahil-c Jan 2, 2026
e56ed4a
retrigger ci
rahil-c Jan 2, 2026
decc2b3
cleanup
the-other-tim-brown Jan 8, 2026
b427e30
add compaction validation
the-other-tim-brown Jan 8, 2026
78829f1
refactor test to reduce code duplication, add clustering validation, …
the-other-tim-brown Jan 8, 2026
e26ff82
Add schema evolution support for add column
rahil-c Oct 26, 2025
30f145b
Merge branch 'master' into rahil/schema-evolution
rahil-c Jan 16, 2026
b48ac64
fix on top of latest master
rahil-c Jan 16, 2026
12f301f
use lance basic schema evolution
rahil-c Jan 16, 2026
0379320
fix memory leak exception
rahil-c Jan 19, 2026
1944f97
add basic primitive type evolution support
rahil-c Jan 20, 2026
f841a51
Merge branch 'master' into rahil/schema-evolution
rahil-c Jan 20, 2026
933946e
Address feedback to reuse existing code for schema evolution
rahil-c Jan 27, 2026
9cd9efb
try out tim patch, plus additional changes in fgreader tests, for run…
rahil-c Jan 28, 2026
8998c30
Merge branch 'master' into rahil/schema-evolution
rahil-c Jan 28, 2026
98dff86
handle making nested structs nullable to address lance limitation
rahil-c Jan 28, 2026
dc50226
wiring for multi format reader
rahil-c Jan 28, 2026
c560e52
fix testReadFileGroupInMergeOnReadTable for lance cases, in populateM…
rahil-c Jan 28, 2026
ffc4ad5
got nested padding working on basic nested test
rahil-c Jan 29, 2026
4cec7d3
got fgreaders spark test fully passing
rahil-c Jan 29, 2026
d69381e
try to refactor shared schema evoltuions to SparkSchemaTransformUtils
rahil-c Jan 31, 2026
@@ -59,7 +59,7 @@ public String getCustomPayload() {
}

@Override
public void commitToTable(List<HoodieRecord> recordList, String operation, boolean firstCommit, Map<String, String> writeConfigs, String schemaStr) {
public void commitProcessedRecordsToTable(List<HoodieRecord> recordList, String operation, boolean firstCommit, Map<String, String> writeConfigs, String schemaStr) {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withEngineType(EngineType.JAVA)
.withEmbeddedTimelineServerEnabled(false)
@@ -96,7 +96,8 @@ public SparkReaderContextFactory(HoodieSparkEngineContext hoodieSparkEngineConte
if (metaClient.getTableConfig().isMultipleBaseFileFormatsEnabled()) {
SparkColumnarFileReader parquetFileReader = sparkAdapter.createParquetFileReader(false, sqlConf, options, configs);
SparkColumnarFileReader orcFileReader = getOrcFileReader(resolver, sqlConf, options, configs, sparkAdapter);
baseFileReaderBroadcast = jsc.broadcast(new MultipleColumnarFileFormatReader(parquetFileReader, orcFileReader));
SparkColumnarFileReader lanceFileReader = getLanceFileReader(sqlConf, options, configs, sparkAdapter);
baseFileReaderBroadcast = jsc.broadcast(new MultipleColumnarFileFormatReader(parquetFileReader, orcFileReader, lanceFileReader));
} else if (metaClient.getTableConfig().getBaseFileFormat() == HoodieFileFormat.ORC) {
SparkColumnarFileReader orcFileReader = getOrcFileReader(resolver, sqlConf, options, configs, sparkAdapter);
baseFileReaderBroadcast = jsc.broadcast(orcFileReader);
@@ -153,6 +154,17 @@ private static SparkColumnarFileReader getOrcFileReader(TableSchemaResolver reso
}
}

private static SparkColumnarFileReader getLanceFileReader(SQLConf sqlConf,
scala.collection.immutable.Map<String, String> options,
Configuration configs,
SparkAdapter sparkAdapter) {
try {
return sparkAdapter.createLanceFileReader(false, sqlConf, options, configs);
} catch (Exception e) {
throw new HoodieException("Failed to broadcast Lance file reader", e);
}
}
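
  // Editor's note (illustrative, not part of the original change): with multiple base file
  // formats enabled, the factory broadcasts a MultipleColumnarFileFormatReader that wraps the
  // Parquet, ORC and Lance readers, so each base file in the table is presumably decoded by
  // the reader that matches its format.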

private static Configuration getHadoopConfiguration(Configuration configuration) {
    // new Configuration() is critical so that we don't run into ConcurrentModificationException
Configuration hadoopConf = new Configuration(configuration);
@@ -144,7 +144,7 @@ public ClosableIterator<UnsafeRow> getUnsafeRowIterator(HoodieSchema requestedSc
Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
boolean enableTimestampFieldRepair = storage.getConf().getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, true);
StructType dataStructType = convertToStruct(enableTimestampFieldRepair ? SchemaRepair.repairLogicalTypes(getFileSchema(), messageSchema) : getFileSchema());
SparkBasicSchemaEvolution evolution = new SparkBasicSchemaEvolution(dataStructType, structSchema, SQLConf.get().sessionLocalTimeZone());
SparkBasicSchemaEvolution evolution = new SparkBasicSchemaEvolution(dataStructType, structSchema, SQLConf.get().sessionLocalTimeZone(), HoodieFileFormat.PARQUET);
String readSchemaJson = evolution.getRequestSchema().json();
SQLConf sqlConf = SQLConf.get();
storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readSchemaJson);
@@ -0,0 +1,303 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, Attribute, Cast, CreateNamedStruct, CreateStruct, Expression, GetStructField, LambdaFunction, Literal, MapEntries, MapFromEntries, NamedLambdaVariable, UnsafeProjection}
import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampNTZType}

/**
* Format-agnostic utilities for Spark schema transformations including NULL padding
* and recursive type casting with workarounds for unsafe conversions.
*
* These utilities are used by file format readers that need to:
* - Pad missing columns with NULL literals (required for Lance)
* - Handle nested struct/array/map type conversions
* - Work around Spark unsafe cast issues (float->double, numeric->decimal)
*/
object SparkSchemaTransformUtils {

/**
* Generate UnsafeProjection that pads missing columns with NULL literals.
*
   * IMPORTANT: This padding behavior is required for the Lance format, which needs explicit
   * NULL values for missing columns.
*
* @param fullSchema Fields actually present in the file
* @param requiredSchema Target output schema (may have more fields than file)
* @param typeChangeInfos Map of field index to (targetType, readerType) for fields needing casting
* @param timeZoneId Session timezone for timestamp conversions
* @return UnsafeProjection that transforms file rows to required schema with NULL padding
*/
def generateProjectionWithPadding(
fullSchema: Seq[Attribute],
requiredSchema: StructType,
typeChangeInfos: java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]],
timeZoneId: Option[String]
): UnsafeProjection = {
    // fullSchema may contain fields that are not present in the file, so compare against
    // requiredSchema to get the correct column order and NULL padding
val inputFieldMap = fullSchema.map(a => a.name -> a).toMap

// Build expressions for all required fields, padding missing columns with NULL
val expressions = requiredSchema.fields.zipWithIndex.map { case (field, i) =>
inputFieldMap.get(field.name) match {
case Some(attr) =>
// Field exists in file - apply casting if needed
if (typeChangeInfos.containsKey(i)) {
val dstType = typeChangeInfos.get(i).getLeft
// use filtered attr.dataType as source
// typeChangeInfos.getRight() contains unfiltered schema with extra nested fields
recursivelyCastExpressions(attr, attr.dataType, dstType, timeZoneId, padNestedFields = true)
} else if (needsNestedPadding(attr.dataType, field.dataType)) {
// Handle nested struct padding even when not in typeChangeInfos
recursivelyCastExpressions(attr, attr.dataType, field.dataType, timeZoneId, padNestedFields = true)
} else {
attr
}
case None =>
// Field missing from file, use NULL literal for padding
Literal(null, field.dataType)
}
}

GenerateUnsafeProjection.generate(expressions, fullSchema)
}
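
  // Editor's note (illustrative sketch, not part of the original change): for a Lance base file
  // that only wrote `id`, a required schema of (id, name) yields a projection whose missing
  // `name` column is padded with NULL. The values below are hypothetical:
  //
  //   val fileAttrs = Seq(AttributeReference("id", IntegerType)())
  //   val required  = new StructType().add("id", IntegerType).add("name", StringType)
  //   val noTypeChanges = new java.util.HashMap[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]]()
  //   val proj = generateProjectionWithPadding(fileAttrs, required, noTypeChanges, Some("UTC"))
  //   // proj(InternalRow(1)) produces an UnsafeRow equivalent to (1, null)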

/**
* Recursively cast expressions with special handling for unsupported conversions.
* Supports nested structs, arrays, maps with optional NULL padding.
*
* @param expr Source expression to cast
* @param srcType Source data type
* @param dstType Destination data type
* @param timeZoneId Session timezone for timestamp conversions
* @param padNestedFields If true, adds NULL literals for missing nested struct fields (required for Lance).
* If false, assumes all fields exist (Parquet behavior - no padding needed).
* @return Casted expression with workarounds for unsafe conversions
*/
def recursivelyCastExpressions(
expr: Expression,
srcType: DataType,
dstType: DataType,
timeZoneId: Option[String],
padNestedFields: Boolean = false
): Expression = {
lazy val needTimeZone = Cast.needsTimeZone(srcType, dstType)
(srcType, dstType) match {
case (FloatType, DoubleType) =>
val toStr = Cast(expr, StringType, if (needTimeZone) timeZoneId else None)
Cast(toStr, dstType, if (needTimeZone) timeZoneId else None)
case (IntegerType | LongType | FloatType | DoubleType, dec: DecimalType) =>
val toStr = Cast(expr, StringType, if (needTimeZone) timeZoneId else None)
Cast(toStr, dec, if (needTimeZone) timeZoneId else None)
case (StringType, dec: DecimalType) =>
Cast(expr, dec, if (needTimeZone) timeZoneId else None)
case (StringType, DateType) =>
Cast(expr, DateType, if (needTimeZone) timeZoneId else None)
case (s: StructType, d: StructType) if padNestedFields || hasUnsupportedConversion(s, d) =>
val structFields = if (padNestedFields) {
// Padding path: required for Lance to handle missing nested fields
val srcFieldMap = s.fields.zipWithIndex.map { case (f, i) => f.name -> (f, i) }.toMap
d.fields.map { dstField =>
srcFieldMap.get(dstField.name) match {
case Some((srcField, srcIndex)) =>
val child = GetStructField(expr, srcIndex, Some(dstField.name))
recursivelyCastExpressions(child, srcField.dataType, dstField.dataType, timeZoneId, padNestedFields)
case None =>
Literal(null, dstField.dataType)
}
}
} else {
// No-padding path: Parquet behavior, all fields assumed to exist
s.fields.zip(d.fields).zipWithIndex.map {
case ((srcField, dstField), i) =>
val child = GetStructField(expr, i, Some(dstField.name))
recursivelyCastExpressions(child, srcField.dataType, dstField.dataType, timeZoneId, padNestedFields)
}
}
CreateNamedStruct(d.fields.zip(structFields).flatMap {
case (f, c) => Seq(Literal(f.name), c)
})
case (ArrayType(sElementType, containsNull), ArrayType(dElementType, _)) if hasUnsupportedConversion(sElementType, dElementType) =>
val lambdaVar = NamedLambdaVariable("element", sElementType, containsNull)
val body = recursivelyCastExpressions(lambdaVar, sElementType, dElementType, timeZoneId, padNestedFields)
val func = LambdaFunction(body, Seq(lambdaVar))
ArrayTransform(expr, func)
case (MapType(sKeyType, sValType, vnull), MapType(dKeyType, dValType, _))
if hasUnsupportedConversion(sKeyType, dKeyType) || hasUnsupportedConversion(sValType, dValType) =>
val kv = NamedLambdaVariable("kv", new StructType()
.add("key", sKeyType, nullable = false)
.add("value", sValType, nullable = vnull), nullable = false)
val newKey = recursivelyCastExpressions(GetStructField(kv, 0), sKeyType, dKeyType, timeZoneId, padNestedFields)
val newVal = recursivelyCastExpressions(GetStructField(kv, 1), sValType, dValType, timeZoneId, padNestedFields)
val entry = CreateStruct(Seq(newKey, newVal))
val func = LambdaFunction(entry, Seq(kv))
val transformed = ArrayTransform(MapEntries(expr), func)
MapFromEntries(transformed)
case _ =>
        // Most conversions are handled by a plain Cast; the cases above exist only for casts
        // Spark cannot perform safely (e.g. float -> double, numeric/string -> decimal)
Cast(expr, dstType, if (needTimeZone) timeZoneId else None)
}
}
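
  // Editor's note (illustrative, not part of the original change): a float column promoted to
  // double is routed through a String cast because a direct unsafe cast surfaces float
  // precision artifacts (1.1f would read back as 1.100000023841858 rather than 1.1):
  //
  //   recursivelyCastExpressions(attr, FloatType, DoubleType, Some("UTC"))
  //   // => Cast(Cast(attr, StringType), DoubleType), not Cast(attr, DoubleType)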

/**
* Used to determine if padding is required for nested struct fields.
* @param srcType Source data type
* @param dstType Destination data type
* @return true if destination has additional fields requiring NULL padding
*/
def needsNestedPadding(srcType: DataType, dstType: DataType): Boolean = (srcType, dstType) match {
// Need padding if destination has more fields or nested fields differ
case (StructType(srcFields), StructType(dstFields)) =>
dstFields.length > srcFields.length ||
srcFields.zip(dstFields).exists { case (sf, df) => needsNestedPadding(sf.dataType, df.dataType) }
case (ArrayType(srcElem, _), ArrayType(dstElem, _)) =>
needsNestedPadding(srcElem, dstElem)
case (MapType(srcKey, srcVal, _), MapType(dstKey, dstVal, _)) =>
      // Lance does not support the map type, so this branch is not currently exercised
needsNestedPadding(srcKey, dstKey) || needsNestedPadding(srcVal, dstVal)
case _ => false
}
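
  // Editor's note (illustrative, not part of the original change): padding is needed when the
  // required schema carries a nested field the file never wrote.
  //
  //   val inFile   = new StructType().add("a", IntegerType)
  //   val required = new StructType().add("a", IntegerType).add("b", StringType)
  //   needsNestedPadding(inFile, required)   // => true
  //   needsNestedPadding(required, required) // => false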

  /**
   * Check whether a type conversion requires special handling (an unsafe cast workaround),
   * recursing into nested struct, array and map types.
   *
   * @param src Source data type
   * @param dst Destination data type
   * @return true if conversion needs special handling (e.g., float->double via string)
   */
  def hasUnsupportedConversion(src: DataType, dst: DataType): Boolean = (src, dst) match {
    case (FloatType, DoubleType) => true
    case (IntegerType, DecimalType()) => true
    case (LongType, DecimalType()) => true
    case (FloatType, DecimalType()) => true
    case (DoubleType, DecimalType()) => true
    case (StringType, DecimalType()) => true
    case (StringType, DateType) => true
    case (StructType(srcFields), StructType(dstFields)) =>
      srcFields.zip(dstFields).exists { case (sf, df) => hasUnsupportedConversion(sf.dataType, df.dataType) }
    case (ArrayType(sElem, _), ArrayType(dElem, _)) =>
      hasUnsupportedConversion(sElem, dElem)
    case (MapType(sKey, sVal, _), MapType(dKey, dVal, _)) =>
      hasUnsupportedConversion(sKey, dKey) || hasUnsupportedConversion(sVal, dVal)
    case _ => false
  }
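
  // Editor's note (illustrative, not part of the original change): numeric widening into
  // DecimalType is flagged here so recursivelyCastExpressions re-routes it through a String
  // cast instead of a direct Cast.
  //
  //   hasUnsupportedConversion(LongType, DecimalType(20, 0)) // => true
  //   hasUnsupportedConversion(IntegerType, LongType)        // => false (plain Cast is safe)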

/**
   * Build schema change information by comparing the file schema to the required schema.
* Analyzes schema differences to determine:
* 1. Which fields need type conversions (stored in typeChangeInfos map)
* 2. The adjusted reader schema to use when reading the file
*
* @param fileStruct Schema from the file (as Spark StructType)
* @param requiredSchema Schema requested by the query
* @return Tuple of (typeChangeInfos map, adjusted reader schema)
*/
def buildImplicitSchemaChangeInfo(
fileStruct: StructType,
requiredSchema: StructType
): (java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]], StructType) = {
val implicitTypeChangeInfo: java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]] = new java.util.HashMap()

val fileStructMap = fileStruct.fields.map(f => (f.name, f.dataType)).toMap
    // Missing fields and fields whose data type needs to change while reading are handled here.
val sparkRequestStructFields = requiredSchema.map(f => {
val requiredType = f.dataType
if (fileStructMap.contains(f.name) && !isDataTypeEqual(requiredType, fileStructMap(f.name))) {
val readerType = addMissingFields(requiredType, fileStructMap(f.name))
        implicitTypeChangeInfo.put(Integer.valueOf(requiredSchema.fieldIndex(f.name)), org.apache.hudi.common.util.collection.Pair.of(requiredType, readerType))
StructField(f.name, readerType, f.nullable)
} else {
f
}
})
(implicitTypeChangeInfo, StructType(sparkRequestStructFields))
}
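
  // Editor's note (illustrative, not part of the original change): if the file wrote `price`
  // as float but the query asks for double, the reader schema keeps float so the file can be
  // decoded, and the type-change map records the pair to cast afterwards.
  //
  //   val fileStruct     = new StructType().add("price", FloatType)
  //   val requiredSchema = new StructType().add("price", DoubleType)
  //   val (typeChanges, readerSchema) = buildImplicitSchemaChangeInfo(fileStruct, requiredSchema)
  //   // typeChanges: {0 -> Pair(DoubleType, FloatType)}, readerSchema: struct<price: float>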

/**
* Check if two Spark data types are equal for schema evolution purposes.
* Ignores nullability and handles special cases like TimestampNTZ stored as Long.
*
* @param requiredType Type requested by query
* @param fileType Type from file schema
* @return true if types are compatible for reading
*/
def isDataTypeEqual(requiredType: DataType, fileType: DataType): Boolean = (requiredType, fileType) match {
case (requiredType, fileType) if requiredType == fileType => true

// prevent illegal cast - TimestampNTZ can be stored as Long in files
case (TimestampNTZType, LongType) => true

case (ArrayType(rt, _), ArrayType(ft, _)) =>
      // Do not care about nullability, as schema evolution requires fields to be nullable
isDataTypeEqual(rt, ft)

case (MapType(requiredKey, requiredValue, _), MapType(fileKey, fileValue, _)) =>
      // Likewise, do not care about nullability, as schema evolution requires fields to be nullable
isDataTypeEqual(requiredKey, fileKey) && isDataTypeEqual(requiredValue, fileValue)

case (StructType(requiredFields), StructType(fileFields)) =>
      // Find fields present in both requiredFields and fileFields, as the two sets may differ during add-column and change-column operations
val commonFieldNames = requiredFields.map(_.name) intersect fileFields.map(_.name)

// Need to match by name instead of StructField as name will stay the same whilst type may change
val fileFilteredFields = fileFields.filter(f => commonFieldNames.contains(f.name)).sortWith(_.name < _.name)
val requiredFilteredFields = requiredFields.filter(f => commonFieldNames.contains(f.name)).sortWith(_.name < _.name)

// Sorting ensures that the same field names are being compared for type differences
requiredFilteredFields.zip(fileFilteredFields).forall {
case (requiredField, fileFilteredField) =>
isDataTypeEqual(requiredField.dataType, fileFilteredField.dataType)
}

case _ => false
}
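
  // Editor's note (illustrative, not part of the original change): nullability differences and
  // the TimestampNTZ-as-Long storage case are treated as compatible, while genuine element
  // type changes are not.
  //
  //   isDataTypeEqual(TimestampNTZType, LongType)                  // => true
  //   isDataTypeEqual(ArrayType(LongType), ArrayType(IntegerType)) // => false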

/**
* Add missing nested fields from required type to file type.
   * Used during schema evolution when the query expects fields that don't exist in the file.
*
* @param requiredType Type requested by query (may have extra fields)
* @param fileType Type from file schema
* @return Reconciled type that includes both file and required fields
*/
def addMissingFields(requiredType: DataType, fileType: DataType): DataType = (requiredType, fileType) match {
case (requiredType, fileType) if requiredType == fileType => fileType
case (ArrayType(rt, _), ArrayType(ft, _)) => ArrayType(addMissingFields(rt, ft))
case (MapType(requiredKey, requiredValue, _), MapType(fileKey, fileValue, _)) =>
MapType(addMissingFields(requiredKey, fileKey), addMissingFields(requiredValue, fileValue))
case (StructType(requiredFields), StructType(fileFields)) =>
val fileFieldMap = fileFields.map(f => f.name -> f).toMap
StructType(requiredFields.map(f => {
fileFieldMap.get(f.name) match {
case Some(ff) => StructField(ff.name, addMissingFields(f.dataType, ff.dataType), ff.nullable, ff.metadata)
case None => f
}
}))
case _ => fileType
}
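
  // Editor's note (illustrative, not part of the original change): a nested field added by
  // schema evolution but absent from the file is carried over from the required type, so the
  // reader schema stays decodable and the projection can NULL-pad it later.
  //
  //   val required = new StructType()
  //     .add("address", new StructType().add("city", StringType).add("zip", StringType))
  //   val inFile = new StructType()
  //     .add("address", new StructType().add("city", StringType))
  //   addMissingFields(required, inFile)
  //   // => struct<address: struct<city: string, zip: string>>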
}