
Commit a7a741d

[core] Introduce FallbackSplit in FallbackReadFileStoreTable
1 parent a9df02e

5 files changed: +97 -58 lines changed

paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java

Lines changed: 22 additions & 18 deletions
@@ -40,12 +40,11 @@
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ChainTableUtils;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;

 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -54,17 +53,19 @@
 import java.util.Set;
 import java.util.stream.Collectors;

+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
 /**
  * Chain table which mainly read from the snapshot branch. However, if the snapshot branch does not
  * have a partition, it will fall back to chain read.
  */
 public class ChainGroupReadTable extends FallbackReadFileStoreTable {

-    public ChainGroupReadTable(
-            AbstractFileStoreTable snapshotStoreTable, AbstractFileStoreTable deltaStoreTable) {
+    public ChainGroupReadTable(FileStoreTable snapshotStoreTable, FileStoreTable deltaStoreTable) {
         super(snapshotStoreTable, deltaStoreTable);
-        Preconditions.checkArgument(snapshotStoreTable instanceof PrimaryKeyFileStoreTable);
-        Preconditions.checkArgument(deltaStoreTable instanceof PrimaryKeyFileStoreTable);
+        checkArgument(snapshotStoreTable instanceof PrimaryKeyFileStoreTable);
+        checkArgument(deltaStoreTable instanceof PrimaryKeyFileStoreTable);
     }

     @Override
@@ -187,20 +188,20 @@ public Plan plan() {
             List<BinaryRow> deltaPartitions = fallbackScan.listPartitions();
             deltaPartitions =
                     deltaPartitions.stream()
-                            .sorted((o1, o2) -> partitionComparator.compare(o1, o2))
+                            .sorted(partitionComparator)
                            .collect(Collectors.toList());
             BinaryRow maxPartition = deltaPartitions.get(deltaPartitions.size() - 1);
             Predicate snapshotPredicate =
                     ChainTableUtils.createTriangularPredicate(
                             maxPartition,
                             partitionConverter,
-                            (Integer i, Object j) -> builder.equal(i, j),
-                            (Integer i, Object j) -> builder.lessThan(i, j));
+                            builder::equal,
+                            builder::lessThan);
             mainScan.withPartitionFilter(snapshotPredicate);
             List<BinaryRow> candidateSnapshotPartitions = mainScan.listPartitions();
             candidateSnapshotPartitions =
                     candidateSnapshotPartitions.stream()
-                            .sorted((o1, o2) -> partitionComparator.compare(o1, o2))
+                            .sorted(partitionComparator)
                            .collect(Collectors.toList());
             Map<BinaryRow, BinaryRow> partitionMapping =
                     ChainTableUtils.findFirstLatestPartitions(
@@ -214,13 +215,13 @@ public Plan plan() {
                             ChainTableUtils.createTriangularPredicate(
                                     partitionParis.getKey(),
                                     partitionConverter,
-                                    (Integer i, Object j) -> builder.equal(i, j),
-                                    (Integer i, Object j) -> builder.lessThan(i, j)));
+                                    builder::equal,
+                                    builder::lessThan));
                     predicates.add(
                             ChainTableUtils.createLinearPredicate(
                                     partitionParis.getKey(),
                                     partitionConverter,
-                                    (Integer i, Object j) -> builder.equal(i, j)));
+                                    builder::equal));
                     deltaScan.withPartitionFilter(PredicateBuilder.or(predicates));
                 } else {
                     List<BinaryRow> selectedDeltaPartitions =
@@ -237,7 +238,8 @@ public Plan plan() {
                 List<Split> subSplits = deltaScan.plan().splits();
                 Set<String> snapshotFileNames = new HashSet<>();
                 if (partitionParis.getValue() != null) {
-                    snapshotScan.withPartitionFilter(Arrays.asList(partitionParis.getValue()));
+                    snapshotScan.withPartitionFilter(
+                            Collections.singletonList(partitionParis.getValue()));
                     List<Split> mainSubSplits = snapshotScan.plan().splits();
                     snapshotFileNames =
                             mainSubSplits.stream()
@@ -254,8 +256,10 @@ public Plan plan() {
                 Map<Integer, List<DataSplit>> bucketSplits = new LinkedHashMap<>();
                 for (Split split : subSplits) {
                     DataSplit dataSplit = (DataSplit) split;
-                    Preconditions.checkArgument(
-                            dataSplit.totalBuckets() == options.bucket(),
+                    Integer totalBuckets = dataSplit.totalBuckets();
+                    checkNotNull(totalBuckets);
+                    checkArgument(
+                            totalBuckets == options.bucket(),
                             "Inconsistent bucket num " + dataSplit.bucket());
                     bucketSplits
                             .computeIfAbsent(dataSplit.bucket(), k -> new ArrayList<>())
@@ -288,7 +292,7 @@ public Plan plan() {
                     }
                 }
             }
-        return new DataFilePlan(splits);
+        return new DataFilePlan<>(splits);
     }

     @Override
@@ -349,7 +353,7 @@ public TableRead withIOManager(IOManager ioManager) {

         @Override
         public RecordReader<InternalRow> createReader(Split split) throws IOException {
-            Preconditions.checkArgument(split instanceof ChainSplit);
+            checkArgument(split instanceof ChainSplit);
             return fallbackRead.createReader(split);
         }
     }
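
For reference, a minimal sketch (not part of this commit) of the bucket-grouping step shown in the hunk above. DataSplit.totalBuckets() is a boxed Integer, so the new code null-checks it before comparing it to the configured bucket count instead of relying on implicit unboxing inside a single checkArgument call. The helper name groupByBucket and the expectedBuckets parameter are illustrative stand-ins for the surrounding plan() context.

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: group splits by bucket id, failing fast when a split carries no bucket
// count or disagrees with the table's configured bucket number.
static Map<Integer, List<DataSplit>> groupByBucket(List<Split> subSplits, int expectedBuckets) {
    Map<Integer, List<DataSplit>> bucketSplits = new LinkedHashMap<>();
    for (Split split : subSplits) {
        DataSplit dataSplit = (DataSplit) split;
        Integer totalBuckets = dataSplit.totalBuckets(); // boxed, may be null
        checkNotNull(totalBuckets);
        checkArgument(
                totalBuckets == expectedBuckets,
                "Inconsistent bucket num " + dataSplit.bucket());
        bucketSplits
                .computeIfAbsent(dataSplit.bucket(), k -> new ArrayList<>())
                .add(dataSplit);
    }
    return bucketSplits;
}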

paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java

Lines changed: 70 additions & 35 deletions
@@ -24,7 +24,6 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.io.DataInputViewStreamWrapper;
 import org.apache.paimon.io.DataOutputView;
@@ -37,7 +36,6 @@
 import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.source.ChainSplit;
 import org.apache.paimon.table.source.DataFilePlan;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DataTableScan;
@@ -83,12 +81,13 @@ public FallbackReadFileStoreTable(FileStoreTable wrapped, FileStoreTable fallbac
         this.fallback = fallback;

         Preconditions.checkArgument(!(wrapped instanceof FallbackReadFileStoreTable));
-        Preconditions.checkArgument(isValidFallbackTable());
-    }
-
-    private boolean isValidFallbackTable() {
-        return fallback instanceof ChainGroupReadTable
-                || !(fallback instanceof FallbackReadFileStoreTable);
+        if (fallback instanceof FallbackReadFileStoreTable) {
+            // ChainGroupReadTable need to be wrapped again
+            if (!(fallback instanceof ChainGroupReadTable)) {
+                throw new IllegalArgumentException(
+                        "This is a bug, perhaps there is a recursive call.");
+            }
+        }
     }

     public FileStoreTable fallback() {
@@ -248,8 +247,42 @@ private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType fallbackR
         return true;
     }

-    /** DataSplit for fallback read. */
-    public static class FallbackDataSplit extends DataSplit {
+    /** Split for fallback read. */
+    public interface FallbackSplit extends Split {
+        boolean isFallback();
+
+        Split wrapped();
+    }
+
+    private static class FallbackSplitImpl implements FallbackSplit {
+
+        private static final long serialVersionUID = 1L;
+
+        private final Split split;
+        private final boolean isFallback;
+
+        public FallbackSplitImpl(Split split, boolean isFallback) {
+            this.split = split;
+            this.isFallback = isFallback;
+        }
+
+        @Override
+        public boolean isFallback() {
+            return isFallback;
+        }
+
+        @Override
+        public Split wrapped() {
+            return split;
+        }
+
+        @Override
+        public long rowCount() {
+            return split.rowCount();
+        }
+    }
+
+    private static class FallbackDataSplit extends DataSplit implements FallbackSplit {

         private static final long serialVersionUID = 1L;

@@ -290,6 +323,23 @@ public static FallbackDataSplit deserialize(DataInputView in) throws IOException
             DataSplit dataSplit = DataSplit.deserialize(in);
             return new FallbackDataSplit(dataSplit, in.readBoolean());
         }
+
+        @Override
+        public boolean isFallback() {
+            return isFallback;
+        }
+
+        @Override
+        public Split wrapped() {
+            return this;
+        }
+    }
+
+    public static FallbackSplit toFallbackSplit(Split split, boolean fallback) {
+        if (split instanceof DataSplit) {
+            return new FallbackDataSplit((DataSplit) split, fallback);
+        }
+        return new FallbackSplitImpl(split, fallback);
     }

     /** Scan implementation for {@link FallbackReadFileStoreTable}. */
@@ -393,7 +443,7 @@ public TableScan.Plan plan() {
             Set<BinaryRow> completePartitions = new HashSet<>();
             for (Split split : mainScan.plan().splits()) {
                 DataSplit dataSplit = (DataSplit) split;
-                splits.add(new FallbackDataSplit(dataSplit, false));
+                splits.add(toFallbackSplit(dataSplit, false));
                 completePartitions.add(dataSplit.partition());
             }

@@ -404,15 +454,10 @@ public TableScan.Plan plan() {
             if (!remainingPartitions.isEmpty()) {
                 fallbackScan.withPartitionFilter(remainingPartitions);
                 for (Split split : fallbackScan.plan().splits()) {
-                    if (split instanceof DataSplit) {
-                        splits.add(new FallbackDataSplit((DataSplit) split, true));
-                    } else {
-                        Preconditions.checkArgument(split instanceof ChainSplit);
-                        splits.add(split);
-                    }
+                    splits.add(toFallbackSplit(split, true));
                 }
             }
-            return new DataFilePlan(splits);
+            return new DataFilePlan<>(splits);
         }

         @Override
@@ -483,29 +528,19 @@ public TableRead withIOManager(IOManager ioManager) {

         @Override
         public RecordReader<InternalRow> createReader(Split split) throws IOException {
-            if (wrapped.coreOptions().isChainTable()) {
-                if (split instanceof ChainSplit) {
-                    return fallbackRead.createReader(split);
-                } else {
-                    return mainRead.createReader(split);
-                }
-            }
-            if (split instanceof FallbackDataSplit) {
-                FallbackDataSplit fallbackDataSplit = (FallbackDataSplit) split;
-                if (fallbackDataSplit.isFallback) {
+            if (split instanceof FallbackSplit) {
+                FallbackSplit fallbackSplit = (FallbackSplit) split;
+                if (fallbackSplit.isFallback()) {
                     try {
-                        return fallbackRead.createReader(fallbackDataSplit);
+                        return fallbackRead.createReader(fallbackSplit.wrapped());
                     } catch (Exception ignored) {
                         LOG.error(
-                                "Reading from fallback branch has problems for files: {}",
-                                fallbackDataSplit.dataFiles().stream()
-                                        .map(DataFileMeta::fileName)
-                                        .collect(Collectors.joining(", ")));
+                                "Reading from fallback branch has problems: {}",
+                                fallbackSplit.wrapped());
                     }
                 }
             }
-            DataSplit dataSplit = (DataSplit) split;
-            return mainRead.createReader(dataSplit);
+            return mainRead.createReader(split);
         }
     }
 }
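
For reference, a minimal sketch (not part of this commit) of how a consumer can use the new FallbackSplit interface: instead of testing for the concrete FallbackDataSplit class, callers test for the interface, branch on isFallback(), and pass wrapped() to the fallback read path. This mirrors the createReader() change above; the routeSplit helper and the Consumer callbacks are illustrative, while the Hive and Spark call sites below make the analogous instanceof change.

import org.apache.paimon.table.FallbackReadFileStoreTable.FallbackSplit;
import org.apache.paimon.table.source.Split;

import java.util.function.Consumer;

// Sketch: route a planned split either to the fallback read path or to the main
// read path. The two Consumer callbacks stand in for the real readers.
static void routeSplit(Split split, Consumer<Split> fallbackRead, Consumer<Split> mainRead) {
    if (split instanceof FallbackSplit && ((FallbackSplit) split).isFallback()) {
        // wrapped() returns the underlying split from the fallback branch
        fallbackRead.accept(((FallbackSplit) split).wrapped());
    } else {
        // everything else (including non-fallback FallbackSplits) is read from the main branch
        mainRead.accept(split);
    }
}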

paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java

Lines changed: 2 additions & 2 deletions
@@ -89,9 +89,9 @@ public static List<BinaryRow> getDeltaPartitions(
         LocalDateTime candidateTime = stratPartitionTime;
         LocalDateTime endPartitionTime =
                 timeExtractor.extract(partitionColumns, endPartitionValues);
-        while (candidateTime.compareTo(endPartitionTime) <= 0) {
+        while (!candidateTime.isAfter(endPartitionTime)) {
             if (isDailyPartition) {
-                if (candidateTime.compareTo(stratPartitionTime) > 0) {
+                if (candidateTime.isAfter(stratPartitionTime)) {
                     deltaPartitions.add(
                             serializer
                                     .toBinaryRow(
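
For reference, a standalone sketch (not part of this commit) of the loop shape after this java.time cleanup: !isAfter(end) is the readable equivalent of compareTo(end) <= 0, and isAfter(start) replaces compareTo(start) > 0. The hunk does not show how candidateTime advances, so the one-day step below is only an assumption for the daily-partition case.

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

// Sketch: collect the daily partitions strictly after start, up to and including end.
static List<LocalDate> dailyDeltaPartitions(LocalDateTime start, LocalDateTime end) {
    List<LocalDate> partitions = new ArrayList<>();
    LocalDateTime candidate = start;
    while (!candidate.isAfter(end)) {      // same bound as compareTo(end) <= 0
        if (candidate.isAfter(start)) {    // skip the start partition itself
            partitions.add(candidate.toLocalDate());
        }
        candidate = candidate.plusDays(1); // assumed step; not part of the shown hunk
    }
    return partitions;
}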

paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java

Lines changed: 1 addition & 1 deletion
@@ -172,7 +172,7 @@ public static List<DataSplit> packSplits(
         List<DataSplit> toPack = new ArrayList<>();
         int numFiles = 0;
         for (DataSplit split : splits) {
-            if (split instanceof FallbackReadFileStoreTable.FallbackDataSplit) {
+            if (split instanceof FallbackReadFileStoreTable.FallbackSplit) {
                 dataSplits.add(split);
             } else if (split.beforeFiles().isEmpty() && split.rawConvertible()) {
                 numFiles += split.dataFiles().size();

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala

Lines changed: 2 additions & 2 deletions
@@ -23,7 +23,7 @@ import org.apache.paimon.CoreOptions._
 import org.apache.paimon.io.DataFileMeta
 import org.apache.paimon.spark.PaimonInputPartition
 import org.apache.paimon.spark.util.SplitUtils
-import org.apache.paimon.table.FallbackReadFileStoreTable.FallbackDataSplit
+import org.apache.paimon.table.FallbackReadFileStoreTable.FallbackSplit
 import org.apache.paimon.table.format.FormatDataSplit
 import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}

@@ -77,7 +77,7 @@ case class BinPackingSplits(coreOptions: CoreOptions, readRowSizeRatio: Double =

   def pack(splits: Array[Split]): Seq[PaimonInputPartition] = {
     val (toReshuffle, reserved) = splits.partition {
-      case _: FallbackDataSplit => false
+      case _: FallbackSplit => false
       case split: DataSplit => split.beforeFiles().isEmpty && split.rawConvertible()
       // Currently, format table reader only supports reading one file.
       case _: FormatDataSplit => false
