feat: Ensure MOR table works with Lance base files and Avro log files #17768
Conversation
Running into issues with some tests, hence draft state for now.
// scalastyle:on

val writeConfig = client.getConfig
if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
Can you explain why this is needed for lance but not for the other file formats?
Originally, when I was running tests for MOR tables, I ran into the following exception while this condition was in place:
java.lang.UnsupportedOperationException: org.apache.hudi.DefaultSparkRecordMerger only support parquet log.
at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:513)
at org.apache.hudi.HoodieSparkSqlWriterInternal.$anonfun$write$1(HoodieSparkSqlWriter.scala:193)
at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:211)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:133)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:171)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
My assumption for why this condition exists in Hudi is that getAvroBytes was originally not implemented in HoodieSparkRecord; the comment linked below says that only Parquet logs are supported for Spark records, not Avro:
https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java#L121
Since this change now implements getAvroBytes, the restriction that the Spark record merger only supports Parquet log blocks (and not Avro) no longer seems necessary.
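For context, a sketch of the write-side knob this guard keys off. The log data block format resolves to Avro when unset (getLogDataBlockFormat.orElse(AVRO_DATA_BLOCK)), so before this change one way to satisfy the guard for Spark-native records on MOR was to explicitly configure Parquet log data blocks. The config key below is my assumption of the relevant setting and may differ by Hudi version.

```java
// Sketch only (not PR code): write options that select Parquet log data blocks,
// which is the combination the old guard required for Spark-native records on MOR.
import java.util.HashMap;
import java.util.Map;

class ParquetLogBlockOptions {
  static Map<String, String> options() {
    Map<String, String> opts = new HashMap<>();
    // Assumed config key; when left unset the format defaults to Avro data blocks.
    opts.put("hoodie.logfile.data.block.format", "parquet");
    return opts;
  }
}
```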
Why does the DefaultSparkRecordMerger get hit here? That path should be dead now that we have merge mode
Looks like we need to write through as Row the whole way for now, since there is no Avro implementation.
+1. I can take a closer pass at why we need getAvroBytes now, and at how Parquet vs. Avro logs work for existing tables.
Currently the following tests were failing on latest master for the MOR case; however, they pass when run against the original feature branch (with older versions of Hudi and Lance): https://github.com/onehouseinc/hudi-internal/pull/1657. For now I have downgraded the dependencies in order to have the tests pass.
@rahil-c what is the issue with the newer version?
Force-pushed from 3b0709d to b12bddb.
  UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
  updateRecordMetadata(row, recordKey, key.getPartitionPath(), getWrittenRecordCount());
- super.write(row);
+ super.write(row.copy());
This is required when running bulk insert if the incoming dataframe has more than one row. A follow-up is filed to fix the underlying issue: #17808
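For anyone reading along, a minimal sketch (not Hudi code) of the row-reuse hazard the copy() guards against: Spark's scan iterators typically reuse a single mutable InternalRow instance across next() calls, so code that holds on to a row beyond the current iteration has to copy it first.

```java
// Sketch only: buffering InternalRows from a Spark iterator. Without copy(), every
// buffered element can alias the same reused row object, so all entries silently
// end up with the values of the last row seen.
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.sql.catalyst.InternalRow;

class RowBufferingSketch {
  static List<InternalRow> buffer(Iterator<InternalRow> rows) {
    List<InternalRow> buffered = new ArrayList<>();
    while (rows.hasNext()) {
      // Defensive copy; dropping copy() reproduces the more-than-one-row symptom.
      buffered.add(rows.next().copy());
    }
    return buffered;
  }
}
```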
so is this an existing issue for Lance writer in general, unrelated to MoR?
Yes
Force-pushed from b12bddb to 78829f1.
@vinothchandar Was wondering if you could review/sign off on this when you get a chance?
vinothchandar left a comment
Took one pass. Will take a deeper pass in an IDE.
  public ByteArrayOutputStream getAvroBytes(HoodieSchema recordSchema, Properties props) throws IOException {
-   throw new UnsupportedOperationException();
+   // Convert Spark InternalRow to Avro GenericRecord
+   if (data == null) {
This change is not Lance-specific, so I'd love to understand why it becomes necessary.
@vinothchandar
Originally I hit the following exception in TestLanceDataSource#testBasicUpsertModifyExistingRow when trying to upsert an existing row for the MOR case (where there should have been a Lance base file and an Avro log file).
You are correct, however, that it is not a Lance-specific issue: even after switching the test setup to use "PARQUET" by changing this line https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala#L70
I was hitting the same issue below:
Caused by: org.apache.hudi.exception.HoodieAppendException: Failed while appending records to /var/folders/lm/0j1q1s_n09b4wgqkdqbzpbkm0000gn/T/junit-11448262777148643233/dataset/test_lance_upsert_merge_on_read/.3169035e-e73a-49ec-be8f-c7045242bf56-0_20260115220744098.log.1_0-38-60
at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:511)
at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:470)
at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:82)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:358)
... 35 more
Caused by: java.lang.UnsupportedOperationException
at org.apache.hudi.common.model.HoodieSparkRecord.getAvroBytes(HoodieSparkRecord.java:331)
at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.serializeRecords(HoodieAvroDataBlock.java:122)
at org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(HoodieDataBlock.java:132)
at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:147)
at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:503)
... 38 more
When examining the frames of the stack trace, I can see that it goes through the upsert path into HoodieAppendHandle, which attempts to write a log file in HoodieAppendHandle#appendDataAndDeleteBlocks; see the following code pointer: https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java#L503
The actual block seems to be the HoodieAvroDataBlock, which contains a method called serializeRecords (hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java, line 122 at 30029e3):
ByteArrayOutputStream data = s.getAvroBytes(schema, props);
The actual record type being used in this case is HoodieSparkRecord, which did not have a getAvroBytes implementation; hence I implemented it for now.
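For reference, a rough sketch of the shape such an implementation could take, not the actual PR code: the Spark-backed record would first be converted to an Avro GenericRecord (that conversion step is omitted below), and the result is then binary-encoded into a ByteArrayOutputStream, which is what HoodieAvroDataBlock#serializeRecords consumes.

```java
// Sketch only: binary-encode an Avro GenericRecord into a ByteArrayOutputStream,
// roughly what a getAvroBytes implementation needs to return. The Spark
// InternalRow -> GenericRecord conversion is assumed to have happened already.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

class AvroBytesSketch {
  static ByteArrayOutputStream toAvroBytes(GenericRecord avroRecord, Schema schema) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
    writer.write(avroRecord, encoder); // Avro binary encoding of the record
    encoder.flush();                   // flush encoder buffers into the stream
    return out;
  }
}
```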

vinothchandar left a comment
Approving the PR to unblock.
But I'd love for us to just write Lance files in the log, like we do for Parquet today, and avoid Avro logs as the default MoR write path here, given there can be "blobs" in the log files too, and Avro cannot handle those easily.


Describe the issue this Pull Request addresses
Issue #17626
This adds support for the MOR table type and checks that bulk insert, insert, update, and delete work successfully on this table type by generating Avro log files alongside Lance base files.
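For illustration, a rough sketch of the kind of MOR upsert the new tests exercise; the base-file-format option key/value and the field names below are assumptions for illustration and may not match what the PR actually uses.

```java
// Sketch only: upsert into a MOR table via the Hudi Spark datasource.
// "hoodie.table.base.file.format" = "LANCE" and the record key / precombine
// field names are assumptions, not taken from the PR.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

class LanceMorUpsertSketch {
  static void upsert(Dataset<Row> df, String basePath) {
    df.write().format("hudi")
        .option("hoodie.table.name", "test_lance_upsert_merge_on_read")
        .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
        .option("hoodie.datasource.write.operation", "upsert")
        .option("hoodie.datasource.write.recordkey.field", "id")  // assumed field
        .option("hoodie.datasource.write.precombine.field", "ts") // assumed field
        .option("hoodie.table.base.file.format", "LANCE")         // assumed key/value
        .mode(SaveMode.Append)
        .save(basePath);
  }
}
```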
Summary and Changelog
Implements getAvroBytes in HoodieSparkRecord
Impact
None
Risk Level
low
Documentation Update
none
Contributor's checklist