
Conversation

@HuangZhenQiu
Contributor

Describe the issue this Pull Request addresses

Add basic hudi source split reader functionality for MOR

Summary and Changelog

Added HoodieSourceSplitReader, HoodieRecordEmitter, BatchRecords, and related classes
Added test cases for these classes

Impact

None

Risk Level

None

Documentation Update

None

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions bot added the size:XL PR with lines of changes > 1000 label Jan 3, 2026
@HuangZhenQiu HuangZhenQiu force-pushed the new-source-split-reader branch 2 times, most recently from c6f4bb2 to faa181f Compare January 3, 2026 17:12
@HuangZhenQiu HuangZhenQiu force-pushed the new-source-split-reader branch from faa181f to 93be7d8 Compare January 3, 2026 17:24
@hudi-bot
Collaborator

hudi-bot commented Jan 3, 2026

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@xushiyan xushiyan self-assigned this Jan 5, 2026
@xushiyan xushiyan linked an issue Jan 5, 2026 that may be closed by this pull request
String fileId,
HoodieCDCFileSplit[] changes) {
-    super(splitNum, null, Option.empty(), "", tablePath,
+    super(splitNum, null, Option.empty(), "", tablePath, "",
Member

Is there a constant for the empty-string partition path?

public static MergeOnReadInputSplit singleLogFile2Split(String tablePath, String filePath, long maxCompactionMemoryInBytes) {
return new MergeOnReadInputSplit(0, null, Option.of(Collections.singletonList(filePath)),
-      FSUtils.getDeltaCommitTimeFromLogPath(new StoragePath(filePath)), tablePath, maxCompactionMemoryInBytes,
+      FSUtils.getDeltaCommitTimeFromLogPath(new StoragePath(filePath)), tablePath, "", maxCompactionMemoryInBytes,
Member

using the empty partition path constant makes this more readable
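To illustrate the reviewers' point, here is a minimal, self-contained sketch of using a named empty-partition-path constant instead of a bare `""` literal. The `PartitionPaths` and `Demo` classes are hypothetical; Hudi may already expose a suitable constant (e.g. in its `StringUtils` utility), which should be preferred over introducing a new one.

```java
// Hypothetical holder for the constant; if Hudi already defines an
// empty-string constant, reuse that instead of adding a new one.
public final class PartitionPaths {
  public static final String EMPTY_PARTITION_PATH = "";

  private PartitionPaths() {
  }
}

class Demo {
  // A named constant makes the intent explicit at call sites,
  // compared to passing an anonymous "" argument.
  static String describe(String partitionPath) {
    return PartitionPaths.EMPTY_PARTITION_PATH.equals(partitionPath)
        ? "non-partitioned" : "partitioned";
  }
}
```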

// source merge type
private final String mergeType;
// the latest commit instant time
private final String latestCommit;
Member

Calling it the "latest" commit does not seem appropriate, as there will always be new commits from time to time. This property is more accurately named as_of_instant_time.

* limitations under the License.
*/

package org.apache.hudi.source.reader;
Member

why not under the function subpackage?

// the SourceOperator will stop processing and recycling the fetched batches. This exhausts the
// {@link ArrayPoolDataIteratorBatcher#pool} and the `currentReader.next()` call will be
// blocked even without split-level watermark alignment. Based on this the
// `pauseOrResumeSplits` and the `wakeUp` are left empty.
Member

move this multi-line comment to the method doc and add // no op here


@Override
public Set<String> finishedSplits() {
return finishedSplits;
Member

when will finishedSplits be populated?

Comment on lines +84 to +88
try (HoodieFileGroupReader<RowData> fileGroupReader = createFileGroupReader(split)) {
final ClosableIterator<RowData> recordIterator = fileGroupReader.getClosableIterator();
BatchRecords<RowData> records = BatchRecords.forRecords(splitId, recordIterator, split.getFileOffset(), split.getRecordOffset());
records.seek(split.getRecordOffset());
return records;
Member

fileGroupReader will be closed before the records are consumed by the caller. The fileGroupReader lifecycle should be managed at a higher level.

Collaborator

+1. The fileGroupReader will be closed when closing the ClosableIterator. We can close the iterator in BatchRecords#recycle().
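The fix the reviewers suggest can be sketched as follows. This is a self-contained illustration with hypothetical, simplified types (not Hudi's actual `ClosableIterator` or `BatchRecords`): the iterator stays open until the framework recycles the batch, and closing it in `recycle()` is what releases the underlying file group reader.

```java
import java.util.Iterator;

// Simplified stand-in for Hudi's ClosableIterator: an iterator whose
// close() releases the resources of the reader that produced it.
interface ClosableIterator<T> extends Iterator<T>, AutoCloseable {
  @Override
  void close();
}

class BatchRecords<T> {
  private final ClosableIterator<T> iterator;

  BatchRecords(ClosableIterator<T> iterator) {
    this.iterator = iterator;
  }

  // Returns the next record, or null when the batch is exhausted.
  T nextRecord() {
    return iterator.hasNext() ? iterator.next() : null;
  }

  // Called once the batch is fully consumed; closing the iterator here
  // also releases the underlying file group reader, so the reader is
  // no longer closed before the caller has read the records.
  void recycle() {
    iterator.close();
  }
}
```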

private final HoodieReaderContext<RowData> readerContext;
private final HoodieSchema tableSchema;
private final HoodieSchema requiredSchema;
private final String mergeType;
Collaborator

field not used.

.withShouldUseRecordPosition(true);

// Add schemas if provided
if (tableSchema != null) {
Collaborator

dataSchema and requestedSchema cannot be null for the file group reader.

private final HoodieRecordWithPosition<T> recordAndPosition;

// point to current read position within the records list
private int position;
Collaborator

position value is increased but never accessed.


public void seek(long startingRecordOffset) {
for (long i = 0; i < startingRecordOffset; ++i) {
if (recordIterator.hasNext()) {
Collaborator

If position is necessary, it should also be increased here?
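The reviewer's point can be shown with a small self-contained sketch (the class and field names here are illustrative, not Hudi's): if a `position` field is kept at all, `seek()` must advance it in lockstep with the records it skips, otherwise the bookkeeping silently drifts from the iterator state.

```java
import java.util.Iterator;

class SeekableBatch<T> {
  private final Iterator<T> recordIterator;
  private int position; // index of the next record to be read

  SeekableBatch(Iterator<T> recordIterator) {
    this.recordIterator = recordIterator;
  }

  // Skips up to startingRecordOffset records, advancing position
  // together with the iterator so the two stay consistent.
  void seek(long startingRecordOffset) {
    for (long i = 0; i < startingRecordOffset && recordIterator.hasNext(); ++i) {
      recordIterator.next();
      position++;
    }
  }

  int position() {
    return position;
  }
}
```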

this.logPaths = logPaths;
this.latestCommit = latestCommit;
this.tablePath = tablePath;
this.partitionPath = partitionPath;
Collaborator

partitionPath seems not needed.

Contributor

yes, we can just pass around an empty string when building the file group reader now.

this.fileId = fileId;
}

public String getFileId() {
Collaborator

Unnecessary changes. We use lombok annotations now.

return toString();
}

public String getFileId() {
Collaborator

@cshuo cshuo Jan 6, 2026

Unnecessary changes. We use lombok annotations now.

/**
* Reader function implementation for Merge On Read table.
*/
public class MergeOnReadSplitReaderFunction<I, K, O> implements SplitReaderFunction<RowData> {
Contributor

HoodieSourceSplitReaderFunction ?

Contributor

I didn't see the mini-batch read in this function; is it handled automatically?

// We request a split only if we did not get splits during the checkpoint restore.
// Otherwise, reader restarts will keep requesting more and more splits.
if (getNumberOfCurrentlyAssignedSplits() == 0) {
requestSplit(new ArrayList<>());
Contributor

use Collections.emptyList()
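The rationale behind this suggestion: `Collections.emptyList()` returns a shared immutable singleton, so no list is allocated on each reader restart, and the immutability guards against accidental mutation of the argument. A minimal sketch (the `requestSplit` method here is a hypothetical stand-in mirroring the call shape in the reviewed code):

```java
import java.util.Collections;
import java.util.List;

public class EmptyListDemo {
  static List<String> lastRequest;

  // Hypothetical stand-in for the reader's requestSplit(hostHints) call;
  // it just records the argument it was given.
  static void requestSplit(List<String> hostHints) {
    lastRequest = hostHints;
  }

  public static void main(String[] args) {
    // Collections.emptyList() hands back the same immutable instance
    // every time, avoiding a fresh ArrayList allocation per restart.
    requestSplit(Collections.emptyList());
    System.out.println(lastRequest.isEmpty());
  }
}
```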

try (HoodieFileGroupReader<RowData> fileGroupReader = createFileGroupReader(split)) {
final ClosableIterator<RowData> recordIterator = fileGroupReader.getClosableIterator();
BatchRecords<RowData> records = BatchRecords.forRecords(splitId, recordIterator, split.getFileOffset(), split.getRecordOffset());
records.seek(split.getRecordOffset());
Contributor

Looks like recordOffset and consumed serve the same purpose in HoodieSourceSplit: bookkeeping the offset of the last consumed record. Is it possible to unify these two?


@Override
public RecordsWithSplitIds<HoodieRecordWithPosition<T>> fetch() throws IOException {
HoodieSourceSplit nextSplit = splits.poll();
Member

This removes the split from the queue, right? If the read function errors out, the split will be lost?
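One way to address this concern is to peek at the head of the queue and only remove the split after the read succeeds, so a failure leaves it queued for retry. A self-contained sketch with illustrative names (not Hudi's actual fetch() implementation):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

class SplitQueue<S> {
  private final Queue<S> splits = new ArrayDeque<>();

  void add(S split) {
    splits.add(split);
  }

  // Reads the head split without removing it first; if readFn throws,
  // the split remains in the queue and can be retried.
  <R> R fetchWith(Function<S, R> readFn) {
    S next = splits.peek();        // do not remove yet
    if (next == null) {
      return null;
    }
    R result = readFn.apply(next); // may throw; split stays queued
    splits.poll();                 // success: now it is safe to remove
    return result;
  }

  int size() {
    return splits.size();
  }
}
```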


@Override
public void close() throws Exception {
currentSplitId = null;
Member

@xushiyan xushiyan Jan 6, 2026

How about cleaning up the other properties?
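A self-contained sketch of what a fuller cleanup could look like (all names here are illustrative stand-ins, not Hudi's actual fields): close() releases every resource the reader holds, not just the split id.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Stand-in for the real reader type; close() just records the call.
class StubReader {
  boolean closed = false;

  void close() {
    closed = true;
  }
}

public class HoodieSplitReaderSketch {
  String currentSplitId = "split-0";
  StubReader currentReader = new StubReader();
  Deque<String> splits = new ArrayDeque<>(List.of("s1"));

  void close() {
    currentSplitId = null;
    if (currentReader != null) {
      currentReader.close(); // release the underlying file handles
      currentReader = null;
    }
    splits.clear(); // drop any pending splits
  }
}
```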



Development

Successfully merging this pull request may close these issues.

Create Hudi Source Split Reader

5 participants