diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 457229d7ad3d..19f55ae3c121 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -137,7 +137,14 @@ private Optional<StartingScanner.Result> applyPushDownLimit() {
             long scannedRowCount = 0;
             SnapshotReader.Plan plan = ((ScannedResult) result).plan();
-            List<DataSplit> splits = plan.dataSplits();
+            List<Split> planSplits = plan.splits();
+            // Limit pushdown only supports DataSplit. Skip for IncrementalSplit.
+            if (planSplits.stream().anyMatch(s -> !(s instanceof DataSplit))) {
+                return Optional.of(result);
+            }
+            @SuppressWarnings("unchecked")
+            List<DataSplit> splits = (List<DataSplit>) (List<?>) planSplits;
+            LOG.info("Applying limit pushdown. Original splits count: {}", splits.size());
             if (splits.isEmpty()) {
                 return Optional.of(result);
             }
 
@@ -193,7 +200,13 @@ private Optional<StartingScanner.Result> applyPushDownTopN() {
         }
 
         SnapshotReader.Plan plan = ((ScannedResult) result).plan();
-        List<DataSplit> splits = plan.dataSplits();
+        List<Split> planSplits = plan.splits();
+        // TopN pushdown only supports DataSplit. Skip for IncrementalSplit.
+        if (planSplits.stream().anyMatch(s -> !(s instanceof DataSplit))) {
+            return Optional.of(result);
+        }
+        @SuppressWarnings("unchecked")
+        List<DataSplit> splits = (List<DataSplit>) (List<?>) planSplits;
         if (splits.isEmpty()) {
             return Optional.of(result);
         }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index b3012e2f90b7..68f97743d360 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -344,6 +344,27 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase {
     }
   }
 
+  test("incremental query by tag with LIMIT") {
+    sql("use paimon")
+    withTable("t") {
+      spark.sql("""
+                  |CREATE TABLE t (a INT, b INT, c STRING)
+                  |USING paimon
+                  |TBLPROPERTIES ('primary-key'='a,b', 'bucket' = '2')
+                  |PARTITIONED BY (a)
+                  |""".stripMargin)
+      spark.sql("INSERT INTO t VALUES (1, 1, '1'), (2, 2, '2')")
+      sql("CALL sys.create_tag('t', 'tag1')")
+      spark.sql("INSERT INTO t VALUES (1, 3, '3'), (2, 4, '4')")
+      sql("CALL sys.create_tag('t', 'tag2')")
+
+      checkAnswer(
+        spark.sql(
+          "SELECT * FROM paimon_incremental_query('t', 'tag1', 'tag2') ORDER BY a, b LIMIT 5"),
+        Seq(Row(1, 3, "3"), Row(2, 4, "4")))
+    }
+  }
+
   private def incrementalDF(tableIdent: String, start: Int, end: Int): DataFrame = {
     spark.read
       .format("paimon")
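
Reviewer note: the core of the Java change is a guard-then-cast over the plan's splits, since `plan.splits()` is typed as `List<Split>` while the pushdown logic needs `List<DataSplit>`. Below is a minimal self-contained sketch of that pattern for anyone who wants to poke at it in isolation; `Split`, `DataSplit`, `IncrementalSplit`, and the `asDataSplits` helper here are illustrative stand-ins, not the real Paimon classes.

```java
import java.util.List;
import java.util.Optional;

public class GuardedCastSketch {

    // Illustrative stand-ins for Paimon's split hierarchy (not the real classes).
    interface Split {}
    static class DataSplit implements Split {}
    static class IncrementalSplit implements Split {}

    // Mirrors the patch: narrow List<Split> to List<DataSplit> only after the
    // guard proves every element is a DataSplit; otherwise report "not
    // applicable" (the patch returns the unmodified scan result in that case).
    static Optional<List<DataSplit>> asDataSplits(List<Split> planSplits) {
        if (planSplits.stream().anyMatch(s -> !(s instanceof DataSplit))) {
            return Optional.empty(); // e.g. IncrementalSplit present: skip pushdown
        }
        // Safe: the guard above checked every element's runtime type. The double
        // cast through List<?> is how Java permits the otherwise-incompatible
        // generic cast; @SuppressWarnings acknowledges it is unchecked.
        @SuppressWarnings("unchecked")
        List<DataSplit> splits = (List<DataSplit>) (List<?>) planSplits;
        return Optional.of(splits);
    }

    public static void main(String[] args) {
        System.out.println(asDataSplits(List.of(new DataSplit())).isPresent());        // true
        System.out.println(asDataSplits(List.of(new IncrementalSplit())).isPresent()); // false
    }
}
```

The double cast through `List<?>` avoids copying; an alternative would be to collect into a fresh `List<DataSplit>`, which is equally type-safe but allocates a new list for what is a pure reinterpretation.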