Skip to content

Commit c6f4bb2

Browse files
committed
feat: the basic new hudi source reader
1 parent c177e2b commit c6f4bb2

23 files changed

+2027
-5
lines changed

hudi-flink-datasource/hudi-flink/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,12 @@
246246
<version>${flink.version}</version>
247247
<scope>provided</scope>
248248
</dependency>
249+
<dependency>
250+
<groupId>org.apache.flink</groupId>
251+
<artifactId>flink-connector-base</artifactId>
252+
<version>${flink.version}</version>
253+
<scope>provided</scope>
254+
</dependency>
249255
<dependency>
250256
<groupId>org.apache.flink</groupId>
251257
<artifactId>flink-metrics-dropwizard</artifactId>

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ private List<MergeOnReadInputSplit> getInputSplits(
409409
String latestCommit = InstantComparison.minInstant(fileSlice.getLatestInstantTime(), endInstant);
410410
return new MergeOnReadInputSplit(cnt.getAndAdd(1),
411411
basePath, logPaths, latestCommit,
412-
metaClient.getBasePath().toString(), maxCompactionMemoryInBytes, mergeType, instantRange, fileSlice.getFileId());
412+
metaClient.getBasePath().toString(), fileSlice.getPartitionPath(), maxCompactionMemoryInBytes, mergeType, instantRange, fileSlice.getFileId());
413413
}).sorted(Comparator.comparing(MergeOnReadInputSplit::getLatestCommit)).collect(Collectors.toList());
414414
}
415415

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.source.reader;
20+
21+
import org.apache.hudi.common.util.ValidationUtils;
22+
import org.apache.hudi.common.util.collection.ClosableIterator;
23+
24+
import java.util.Collections;
25+
import java.util.Set;
26+
import javax.annotation.Nullable;
27+
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
28+
29+
/**
30+
* Implementation of RecordsWithSplitIds with a list record inside.
31+
*
32+
* Type parameters: <T> – record type
33+
*/
34+
public class BatchRecords<T> implements RecordsWithSplitIds<HoodieRecordWithPosition<T>> {
35+
private String splitId;
36+
private final ClosableIterator<T> recordIterator;
37+
private final Set<String> finishedSplits;
38+
private final HoodieRecordWithPosition<T> recordAndPosition;
39+
40+
// point to current read position within the records list
41+
private int position;
42+
43+
BatchRecords(
44+
String splitId,
45+
ClosableIterator<T> recordIterator,
46+
int fileOffset,
47+
long startingRecordOffset,
48+
Set<String> finishedSplits) {
49+
ValidationUtils.checkArgument(
50+
finishedSplits != null, "finishedSplits can be empty but not null");
51+
ValidationUtils.checkArgument(
52+
recordIterator != null, "recordIterator can be empty but not null");
53+
54+
this.splitId = splitId;
55+
this.recordIterator = recordIterator;
56+
this.finishedSplits = finishedSplits;
57+
this.recordAndPosition = new HoodieRecordWithPosition<>();
58+
this.recordAndPosition.set(null, fileOffset, startingRecordOffset);
59+
this.position = 0;
60+
}
61+
62+
@Nullable
63+
@Override
64+
public String nextSplit() {
65+
String nextSplit = this.splitId;
66+
// set the splitId to null to indicate no more splits
67+
// this class only contains record for one split
68+
this.splitId = null;
69+
return nextSplit;
70+
}
71+
72+
@Nullable
73+
@Override
74+
public HoodieRecordWithPosition<T> nextRecordFromSplit() {
75+
if (recordIterator.hasNext()) {
76+
recordAndPosition.record(recordIterator.next());
77+
position++;
78+
return recordAndPosition;
79+
} else {
80+
return null;
81+
}
82+
}
83+
84+
@Override
85+
public Set<String> finishedSplits() {
86+
return finishedSplits;
87+
}
88+
89+
public void seek(long startingRecordOffset) {
90+
for (long i = 0; i < startingRecordOffset; ++i) {
91+
if (recordIterator.hasNext()) {
92+
recordIterator.next();
93+
} else {
94+
throw new IllegalStateException(
95+
String.format(
96+
"Invalid starting record offset %d for split %s",
97+
startingRecordOffset,
98+
splitId));
99+
}
100+
}
101+
}
102+
103+
public static <T> BatchRecords<T> forRecords(
104+
String splitId, ClosableIterator<T> recordIterator, int fileOffset, long startingRecordOffset) {
105+
106+
return new BatchRecords<>(
107+
splitId, recordIterator, fileOffset, startingRecordOffset, Collections.emptySet());
108+
}
109+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.source.reader;
20+
21+
import org.apache.flink.api.connector.source.SourceOutput;
22+
import org.apache.flink.connector.base.source.reader.RecordEmitter;
23+
import org.apache.hudi.source.split.HoodieSourceSplit;
24+
25+
/**
26+
* Default Hoodie record emitter.
27+
* @param <T>
28+
*/
29+
public class HoodieRecordEmitter<T> implements RecordEmitter<HoodieRecordWithPosition<T>, T, HoodieSourceSplit> {
30+
31+
@Override
32+
public void emitRecord(HoodieRecordWithPosition<T> record, SourceOutput<T> output, HoodieSourceSplit split) throws Exception {
33+
output.collect(record.record());
34+
split.updatePosition(record.fileOffset(), record.recordOffset());
35+
}
36+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.source.reader;
20+
21+
import java.util.Locale;
22+
23+
/**
 * A mutable holder that pairs a record with its position: a file offset and a record offset.
 *
 * @param <T> record type
 */
public class HoodieRecordWithPosition<T> {
  private T record;
  private int fileOffset;
  private long recordOffset;

  /** Creates an empty holder: no record, both offsets at zero. */
  public HoodieRecordWithPosition() {

  }

  /** Creates a holder with the given record and position. */
  public HoodieRecordWithPosition(T record, int fileOffset, long recordOffset) {
    this.record = record;
    this.fileOffset = fileOffset;
    this.recordOffset = recordOffset;
  }

  // ------------------------------------------------------------------------
  //  Mutators
  // ------------------------------------------------------------------------

  /** Replaces the record, the file offset and the record offset in one go. */
  public void set(T newRecord, int newFileOffset, long newRecordOffset) {
    this.recordOffset = newRecordOffset;
    this.fileOffset = newFileOffset;
    this.record = newRecord;
  }

  /** Stores the next record of a sequence and moves the {@code recordOffset} forward by one. */
  public void record(T nextRecord) {
    this.recordOffset += 1;
    this.record = nextRecord;
  }

  // ------------------------------------------------------------------------
  //  Accessors
  // ------------------------------------------------------------------------

  /** Returns the current record, possibly {@code null}. */
  public T record() {
    return this.record;
  }

  /** Returns the current file offset. */
  public int fileOffset() {
    return this.fileOffset;
  }

  /** Returns the current record offset. */
  public long recordOffset() {
    return this.recordOffset;
  }

  @Override
  public String toString() {
    final String pattern = "%s @ %d + %d";
    return String.format(Locale.ROOT, pattern, record, fileOffset, recordOffset);
  }
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.source.reader;
20+
21+
import org.apache.flink.api.connector.source.SourceReaderContext;
22+
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.connector.base.source.reader.RecordEmitter;
24+
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
25+
import org.apache.hudi.source.split.HoodieSourceSplit;
26+
import org.apache.hudi.source.split.SerializableComparator;
27+
import org.apache.hudi.source.split.SplitRequestEvent;
28+
29+
import java.util.ArrayList;
30+
import java.util.Collection;
31+
import java.util.Map;
32+
33+
/**
34+
* The reader implementation of Hoodie Source.
35+
* @param <T> record type
36+
*/
37+
public class HoodieSourceReader<T> extends
38+
SingleThreadMultiplexSourceReaderBase<HoodieRecordWithPosition<T>, T, HoodieSourceSplit, HoodieSourceSplit> {
39+
40+
public HoodieSourceReader(
41+
RecordEmitter<HoodieRecordWithPosition<T>, T, HoodieSourceSplit> recordEmitter,
42+
Configuration config,
43+
SourceReaderContext context,
44+
SplitReaderFunction<T> readerFunction,
45+
SerializableComparator<HoodieSourceSplit> splitComparator) {
46+
super(() -> new HoodieSourceSplitReader<>(context, readerFunction, splitComparator), recordEmitter, config, context);
47+
}
48+
49+
@Override
50+
public void start() {
51+
// We request a split only if we did not get splits during the checkpoint restore.
52+
// Otherwise, reader restarts will keep requesting more and more splits.
53+
if (getNumberOfCurrentlyAssignedSplits() == 0) {
54+
requestSplit(new ArrayList<>());
55+
}
56+
}
57+
58+
@Override
59+
protected void onSplitFinished(Map<String, HoodieSourceSplit> finishedSplitIds) {
60+
requestSplit(new ArrayList<>(finishedSplitIds.keySet()));
61+
}
62+
63+
@Override
64+
protected HoodieSourceSplit initializedState(HoodieSourceSplit hoodieSourceSplit) {
65+
return hoodieSourceSplit;
66+
}
67+
68+
@Override
69+
protected HoodieSourceSplit toSplitType(String splitId, HoodieSourceSplit hoodieSourceSplit) {
70+
return hoodieSourceSplit;
71+
}
72+
73+
private void requestSplit(Collection<String> finishedSplitIds) {
74+
context.sendSourceEventToCoordinator(new SplitRequestEvent(finishedSplitIds));
75+
}
76+
}

0 commit comments

Comments
 (0)