Commit 8d2594d

[spark] Eliminate the de/serialization process when writing to a bucket table

1 parent b874f21 commit 8d2594d
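The diffs below add a small `callFunction` shim to each Spark version's Compatibility object. The likely intent, inferred from the commit title rather than stated in the diff: by invoking a registered function on Columns, the write path can compute bucket assignments as a Catalyst expression instead of round-tripping every row through encoder de/serialization. A minimal sketch of that idea follows; the function name "example_bucket" and the modulo bucketing are illustrative assumptions, not Paimon's actual logic.

// Illustrative sketch only (not Paimon code). Rows stay in Spark's internal
// representation; the bucket is computed by an unevaluated Column expression.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{call_udf, col}

object BucketColumnSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("bucket-sketch").getOrCreate()
    import spark.implicits._

    val numBuckets = 4
    // Register the bucket function once; call_udf then resolves it by name.
    spark.udf.register("example_bucket", (key: Int) => Math.floorMod(key, numBuckets))

    val df = Seq((1, "a"), (2, "b"), (7, "c")).toDF("k", "v")
    df.withColumn("_bucket", call_udf("example_bucket", col("k"))).show()

    spark.stop()
  }
}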

File tree

15 files changed (+896, -80 lines)

paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala

Lines changed: 6 additions & 0 deletions

@@ -18,11 +18,13 @@
 
 package org.apache.paimon.spark.catalyst
 
+import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
+import org.apache.spark.sql.functions.call_udf
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
 
@@ -50,4 +52,8 @@ object Compatibility {
       ansiEnabled: Boolean = SQLConf.get.ansiEnabled): Cast = {
     Cast(child, dataType, timeZoneId, ansiEnabled)
   }
+
+  def callFunction(name: String, args: Seq[Column]): Column = {
+    call_udf(name, args: _*)
+  }
 }
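A hedged usage sketch for the shim just added: `Compatibility.callFunction` resolves a function previously registered with `spark.udf.register` and returns an unevaluated Column; on this Spark version it simply delegates to `call_udf`. The UDF name "example_upper" and the column "name" below are made up for illustration.

import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object CallFunctionSketch {
  // Assumes a DataFrame with a string column "name".
  def upperViaShim(spark: SparkSession, df: DataFrame): DataFrame = {
    spark.udf.register("example_upper", (s: String) => s.toUpperCase)
    // Equivalent to call_udf("example_upper", col("name")) on Spark 3.2/3.3.
    df.select(Compatibility.callFunction("example_upper", Seq(col("name"))).as("name_upper"))
  }
}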

paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala

Lines changed: 6 additions & 0 deletions

@@ -18,11 +18,13 @@
 
 package org.apache.paimon.spark.catalyst
 
+import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
+import org.apache.spark.sql.functions.call_udf
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
 
@@ -50,4 +52,8 @@ object Compatibility {
       ansiEnabled: Boolean = SQLConf.get.ansiEnabled): Cast = {
     Cast(child, dataType, timeZoneId, ansiEnabled)
   }
+
+  def callFunction(name: String, args: Seq[Column]): Column = {
+    call_udf(name, args: _*)
+  }
 }
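The paimon-spark-3.3 change is identical to the 3.2 one. Routing the call through a per-version Compatibility object presumably leaves room for newer Spark modules to bind the name differently; for example, Spark 3.5 added call_function, which also resolves catalog and builtin functions, so a newer shim could plausibly read:

// Hypothetical variant for a Spark 3.5+ module (not part of this commit).
def callFunction(name: String, args: Seq[Column]): Column = {
  org.apache.spark.sql.functions.call_function(name, args: _*)
}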
Lines changed: 60 additions & 0 deletions

@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst
+
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
+import org.apache.spark.sql.functions.call_udf
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.DataType
+
+object Compatibility {
+
+  def createDataSourceV2ScanRelation(
+      relation: DataSourceV2Relation,
+      scan: Scan,
+      output: Seq[AttributeReference]): DataSourceV2ScanRelation = {
+    DataSourceV2ScanRelation(relation, scan, output)
+  }
+
+  def withNewQuery(o: V2WriteCommand, query: LogicalPlan): V2WriteCommand = {
+    o.withNewQuery(query)
+  }
+
+  def castByTableInsertionTag: TreeNodeTag[Unit] = {
+    Cast.BY_TABLE_INSERTION
+  }
+
+  def cast(
+      child: Expression,
+      dataType: DataType,
+      timeZoneId: Option[String] = None,
+      ansiEnabled: Boolean = SQLConf.get.ansiEnabled): Cast = {
+    Cast(child, dataType, timeZoneId, ansiEnabled)
+  }
+
+  def callFunction(name: String, args: Seq[Column]): Column = {
+    call_udf(name, args: _*)
+  }
+
+}
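This new file is a full copy of the Compatibility object. Per-version copies like this exist because Catalyst signatures drift between Spark releases; for instance, newer Spark versions construct Cast with an EvalMode argument instead of the ansiEnabled: Boolean flag used here (an inference, not stated in the commit). A quick REPL-style sketch of the cast helper, using Catalyst's Literal:

import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.StringType

val c = Compatibility.cast(Literal(42), StringType)
// Int -> String needs no time zone, so eval() works with timeZoneId = None.
assert(c.eval().toString == "42")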
