Commit 186efa0

[core] Fix paimon_incremental_query with limit push down (#7269)
1 parent 913c6d7 commit 186efa0
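
Summary, as read from the diff below: an incremental query such as paimon_incremental_query between two tags plans IncrementalSplits rather than DataSplits, but applyPushDownLimit() and applyPushDownTopN() in DataTableBatchScan unconditionally took plan.dataSplits(). Both methods now check the runtime type of the planned splits and skip the pushdown when any of them is not a DataSplit; a new Spark test covers LIMIT over an incremental query by tag.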

File tree

2 files changed: +36 -2 lines changed

paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java

Lines changed: 15 additions & 2 deletions
@@ -137,7 +137,14 @@ private Optional<StartingScanner.Result> applyPushDownLimit() {

         long scannedRowCount = 0;
         SnapshotReader.Plan plan = ((ScannedResult) result).plan();
-        List<DataSplit> splits = plan.dataSplits();
+        List<Split> planSplits = plan.splits();
+        // Limit pushdown only supports DataSplit. Skip for IncrementalSplit.
+        if (planSplits.stream().anyMatch(s -> !(s instanceof DataSplit))) {
+            return Optional.of(result);
+        }
+        @SuppressWarnings("unchecked")
+        List<DataSplit> splits = (List<DataSplit>) (List<?>) planSplits;
         LOG.info("Applying limit pushdown. Original splits count: {}", splits.size());
         if (splits.isEmpty()) {
             return Optional.of(result);
@@ -193,7 +200,13 @@ private Optional<StartingScanner.Result> applyPushDownTopN() {
         }

         SnapshotReader.Plan plan = ((ScannedResult) result).plan();
-        List<DataSplit> splits = plan.dataSplits();
+        List<Split> planSplits = plan.splits();
+        // TopN pushdown only supports DataSplit. Skip for IncrementalSplit.
+        if (planSplits.stream().anyMatch(s -> !(s instanceof DataSplit))) {
+            return Optional.of(result);
+        }
+        @SuppressWarnings("unchecked")
+        List<DataSplit> splits = (List<DataSplit>) (List<?>) planSplits;
         if (splits.isEmpty()) {
             return Optional.of(result);
         }
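
Both hunks apply the same guard-then-cast pattern: the planned splits are only narrowed from List<Split> to List<DataSplit> after a runtime check that every element actually is a DataSplit, and the pushdown is skipped (the result is returned untouched) otherwise. A minimal, self-contained sketch of that pattern, using stand-in types rather than Paimon's real Split hierarchy:

import java.util.List;
import java.util.Optional;

// Stand-in types for the sketch; Paimon's real Split, DataSplit and
// IncrementalSplit live in org.apache.paimon.table.source.
public class GuardedNarrowingSketch {
    interface Split {}
    static class DataSplit implements Split {}
    static class IncrementalSplit implements Split {}

    // Narrow List<Split> to List<DataSplit> only when a runtime scan of the
    // elements proves it safe; Optional.empty() tells the caller to fall
    // back to the unoptimized plan, as the patched methods do.
    static Optional<List<DataSplit>> asDataSplits(List<Split> splits) {
        if (splits.stream().anyMatch(s -> !(s instanceof DataSplit))) {
            return Optional.empty();
        }
        @SuppressWarnings("unchecked")
        List<DataSplit> dataSplits = (List<DataSplit>) (List<?>) splits;
        return Optional.of(dataSplits);
    }

    public static void main(String[] args) {
        System.out.println(asDataSplits(List.of(new DataSplit())).isPresent());        // true
        System.out.println(asDataSplits(List.of(new IncrementalSplit())).isPresent()); // false
    }
}

The double cast through List<?> is the standard way to convert between otherwise incompatible generic list types once the element types have been verified; @SuppressWarnings("unchecked") is safe here because the guard has already rejected any non-DataSplit element.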

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala

Lines changed: 21 additions & 0 deletions
@@ -344,6 +344,27 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase {
     }
   }

+  test("incremental query by tag with LIMIT") {
+    sql("use paimon")
+    withTable("t") {
+      spark.sql("""
+                  |CREATE TABLE t (a INT, b INT, c STRING)
+                  |USING paimon
+                  |TBLPROPERTIES ('primary-key'='a,b', 'bucket' = '2')
+                  |PARTITIONED BY (a)
+                  |""".stripMargin)
+      spark.sql("INSERT INTO t VALUES (1, 1, '1'), (2, 2, '2')")
+      sql("CALL sys.create_tag('t', 'tag1')")
+      spark.sql("INSERT INTO t VALUES (1, 3, '3'), (2, 4, '4')")
+      sql("CALL sys.create_tag('t', 'tag2')")
+
+      checkAnswer(
+        spark.sql(
+          "SELECT * FROM paimon_incremental_query('t', 'tag1', 'tag2') ORDER BY a, b LIMIT 5"),
+        Seq(Row(1, 3, "3"), Row(2, 4, "4")))
+    }
+  }
+
   private def incrementalDF(tableIdent: String, start: Int, end: Int): DataFrame = {
     spark.read
       .format("paimon")
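
The expected rows, (1, 3, '3') and (2, 4, '4'), are exactly the ones inserted between tag1 and tag2. With the guard in place the limit is no longer pushed into the scan for incremental splits, so Spark applies the LIMIT on top of the unoptimized plan; since LIMIT 5 exceeds the two-row result, the assertion exercises the correctness of that fallback path rather than truncation.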
