Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions docs/content/spark/sql-functions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
---
title: "SQL Functions"
weight: 2
type: docs
aliases:
- /spark/sql-functions.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# SQL Functions

This section introduces all available Paimon Spark functions.


## max_pt

`max_pt($table_name)`

It accepts a string type literal to specify the table name and returns the max-valid-toplevel partition value.
- **valid**: the partition which contains data files
- **toplevel**: only returns the first partition value if the table has multiple partition columns

It throws an exception when:
- the table is not a partitioned table
- the partitioned table does not have any partitions
- none of the partitions contain data files

**Example**

```sql
> SELECT max_pt('t');
20250101

> SELECT * FROM t where pt = max_pt('t');
a, 20250101
```

**Since: 1.1.0**
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,14 @@
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;

import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;

/** Catalog methods for working with Functions. */
public interface SupportFunction extends FunctionCatalog, SupportsNamespaces {

static boolean isFunctionNamespace(String[] namespace) {
default boolean isFunctionNamespace(String[] namespace) {
// Allow for empty namespace, as Spark's bucket join will use `bucket` function with empty
// namespace to generate transforms for partitioning. Otherwise, use `sys` namespace.
return namespace.length == 0 || isSystemNamespace(namespace);
}

static boolean isSystemNamespace(String[] namespace) {
return namespace.length == 1 && namespace[0].equalsIgnoreCase(SYSTEM_DATABASE_NAME);
// namespace to generate transforms for partitioning.
// Otherwise, check if it is paimon namespace.
return namespace.length == 0 || (namespace.length == 1 && namespaceExists(namespace));
}

@Override
Expand All @@ -48,8 +43,6 @@ default Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceExc
return PaimonFunctions.names().stream()
.map(name -> Identifier.of(namespace, name))
.toArray(Identifier[]::new);
} else if (namespaceExists(namespace)) {
return new Identifier[0];
}

throw new NoSuchNamespaceException(namespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
Expand All @@ -34,12 +36,15 @@

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;

/** Paimon functions. */
public class PaimonFunctions {

private static final Map<String, UnboundFunction> FUNCTIONS =
ImmutableMap.of("bucket", new PaimonFunctions.BucketFunction());
ImmutableMap.of(
"bucket", new BucketFunction(),
"max_pt", new MaxPtFunction());

private static final List<String> FUNCTION_NAMES = ImmutableList.copyOf(FUNCTIONS.keySet());

Expand Down Expand Up @@ -105,4 +110,60 @@ public String name() {
return "bucket";
}
}

/**
 * For partitioned tables, this function returns the maximum value of the first level partition
 * of the partitioned table, sorted alphabetically. Note, empty partitions will be skipped. For
 * example, a partition created by `alter table ... add partition ...`.
 *
 * <p>This function is declared here only so that Spark can resolve and type-check
 * {@code max_pt(...)}; it is never executed at runtime. The analyzer rule
 * {@code ReplacePaimonFunctions} rewrites the call into a partition-value literal
 * before execution.
 */
public static class MaxPtFunction implements UnboundFunction {
    @Override
    public BoundFunction bind(StructType inputType) {
        // max_pt takes exactly one argument: the table name.
        if (inputType.fields().length != 1) {
            throw new UnsupportedOperationException(
                    "Wrong number of inputs, expected 1 but got " + inputType.fields().length);
        }
        // The single argument must be a string (the table name literal).
        // NOTE(review): reference equality relies on DataTypes.StringType being a
        // singleton — this matches how Spark exposes it from Java.
        StructField identifier = inputType.fields()[0];
        checkArgument(identifier.dataType() == StringType, "table name must be string type");

        return new ScalarFunction<String>() {
            @Override
            public DataType[] inputTypes() {
                return new DataType[] {identifier.dataType()};
            }

            @Override
            public DataType resultType() {
                // The resolved partition value is surfaced as a string; the analyzer
                // rule casts the actual partition literal to this type.
                return StringType;
            }

            @Override
            public String produceResult(InternalRow input) {
                // Does not need to implement the `produceResult` method,
                // since `ReplacePaimonFunctions` will replace it with partition literal.
                throw new IllegalStateException("This method should not be called");
            }

            @Override
            public String name() {
                return "max_pt";
            }

            @Override
            public String canonicalName() {
                // The "paimon." prefix is how ReplacePaimonFunctions recognizes this
                // bound function as one it must rewrite.
                return "paimon.max_pt(" + identifier.dataType().catalogString() + ")";
            }
        };
    }

    @Override
    public String description() {
        return name();
    }

    @Override
    public String name() {
        return "max_pt";
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import scala.collection.JavaConverters._
trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
self: SparkTable =>

private lazy val partitionRowType: RowType = TypeUtils.project(table.rowType, table.partitionKeys)
lazy val partitionRowType: RowType = TypeUtils.project(table.rowType, table.partitionKeys)

override lazy val partitionSchema: StructType = SparkTypeUtils.fromPaimonRowType(partitionRowType)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.catalyst.analysis

import org.apache.paimon.spark.{DataConverter, SparkTable, SparkTypeUtils, SparkUtils}
import org.apache.paimon.spark.catalog.SparkBaseCatalog
import org.apache.paimon.utils.{InternalRowUtils, TypeUtils}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{ApplyFunctionExpression, Cast, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

import scala.jdk.CollectionConverters._

/** A rule to replace Paimon functions with literal values. */
/**
 * A rule to replace Paimon functions with literal values.
 *
 * Currently handles `max_pt(table_name)`: the call is evaluated at analysis time and
 * replaced with a literal holding the maximum first-level partition value (alphabetical
 * order) among partitions that actually contain data files. The function itself is never
 * executed — see `PaimonFunctions.MaxPtFunction`, whose `produceResult` throws.
 */
case class ReplacePaimonFunctions(spark: SparkSession) extends Rule[LogicalPlan] {

  /**
   * Resolves the table named by the function's single (literal) argument and returns a
   * literal with the max first-level partition value, cast to the function's result type.
   *
   * Throws UnsupportedOperationException when: the argument is not a literal / is null,
   * the catalog is not a Paimon catalog, the table is not partitioned, or no partition
   * contains data files.
   */
  private def replaceMaxPt(func: ApplyFunctionExpression): Expression = {
    assert(func.children.size == 1)
    assert(func.children.head.dataType == StringType)
    // The table name must be known at analysis time, so only literals are accepted.
    if (!func.children.head.isInstanceOf[Literal]) {
      throw new UnsupportedOperationException("Table name must be a literal")
    }
    val tableName = func.children.head.eval().asInstanceOf[UTF8String]
    if (tableName == null) {
      throw new UnsupportedOperationException("Table name cannot be null")
    }
    // Resolve a possibly-qualified name (e.g. "default.t") against the current catalog.
    val catalogAndIdentifier = SparkUtils
      .catalogAndIdentifier(
        spark,
        tableName.toString,
        spark.sessionState.catalogManager.currentCatalog)
    if (!catalogAndIdentifier.catalog().isInstanceOf[SparkBaseCatalog]) {
      throw new UnsupportedOperationException(
        s"${catalogAndIdentifier.catalog()} is not a Paimon catalog")
    }

    val table =
      catalogAndIdentifier.catalog.asTableCatalog.loadTable(catalogAndIdentifier.identifier())
    assert(table.isInstanceOf[SparkTable])
    val sparkTable = table.asInstanceOf[SparkTable]
    if (sparkTable.table.partitionKeys().size() == 0) {
      throw new UnsupportedOperationException(s"$table is not a partitioned table")
    }

    // Only the FIRST partition column participates ("toplevel" semantics of max_pt).
    val toplevelPartitionType =
      TypeUtils.project(sparkTable.table.rowType, sparkTable.table.partitionKeys()).getTypeAt(0)
    // List partitions, drop empty ones (e.g. created via ALTER TABLE ... ADD PARTITION),
    // sort by the Paimon-typed value, and convert to a Spark-side value.
    val partitionValues = sparkTable.table.newReadBuilder.newScan
      .listPartitionEntries()
      .asScala
      .filter(_.fileCount() > 0)
      .map {
        partitionEntry => InternalRowUtils.get(partitionEntry.partition(), 0, toplevelPartitionType)
      }
      .sortWith(InternalRowUtils.compare(_, _, toplevelPartitionType.getTypeRoot) < 0)
      .map(DataConverter.fromPaimon(_, toplevelPartitionType))
    if (partitionValues.isEmpty) {
      throw new UnsupportedOperationException(
        s"$table has no partitions or none of the partitions have any data")
    }

    // Ascending sort => last element is the maximum; cast to the declared result type
    // (StringType) so the replacement is type-compatible with the original expression.
    val sparkType = SparkTypeUtils.fromPaimonType(toplevelPartitionType)
    val literal = Literal(partitionValues.last, sparkType)
    Cast(literal, func.dataType)
  }

  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan.resolveExpressions {
      // Match only Paimon's max_pt (canonicalName is prefixed with "paimon"),
      // so same-named functions from other catalogs are left untouched.
      case func: ApplyFunctionExpression
        if func.function.name() == "max_pt" &&
          func.function.canonicalName().startsWith("paimon") =>
        replaceMaxPt(func)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon.spark.extensions

import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver}
import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver, ReplacePaimonFunctions}
import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueries}
import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
import org.apache.paimon.spark.execution.PaimonStrategy
Expand All @@ -44,6 +44,7 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
spark => SparkShimLoader.getSparkShim.createCustomResolution(spark))
extensions.injectResolutionRule(spark => PaimonIncompatibleResolutionRules(spark))

extensions.injectPostHocResolutionRule(spark => ReplacePaimonFunctions(spark))
extensions.injectPostHocResolutionRule(spark => PaimonPostHocResolutionRules(spark))
extensions.injectPostHocResolutionRule(spark => PaimonIncompatiblePHRRules(spark))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
}
}

// The built-in `max_pt` function must be visible via SHOW USER FUNCTIONS in both
// the pure Paimon catalog and the Paimon Hive catalog.
test("Paimon function: show user functions") {
  assume(gteqSpark3_4)
  for (catalogName <- Seq("paimon", paimonHiveCatalogName)) {
    sql(s"use $catalogName")
    val rows = sql("show user functions").collect()
    assert(rows.exists(row => row.getString(0).contains("max_pt")), catalogName)
  }
}

test("Paimon function: bucket join with SparkGenericCatalog") {
sql(s"use $sparkCatalogName")
assume(gteqSpark3_3)
Expand Down Expand Up @@ -105,6 +115,61 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
sql("DROP FUNCTION myIntSum")
checkAnswer(sql(s"SHOW FUNCTIONS FROM $hiveDbName LIKE 'myIntSum'"), Seq.empty)
}

// End-to-end checks for `max_pt` across all catalog flavours: argument validation,
// error cases (unpartitioned / empty tables), and partition-value resolution for
// single- and multi-column partitioned tables.
test("Add max_pt function") {
  for (catalogName <- Seq("paimon", sparkCatalogName, paimonHiveCatalogName)) {
    sql(s"use $catalogName")
    // In the Spark session catalog the function must be qualified with "paimon.".
    val maxPt =
      if (catalogName == sparkCatalogName) "paimon.max_pt"
      else "max_pt"

    // Non-string and missing arguments are rejected.
    intercept[Exception] {
      sql(s"SELECT $maxPt(1)").collect()
    }
    intercept[Exception] {
      sql(s"SELECT $maxPt()").collect()
    }

    // An unpartitioned table is rejected.
    withTable("t") {
      sql("CREATE TABLE t (id INT) USING paimon")
      intercept[Exception] {
        sql(s"SELECT $maxPt('t')").collect()
      }
    }

    // Single partition column: max is by value, and the data-less partition 'z'
    // (added via ALTER TABLE) is skipped.
    withTable("t") {
      sql("CREATE TABLE t (id INT) USING paimon PARTITIONED BY (p1 STRING)")
      intercept[Exception] {
        sql(s"SELECT $maxPt('t')").collect()
      }
      sql("INSERT INTO t PARTITION (p1='a') VALUES (1)")
      sql("INSERT INTO t PARTITION (p1='b') VALUES (2)")
      sql("INSERT INTO t PARTITION (p1='aa') VALUES (3)")
      sql("ALTER TABLE t ADD PARTITION (p1='z')")
      checkAnswer(sql(s"SELECT $maxPt('t')"), Row("b"))
      checkAnswer(sql(s"SELECT id FROM t WHERE p1 = $maxPt('default.t')"), Row(2))
    }

    // Two partition columns: only the first level counts, and again the empty
    // (p1='9') partition is ignored.
    withTable("t") {
      sql("CREATE TABLE t (id INT) USING paimon PARTITIONED BY (p1 INT, p2 STRING)")
      intercept[Exception] {
        sql(s"SELECT $maxPt('t')").collect()
      }
      sql("INSERT INTO t PARTITION (p1=1, p2='c') VALUES (1)")
      sql("INSERT INTO t PARTITION (p1=2, p2='a') VALUES (2)")
      sql("INSERT INTO t PARTITION (p1=2, p2='b') VALUES (3)")
      sql("ALTER TABLE t ADD PARTITION (p1='9', p2='z')")
      checkAnswer(sql(s"SELECT $maxPt('t')"), Row("2"))
      checkAnswer(
        sql(s"SELECT id FROM t WHERE p1 = $maxPt('default.t')"),
        Row(2) :: Row(3) :: Nil)
    }
  }
}
}

private class MyIntSum extends UserDefinedAggregateFunction {
Expand Down
Loading