Skip to content

Commit a4effef

Browse files
authored
[core] Not throw exception for fallback reading (#5383)
1 parent 6dc7287 commit a4effef

File tree

2 files changed

+18
-5
lines changed

2 files changed

+18
-5
lines changed

paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.paimon.data.InternalRow;
2525
import org.apache.paimon.disk.IOManager;
2626
import org.apache.paimon.fs.Path;
27+
import org.apache.paimon.io.DataFileMeta;
2728
import org.apache.paimon.manifest.PartitionEntry;
2829
import org.apache.paimon.metrics.MetricRegistry;
2930
import org.apache.paimon.options.Options;
@@ -44,6 +45,9 @@
4445
import org.apache.paimon.utils.Preconditions;
4546
import org.apache.paimon.utils.SegmentsCache;
4647

48+
import org.slf4j.Logger;
49+
import org.slf4j.LoggerFactory;
50+
4751
import java.io.IOException;
4852
import java.util.ArrayList;
4953
import java.util.HashMap;
@@ -60,6 +64,8 @@
6064
*/
6165
public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
6266

67+
private static final Logger LOG = LoggerFactory.getLogger(FallbackReadFileStoreTable.class);
68+
6369
private final FileStoreTable fallback;
6470

6571
public FallbackReadFileStoreTable(FileStoreTable wrapped, FileStoreTable fallback) {
@@ -397,10 +403,17 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
397403
DataSplit dataSplit = (DataSplit) split;
398404
if (!dataSplit.dataFiles().isEmpty()
399405
&& dataSplit.dataFiles().get(0).minKey().getFieldCount() > 0) {
400-
return fallbackRead.createReader(split);
401-
} else {
402-
return mainRead.createReader(split);
406+
try {
407+
return fallbackRead.createReader(split);
408+
} catch (Exception ignored) {
409+
LOG.error(
410+
"Reading from fallback branch has problems for files: {}",
411+
dataSplit.dataFiles().stream()
412+
.map(DataFileMeta::fileName)
413+
.collect(Collectors.joining(", ")));
414+
}
403415
}
416+
return mainRead.createReader(split);
404417
}
405418
}
406419
}

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,11 +311,11 @@ public void testFallbackBranchBatchRead() throws Exception {
311311
public void testCrossPartitionFallbackBranchBatchRead() throws Exception {
312312
sql(
313313
"CREATE TABLE t ( pk INT PRIMARY KEY NOT ENFORCED, name STRING, dt STRING ) PARTITIONED BY (dt) WITH ( 'bucket' = '-1' )");
314-
sql(
315-
"INSERT INTO t VALUES (1, 'Jack', '20250227'), (1, 'Jackson', '20250227'), (2, 'Sam', '20250228')");
316314
sql("CALL sys.create_branch('default.t', 'stream')");
317315
sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'stream' )");
318316

317+
sql(
318+
"INSERT INTO t VALUES (1, 'Jack', '20250227'), (1, 'Jackson', '20250227'), (2, 'Sam', '20250228')");
319319
sql(
320320
"INSERT INTO `t$branch_stream` VALUES (1, 'John Stream', '20250228'), (3, 'Rick Stream', '20250301')");
321321
assertThat(collectResult("SELECT pk, name, dt FROM t order by dt"))

0 commit comments

Comments
 (0)