diff --git a/docs/content/spark/sql-functions.md b/docs/content/spark/sql-functions.md new file mode 100644 index 000000000000..182f6be51892 --- /dev/null +++ b/docs/content/spark/sql-functions.md @@ -0,0 +1,55 @@ +--- +title: "SQL Functions" +weight: 2 +type: docs +aliases: +- /spark/sql-functions.html +--- + + +# SQL Functions + +This section introduces all available Paimon Spark functions. + + +## max_pt + +`max_pt($table_name)` + +It accepts a string type literal to specify the table name and returns the max-valid-toplevel partition value. +- **valid**: the partition which contains data files +- **toplevel**: only return the first partition value if the table has multi-partition columns + +It throws an exception when: +- the table is not a partitioned table +- the partitioned table does not have any partitions +- none of the partitions contain data files + +**Example** + +```sql +> SELECT max_pt('t'); + 20250101 + +> SELECT * FROM t where pt = max_pt('t'); + a, 20250101 +``` + +**Since: 1.1.0** diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java index bb16bbf1c7bc..d3fc3528bb33 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java @@ -27,19 +27,14 @@ import org.apache.spark.sql.connector.catalog.SupportsNamespaces; import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; -import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME; - /** Catalog methods for working with Functions. 
*/ public interface SupportFunction extends FunctionCatalog, SupportsNamespaces { - static boolean isFunctionNamespace(String[] namespace) { + default boolean isFunctionNamespace(String[] namespace) { // Allow for empty namespace, as Spark's bucket join will use `bucket` function with empty - // namespace to generate transforms for partitioning. Otherwise, use `sys` namespace. - return namespace.length == 0 || isSystemNamespace(namespace); - } - - static boolean isSystemNamespace(String[] namespace) { - return namespace.length == 1 && namespace[0].equalsIgnoreCase(SYSTEM_DATABASE_NAME); + // namespace to generate transforms for partitioning. + // Otherwise, check if it is paimon namespace. + return namespace.length == 0 || (namespace.length == 1 && namespaceExists(namespace)); } @Override @@ -48,8 +43,6 @@ default Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceExc return PaimonFunctions.names().stream() .map(name -> Identifier.of(namespace, name)) .toArray(Identifier[]::new); - } else if (namespaceExists(namespace)) { - return new Identifier[0]; } throw new NoSuchNamespaceException(namespace); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java index 46be3f2fa5e1..c7949e11948f 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java @@ -21,7 +21,9 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; +import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.functions.BoundFunction; +import 
org.apache.spark.sql.connector.catalog.functions.ScalarFunction; import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.StructField; @@ -34,12 +36,15 @@ import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.spark.sql.types.DataTypes.IntegerType; +import static org.apache.spark.sql.types.DataTypes.StringType; /** Paimon functions. */ public class PaimonFunctions { private static final Map FUNCTIONS = - ImmutableMap.of("bucket", new PaimonFunctions.BucketFunction()); + ImmutableMap.of( + "bucket", new BucketFunction(), + "max_pt", new MaxPtFunction()); private static final List FUNCTION_NAMES = ImmutableList.copyOf(FUNCTIONS.keySet()); @@ -105,4 +110,60 @@ public String name() { return "bucket"; } } + + /** + * For partitioned tables, this function returns the maximum value of the first level partition + * of the partitioned table, sorted alphabetically. Note, empty partitions will be skipped. For + * example, a partition created by `alter table ... add partition ...`. + */ + public static class MaxPtFunction implements UnboundFunction { + @Override + public BoundFunction bind(StructType inputType) { + if (inputType.fields().length != 1) { + throw new UnsupportedOperationException( + "Wrong number of inputs, expected 1 but got " + inputType.fields().length); + } + StructField identifier = inputType.fields()[0]; + checkArgument(identifier.dataType() == StringType, "table name must be string type"); + + return new ScalarFunction() { + @Override + public DataType[] inputTypes() { + return new DataType[] {identifier.dataType()}; + } + + @Override + public DataType resultType() { + return StringType; + } + + @Override + public String produceResult(InternalRow input) { + // Does not need to implement the `produceResult` method, + // since `ReplacePaimonFunctions` will replace it with partition literal. 
+ throw new IllegalStateException("This method should not be called"); + } + + @Override + public String name() { + return "max_pt"; + } + + @Override + public String canonicalName() { + return "paimon.max_pt(" + identifier.dataType().catalogString() + ")"; + } + }; + } + + @Override + public String description() { + return name(); + } + + @Override + public String name() { + return "max_pt"; + } + } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala index 5a6abfe2849d..1817add879ef 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala @@ -38,7 +38,7 @@ import scala.collection.JavaConverters._ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement { self: SparkTable => - private lazy val partitionRowType: RowType = TypeUtils.project(table.rowType, table.partitionKeys) + lazy val partitionRowType: RowType = TypeUtils.project(table.rowType, table.partitionKeys) override lazy val partitionSchema: StructType = SparkTypeUtils.fromPaimonRowType(partitionRowType) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/ReplacePaimonFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/ReplacePaimonFunctions.scala new file mode 100644 index 000000000000..d3650d27f815 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/ReplacePaimonFunctions.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.catalyst.analysis + +import org.apache.paimon.spark.{DataConverter, SparkTable, SparkTypeUtils, SparkUtils} +import org.apache.paimon.spark.catalog.SparkBaseCatalog +import org.apache.paimon.utils.{InternalRowUtils, TypeUtils} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{ApplyFunctionExpression, Cast, Expression, Literal} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._ +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String + +import scala.jdk.CollectionConverters._ + +/** A rule to replace Paimon functions with literal values. 
*/ +case class ReplacePaimonFunctions(spark: SparkSession) extends Rule[LogicalPlan] { + private def replaceMaxPt(func: ApplyFunctionExpression): Expression = { + assert(func.children.size == 1) + assert(func.children.head.dataType == StringType) + if (!func.children.head.isInstanceOf[Literal]) { + throw new UnsupportedOperationException("Table name must be a literal") + } + val tableName = func.children.head.eval().asInstanceOf[UTF8String] + if (tableName == null) { + throw new UnsupportedOperationException("Table name cannot be null") + } + val catalogAndIdentifier = SparkUtils + .catalogAndIdentifier( + spark, + tableName.toString, + spark.sessionState.catalogManager.currentCatalog) + if (!catalogAndIdentifier.catalog().isInstanceOf[SparkBaseCatalog]) { + throw new UnsupportedOperationException( + s"${catalogAndIdentifier.catalog()} is not a Paimon catalog") + } + + val table = + catalogAndIdentifier.catalog.asTableCatalog.loadTable(catalogAndIdentifier.identifier()) + assert(table.isInstanceOf[SparkTable]) + val sparkTable = table.asInstanceOf[SparkTable] + if (sparkTable.table.partitionKeys().size() == 0) { + throw new UnsupportedOperationException(s"$table is not a partitioned table") + } + + val toplevelPartitionType = + TypeUtils.project(sparkTable.table.rowType, sparkTable.table.partitionKeys()).getTypeAt(0) + val partitionValues = sparkTable.table.newReadBuilder.newScan + .listPartitionEntries() + .asScala + .filter(_.fileCount() > 0) + .map { + partitionEntry => InternalRowUtils.get(partitionEntry.partition(), 0, toplevelPartitionType) + } + .sortWith(InternalRowUtils.compare(_, _, toplevelPartitionType.getTypeRoot) < 0) + .map(DataConverter.fromPaimon(_, toplevelPartitionType)) + if (partitionValues.isEmpty) { + throw new UnsupportedOperationException( + s"$table has no partitions or none of the partitions have any data") + } + + val sparkType = SparkTypeUtils.fromPaimonType(toplevelPartitionType) + val literal = Literal(partitionValues.last, sparkType) 
+ Cast(literal, func.dataType) + } + + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.resolveExpressions { + case func: ApplyFunctionExpression + if func.function.name() == "max_pt" && + func.function.canonicalName().startsWith("paimon") => + replaceMaxPt(func) + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala index f73df64fb8ab..bfd337580dbf 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala @@ -18,7 +18,7 @@ package org.apache.paimon.spark.extensions -import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver} +import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver, ReplacePaimonFunctions} import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueries} import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions import org.apache.paimon.spark.execution.PaimonStrategy @@ -44,6 +44,7 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) { spark => SparkShimLoader.getSparkShim.createCustomResolution(spark)) extensions.injectResolutionRule(spark => PaimonIncompatibleResolutionRules(spark)) + extensions.injectPostHocResolutionRule(spark => 
ReplacePaimonFunctions(spark)) extensions.injectPostHocResolutionRule(spark => PaimonPostHocResolutionRules(spark)) extensions.injectPostHocResolutionRule(spark => PaimonIncompatiblePHRRules(spark)) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala index bb48983a85a6..4b0e63731d88 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala @@ -66,6 +66,16 @@ class PaimonFunctionTest extends PaimonHiveTestBase { } } + test("Paimon function: show user functions") { + assume(gteqSpark3_4) + Seq("paimon", paimonHiveCatalogName).foreach { + catalogName => + sql(s"use $catalogName") + val functions = sql("show user functions").collect() + assert(functions.exists(_.getString(0).contains("max_pt")), catalogName) + } + } + test("Paimon function: bucket join with SparkGenericCatalog") { sql(s"use $sparkCatalogName") assume(gteqSpark3_3) @@ -105,6 +115,61 @@ class PaimonFunctionTest extends PaimonHiveTestBase { sql("DROP FUNCTION myIntSum") checkAnswer(sql(s"SHOW FUNCTIONS FROM $hiveDbName LIKE 'myIntSum'"), Seq.empty) } + + test("Add max_pt function") { + Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach { + catalogName => + { + sql(s"use $catalogName") + val maxPt = if (catalogName == sparkCatalogName) { + "paimon.max_pt" + } else { + "max_pt" + } + + intercept[Exception] { + sql(s"SELECT $maxPt(1)").collect() + } + intercept[Exception] { + sql(s"SELECT $maxPt()").collect() + } + withTable("t") { + sql("CREATE TABLE t (id INT) USING paimon") + intercept[Exception] { + sql(s"SELECT $maxPt('t')").collect() + } + } + + withTable("t") { + sql("CREATE TABLE t (id INT) USING paimon PARTITIONED BY (p1 STRING)") + intercept[Exception] { + sql(s"SELECT 
$maxPt('t')").collect() + } + sql("INSERT INTO t PARTITION (p1='a') VALUES (1)") + sql("INSERT INTO t PARTITION (p1='b') VALUES (2)") + sql("INSERT INTO t PARTITION (p1='aa') VALUES (3)") + sql("ALTER TABLE t ADD PARTITION (p1='z')") + checkAnswer(sql(s"SELECT $maxPt('t')"), Row("b")) + checkAnswer(sql(s"SELECT id FROM t WHERE p1 = $maxPt('default.t')"), Row(2)) + } + + withTable("t") { + sql("CREATE TABLE t (id INT) USING paimon PARTITIONED BY (p1 INT, p2 STRING)") + intercept[Exception] { + sql(s"SELECT $maxPt('t')").collect() + } + sql("INSERT INTO t PARTITION (p1=1, p2='c') VALUES (1)") + sql("INSERT INTO t PARTITION (p1=2, p2='a') VALUES (2)") + sql("INSERT INTO t PARTITION (p1=2, p2='b') VALUES (3)") + sql("ALTER TABLE t ADD PARTITION (p1='9', p2='z')") + checkAnswer(sql(s"SELECT $maxPt('t')"), Row("2")) + checkAnswer( + sql(s"SELECT id FROM t WHERE p1 = $maxPt('default.t')"), + Row(2) :: Row(3) :: Nil) + } + } + } + } } private class MyIntSum extends UserDefinedAggregateFunction {