Conversation

@rahil-c
Collaborator

@rahil-c rahil-c commented Jan 16, 2026

Describe the issue this Pull Request addresses

Issue: #17627

Summary and Changelog

To rebase on master once the following Lance MOR support PR is landed: #17768

  • Ensure that basic schema evolution works when adding a new column to a Hudi table with Lance as the file format.
  • Changes to SparkLanceReaderBase to handle NULL padding when an older file schema does not contain a newer column added by a recent table schema change.
  • Added test testSchemaEvolutionAddColumn in TestLanceDataSource.

Impact

None

Risk Level

Low

Documentation Update

None

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@rahil-c rahil-c mentioned this pull request Jan 16, 2026
@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Jan 16, 2026
@rahil-c
Collaborator Author

rahil-c commented Jan 16, 2026

@the-other-tim-brown @voonhous

Sharing some recent findings to give context to the current implementation, specifically why it maintains the separate classes

LanceBasicSchemaEvolution
LanceFileFormatHelper 

instead of reusing SparkBasicSchemaEvolution and ParquetFileFormatHelper.

Currently, for our parquet schema evolution support, we rely on Spark's native parquet reader to handle columns that do not exist in the file, as well as the null padding. When running a basic schema evolution test with parquet as the base file format, I see it enter VectorizedParquetRecordReader, which has a variable called missingColumns.


The missingColumns variable documents the following behavior around null padding:

/**
 * For each leaf column, if it is in the set, it means the column is missing in the file and
 * we'll instead return NULLs.
 */
private Set<ParquetColumn> missingColumns;

https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L97
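
For reference, this null-padding behavior is visible with plain Spark, outside of Hudi. A minimal sketch (the path and column names are made up for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[1]").appName("parquet-null-padding").getOrCreate()
import spark.implicits._

// Write a parquet file whose schema only has col_a
Seq("a1", "a2").toDF("col_a").write.mode("overwrite").parquet("/tmp/parquet_null_padding")

// Read it back with an evolved schema that also asks for col_b
val evolvedSchema = StructType(Seq(
  StructField("col_a", StringType),
  StructField("col_b", IntegerType)))

// col_b is absent from the file, so the parquet reader returns NULL for it instead of failing
spark.read.schema(evolvedSchema).parquet("/tmp/parquet_null_padding").show()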

Unfortunately, the Lance file reader does not behave this way and does not handle null padding. For example, if you ask the Lance file reader for a column that does not exist in the file, it will immediately error out, unlike parquet. As a quick test, I tried passing the requiredSchema to lanceReader.readAll and encountered the following runtime error.

Caused by: java.io.IOException: Failed to read Lance file: /var/folders/lm/0j1q1s_n09b4wgqkdqbzpbkm0000gn/T/junit-14064069028248189535/dataset/test_lance_schema_evolution_copy_on_write/20bc4ff6-1eb2-4068-b87d-4105182d7306-0_0-16-19_20260116105048079.lance
	at org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase.read(SparkLanceReaderBase.scala:143)
	at org.apache.hudi.SparkFileFormatInternalRowReaderContext.getFileRecordIterator(SparkFileFormatInternalRowReaderContext.scala:105)
	at org.apache.hudi.common.engine.HoodieReaderContext.getFileRecordIterator(HoodieReaderContext.java:273)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.makeBaseFileIterator(HoodieFileGroupReader.java:157)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.initRecordIterators(HoodieFileGroupReader.java:129)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.getBufferedRecordIterator(HoodieFileGroupReader.java:292)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.getClosableHoodieRecordIterator(HoodieFileGroupReader.java:308)
	at org.apache.hudi.io.FileGroupReaderBasedMergeHandle.doMerge(FileGroupReaderBasedMergeHandle.java:269)
	... 38 more
Caused by: java.lang.RuntimeException: LanceError(Schema): Column email does not exist, /Users/runner/work/lance/lance/rust/lance-core/src/datatypes/schema.rs:265:31
	at com.lancedb.lance.file.LanceFileReader.readAllNative(Native Method)
	at com.lancedb.lance.file.LanceFileReader.readAll(LanceFileReader.java:138)
	at org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase.read(SparkLanceReaderBase.scala:110)
	... 45 more
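
One way around this (a sketch of the idea, not the exact helper code in this PR) is to only hand the Lance reader the intersection of the required schema with the file schema, and defer the null padding of the remaining columns to a projection afterwards (see the projection sketch further down):

import org.apache.spark.sql.types.StructType

// Sketch: prune the schema requested from the Lance reader down to columns that
// actually exist in the file; the remaining required columns are null-padded later.
def pruneToFileColumns(fileSchema: StructType, requiredSchema: StructType): StructType = {
  val fileFields = fileSchema.fieldNames.toSet
  StructType(requiredSchema.fields.filter(f => fileFields.contains(f.name)))
}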

I also do not believe that GenerateUnsafeProjection.generate handles the null padding either, since I ran into the following exception:

Caused by: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (64)
Allocator(hudi-arrow-SparkLanceReaderBase-data-/var/folders/lm/0j1q1s_n09b4wgqkdqbzpbkm0000gn/T/junit-9822325871446255147/dataset/test_lance_schema_evolution_copy_on_write/19f34eb0-e71d-4b91-b08f-b2fd95e266dc-0_0-16-19_20260116111900287.lance) 0/64/128/125829120 (res/actual/peak/limit)

	at org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:504)
	at org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase.read(SparkLanceReaderBase.scala:141)
	at org.apache.hudi.SparkFileFormatInternalRowReaderContext.getFileRecordIterator(SparkFileFormatInternalRowReaderContext.scala:105)
	at org.apache.hudi.common.engine.HoodieReaderContext.getFileRecordIterator(HoodieReaderContext.java:273)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.makeBaseFileIterator(HoodieFileGroupReader.java:157)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.initRecordIterators(HoodieFileGroupReader.java:129)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.getBufferedRecordIterator(HoodieFileGroupReader.java:292)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.getClosableHoodieRecordIterator(HoodieFileGroupReader.java:308)
	at org.apache.hudi.io.FileGroupReaderBasedMergeHandle.doMerge(FileGroupReaderBasedMergeHandle.java:269)
	at org.apache.hudi.io.IOUtils.runMerge(IOUtils.java:120)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:392)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:358)
	... 35 more

After I explicitly provided a null value for fields that did not exist in the row, the testSchemaEvolutionAddColumn test would pass. See LanceFileFormatHelper#generateUnsafeProjection.
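
A minimal sketch of that idea (not the exact LanceFileFormatHelper code; names are illustrative): map each field of the required schema either to a bound reference into the file row, or to a typed null literal when the file does not contain the column:

import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, Literal}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.types.StructType

// Sketch: project rows read with the file schema into the required (table) schema,
// padding columns that are missing from the file with typed null literals.
def nullPaddingProjection(fileSchema: StructType, requiredSchema: StructType) = {
  val exprs: Seq[Expression] = requiredSchema.fields.map { field =>
    fileSchema.getFieldIndex(field.name) match {
      case Some(ordinal) =>
        // Column exists in the file: bind to its position in the file row
        BoundReference(ordinal, fileSchema(ordinal).dataType, fileSchema(ordinal).nullable)
      case None =>
        // Column missing from the file: pad with a null of the required data type
        Literal.create(null, field.dataType)
    }
  }
  GenerateUnsafeProjection.generate(exprs)
}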

@voonhous
Member

Yeap, the parquet reader does null padding. This is why InternalSchemaMerger adds a suffix to a column whose field has two different fieldIds across schema versions.

This occurs during an operation like this:

schema_v1:

col_a STRING -- id: 1
col_b INT -- id: 2

schema operation:

DROP col_b;
ADD col_b UUID;

schema_v2:

col_a STRING -- id: 1
col_b UUID -- id: 3

When reading the latest snapshot, Hudi will pass this schema to the parquet reader for older file groups of the latest snapshot that were written before the schema evolution operation:

col_a, col_bsuffix

This will cause col_b to return nulls, i.e. the old data will not be read out, since the latest snapshot calls for col_b with a UUID type.

The unsafe projection approach makes sense to me: it returns null LITERALs typed with the column's data type in the latest snapshot.

// Create lance schema evolution helper
val evolution = new LanceBasicSchemaEvolution(
fileSchema,
requiredSchema,
Member

@voonhous voonhous Jan 17, 2026

Just a nit: is there a difference between requestSchema and requiredSchema? We should keep the nomenclature the same. I traced the schema code two years ago and the number of *Schema variables got me seeing stars.

Although there are LLMs that can help us read through and break down code now, it is still good practice not to rename variables that mean the same thing, and to keep the nomenclature consistent across the entire codebase as much as possible.

@rahil-c
Collaborator Author

rahil-c commented Jan 19, 2026

I was able to dig further into this memory leak issue yesterday, which helped expose a bug in the existing code as well.

Caused by: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (64)
Allocator(hudi-arrow-SparkLanceReaderBase-data-/var/folders/lm/0j1q1s_n09b4wgqkdqbzpbkm0000gn/T/junit-9822325871446255147/dataset/test_lance_schema_evolution_copy_on_write/19f34eb0-e71d-4b91-b08f-b2fd95e266dc-0_0-16-19_20260116111900287.lance) 0/64/128/125829120 (res/actual/peak/limit)

at org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:504)
at org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase.read(SparkLanceReaderBase.scala:141)
at org.apache.hudi.SparkFileFormatInternalRowReaderContext.getFileRecordIterator(SparkFileFormatInternalRowReaderContext.scala:105)
at org.apache.hudi.common.engine.HoodieReaderContext.getFileRecordIterator(HoodieReaderContext.java:273)
at org.apache.hudi.common.table.read.HoodieFileGroupReader.makeBaseFileIterator(HoodieFileGroupReader.java:157)
at org.apache.hudi.common.table.read.HoodieFileGroupReader.initRecordIterators(HoodieFileGroupReader.java:129)
at org.apache.hudi.common.table.read.HoodieFileGroupReader.getBufferedRecordIterator(HoodieFileGroupReader.java:292)
at org.apache.hudi.common.table.read.HoodieFileGroupReader.getClosableHoodieRecordIterator(HoodieFileGroupReader.java:308)
at org.apache.hudi.io.FileGroupReaderBasedMergeHandle.doMerge(FileGroupReaderBasedMergeHandle.java:269)
at org.apache.hudi.io.IOUtils.runMerge(IOUtils.java:120)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:392)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:358)
... 35 more


After I explicitly provided a `null` value for fields that did not exist in the row, the `testSchemaEvolutionAddColumn` test would pass. See `LanceFileFormatHelper#generateUnsafeProjection`.

Currently, if an exception occurs (such as not padding with null), the actual underlying exception, thrown from LanceFileFormatHelper#generateUnsafeProjection, would be the following:

Caused by: scala.MatchError: None (of class scala.None$)
   at org.apache.spark.sql.execution.datasources.lance.LanceFileFormatHelper$.$anonfun$generateUnsafeProjection$2(LanceFileFormatHelper.scala:135)
   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
   at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
   at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
   at scala.collection.TraversableLike.map(TraversableLike.scala:286)
   at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
   at org.apache.spark.sql.execution.datasources.lance.LanceFileFormatHelper$.generateUnsafeProjection(LanceFileFormatHelper.scala:134)
   at org.apache.spark.sql.execution.datasources.lance.LanceBasicSchemaEvolution.generateUnsafeProjection(LanceBasicSchemaEvolution.scala:68)
   at org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase.read(SparkLanceReaderBase.scala:131)
   ... 45 more

However, this exception would end up getting suppressed due to the try/catch logic we have:

  1. The generateUnsafeProjection call in the try block would fail: https://github.com/apache/hudi/pull/17904/files#diff-bdccaaaeb061abdf550efec86661f9d3790c66d53e04b1ed2e9cf9a61ea06e13R126

  2. It would then enter this catch block, which currently catches all exceptions with Exception: https://github.com/apache/hudi/pull/17904/files#diff-bdccaaaeb061abdf550efec86661f9d3790c66d53e04b1ed2e9cf9a61ea06e13R141

Now, coming to why we still get a memory leak exception instead of eventually printing the original exception stack trace here: https://github.com/apache/hudi/pull/17904/files#diff-bdccaaaeb061abdf550efec86661f9d3790c66d53e04b1ed2e9cf9a61ea06e13R142

This is because we are prematurely calling allocator.close() here: https://github.com/apache/hudi/pull/17904/files#diff-bdccaaaeb061abdf550efec86661f9d3790c66d53e04b1ed2e9cf9a61ea06e13L135

even though technically the lifecycle of the allocator should be handled by LanceRecordIterator#close: https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java#L173.

Since the LanceRecordIterator still holds a reference to the allocator when we close it prematurely ourselves on this line https://github.com/apache/hudi/pull/17904/files#diff-bdccaaaeb061abdf550efec86661f9d3790c66d53e04b1ed2e9cf9a61ea06e13R141, it ends up throwing that memory leak exception. Ideally we should let the LanceRecordIterator close the allocator; however, if an exception happens before the iterator is even created, then we will need to close the allocator ourselves.
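
For reference, a minimal sketch of that ownership hand-off (names and signatures here are illustrative, not the actual Hudi/Lance classes): the allocator is only closed locally when the iterator was never constructed, and the original exception is rethrown rather than masked by a leak error:

import org.apache.arrow.memory.{BufferAllocator, RootAllocator}

// Sketch: whoever successfully constructs the iterator owns the allocator and is
// responsible for closing it; if construction fails, close it here and rethrow
// the original exception instead of letting an allocator leak error mask it.
def openWithAllocator[T <: AutoCloseable](buildIterator: BufferAllocator => T): T = {
  val allocator: BufferAllocator = new RootAllocator(Long.MaxValue)
  try {
    buildIterator(allocator) // on success, the iterator closes the allocator in its own close()
  } catch {
    case e: Exception =>
      allocator.close() // the iterator was never created, so nothing else will release it
      throw e           // surface the real failure, not the memory leak symptom
  }
}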

I have a fix for this flow that I will try to put out, which closes resources correctly when exceptions occur.

@rahil-c rahil-c mentioned this pull request Jan 20, 2026
@rahil-c rahil-c marked this pull request as ready for review January 20, 2026 23:58
@rahil-c rahil-c changed the title from "feat: Lance schema evolution add column support" to "feat: Lance schema evolution (add column, type promotion)" Jan 21, 2026
@rahil-c rahil-c mentioned this pull request Jan 21, 2026
@hudi-bot
Collaborator

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build
