Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions assembly/src/main/assembly/assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,14 @@
<include>**/*</include>
</includes>
</fileSet>
<fileSet>
<directory>
${project.parent.basedir}/spark-wrapper/spark-3.4/target/classes/
</directory>
<outputDirectory>resources/spark-wrapper-spark-3_4</outputDirectory>
<includes>
<include>**/*</include>
</includes>
</fileSet>
</fileSets>
</assembly>
2 changes: 1 addition & 1 deletion core/src/main/scala/com/pingcap/tispark/TiSparkInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.tikv.common.exception.TiInternalException
object TiSparkInfo {
private final val logger = LoggerFactory.getLogger(getClass.getName)

val SUPPORTED_SPARK_VERSION: List[String] = "3.0" :: "3.1" :: "3.2" :: "3.3" :: Nil
val SUPPORTED_SPARK_VERSION: List[String] = "3.0" :: "3.1" :: "3.2" :: "3.3" :: "3.4" :: Nil

val SPARK_VERSION: String = org.apache.spark.SPARK_VERSION

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,15 @@ case class SpecialSum(child: Expression, retType: DataType, initVal: Any)
override def dataType: DataType = resultType

override def checkInputDataTypes(): TypeCheckResult =
TypeUtils.checkForNumericExpr(child.dataType, "function sum")
checkForNumericExpr(child.dataType, "function sum")

/**
 * Validates that `dt` is usable by a numeric aggregate.
 *
 * Local replacement for Spark's `TypeUtils.checkForNumericExpr` (presumably no
 * longer accessible in Spark 3.4 — confirm against the catalyst API). Accepts
 * every [[NumericType]] as well as `NullType`.
 *
 * @param dt     the input expression's data type
 * @param caller name of the calling function, used in the failure message
 * @return success for numeric or null types, a typed failure otherwise
 */
def checkForNumericExpr(dt: DataType, caller: String): TypeCheckResult =
  dt match {
    case _: NumericType =>
      TypeCheckResult.TypeCheckSuccess
    case NullType =>
      TypeCheckResult.TypeCheckSuccess
    case other =>
      TypeCheckResult.TypeCheckFailure(s"$caller requires numeric types, not ${other.catalogString}")
  }

/**
* The implement is same as the [[org.apache.spark.sql.catalyst.expressions.aggregate.Sum]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ class TiAuthIntegrationSuite extends SharedSQLContext {
exception should not have message(
s"DELETE command denied to user `$user`@% for table default.`$hive_table`")
val errorMessage = exception.getMessage
assert(errorMessage.contains(s"DELETE is only supported with v2 tables."))
assert(
errorMessage.contains(s"DELETE is only supported with v2 tables.") ||
// Spark 3.4 raises this internal error; the behavior is fixed in Spark 3.5
errorMessage.contains("[INTERNAL_ERROR] Unexpected table relation: HiveTableRelation"))

spark.sql(s"DROP TABLE IF EXISTS `$hive_table`")
}
Expand Down Expand Up @@ -156,7 +159,10 @@ class TiAuthIntegrationSuite extends SharedSQLContext {
spark.sql(s"select * from $table")
}
// validateCatalog has been set namespace with "use tidb_catalog.$dbPrefix$dummyDatabase" in beforeAll() method
assert(caught.getMessage.contains(s"Table or view not found: $table"))
assert(
caught.getMessage.contains(s"Table or view not found: $table") ||
// For Spark 3.4
caught.getMessage.contains(s"The table or view `$table` cannot be found"))
}

test(f"Show databases without privilege should not contains db") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ class BaseDataSourceTest(val table: String, val database: String = "tispark_test
this.jdbcWrite(data, schema)
}
assert(
caughtJDBC.getCause.getClass.equals(jdbcErrorClass),
s"${caughtJDBC.getCause.getClass.getName} not equals to ${jdbcErrorClass.getName}")
jdbcErrorClass.isAssignableFrom(caughtJDBC.getCause.getClass),
s"${jdbcErrorClass.getName} not assignable from ${caughtJDBC.getCause.getClass.getName}")

val caughtTiDB = intercept[SparkException] {
this.tidbWrite(data, schema)
}
assert(
caughtTiDB.getCause.getClass.equals(tidbErrorClass),
s"${caughtTiDB.getCause.getClass.getName} not equals to ${tidbErrorClass.getName}")
tidbErrorClass.isAssignableFrom(caughtTiDB.getCause.getClass),
s"${tidbErrorClass.getName} not assignable from ${caughtTiDB.getCause.getClass.getName}")

if (tidbErrorMsg != null) {
if (!msgStartWith) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ class ExceptionTestSuite extends BaseBatchWriteTest("test_datasource_exception_t
tidbWrite(List(row1, row2), schema)
}
assert(
caught.getMessage.contains(s"Table or view '$table' not found in database '$database'"))
caught.getMessage.contains(s"Table or view '$table' not found in database '$database'") ||
// For Spark 3.4
caught.getMessage.contains(s"The table or view `$database`.`$table` cannot be found"))
}

test("Test column does not exist") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class SparkDataTypeTestSuite extends BaseTiSparkTest {
test("double type test") {
compSparkWithTiDB(
qSpark =
"select id_dt, tp_double, tp_float, tp_real from full_data_type_table order by tp_double desc nulls first limit 10",
"select id_dt, tp_double, tp_float, tp_real from full_data_type_table order by tp_double desc nulls first, id_dt asc limit 10",
qTiDB =
"select id_dt, tp_double, tp_float, tp_real from full_data_type_table order by tp_double is null desc, tp_double desc limit 10")
"select id_dt, tp_double, tp_float, tp_real from full_data_type_table order by tp_double is null desc, tp_double desc, id_dt asc limit 10")
}

test("decimal type test") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ class StaleReadSuite extends BaseTiSparkTest {
val caught = intercept[org.apache.spark.sql.AnalysisException] {
spark.sql(s"select count(*) from $table").collect()
}
caught.getMessage() should include("Table or view not found")
assert(
caught.getMessage().contains("Table or view not found") ||
// For spark 3.4
caught.getMessage().contains(s"The table or view `$table` cannot be found"))

spark.conf.set(TiConfigConst.STALE_READ, t1)
assert(1 == spark.sql(s"select * from $table").schema.fields.length)
Expand Down
19 changes: 16 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,14 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<protobuf.version>3.1.0</protobuf.version>
<spark.version.compile>${spark3_0.version}</spark.version.compile>
<spark.version.test>${spark3_0.version}</spark.version.test>
<spark.version.release>3.0</spark.version.release>
<spark.version.compile>${spark3_4.version}</spark.version.compile>
<spark.version.test>${spark3_4.version}</spark.version.test>
<spark.version.release>3.4</spark.version.release>
<spark3_0.version>3.0.3</spark3_0.version>
<spark3_1.version>3.1.3</spark3_1.version>
<spark3_2.version>3.2.3</spark3_2.version>
<spark3_3.version>3.3.1</spark3_3.version>
<spark3_4.version>3.4.2</spark3_4.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.10</scala.version>
<scala.version.release>2.12</scala.version.release>
Expand Down Expand Up @@ -175,6 +176,7 @@
<module>spark-wrapper/spark-3.1</module>
<module>spark-wrapper/spark-3.2</module>
<module>spark-wrapper/spark-3.3</module>
<module>spark-wrapper/spark-3.4</module>
<module>assembly</module>
</modules>

Expand Down Expand Up @@ -212,6 +214,17 @@
<spark.version.release>3.3</spark.version.release>
</properties>
</profile>
<profile>
<id>spark-3.4</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<spark.version.compile>${spark3_4.version}</spark.version.compile>
<spark.version.test>${spark3_4.version}</spark.version.test>
<spark.version.release>3.4</spark.version.release>
</properties>
</profile>
<profile>
<id>jenkins</id>
<modules>
Expand Down
129 changes: 129 additions & 0 deletions spark-wrapper/spark-3.4/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.pingcap.tispark</groupId>
<artifactId>tispark-parent</artifactId>
<version>3.3.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>spark-wrapper-spark-3.4_${scala.version.release}</artifactId>
<packaging>jar</packaging>
<name>TiSpark Project Spark Wrapper Spark-3.4</name>
<url>http://github.com/pingcap/tispark</url>

<properties>
<spark.version.wrapper>${spark3_4.version}</spark.version.wrapper>
</properties>

<dependencies>
<dependency>
<groupId>com.pingcap.tispark</groupId>
<artifactId>tispark-core-internal</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version.wrapper}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.12</artifactId>
<version>${spark.version.wrapper}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version.wrapper}</version>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.3.0</version>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>doc-jar</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<showWarnings>true</showWarnings>
<showDeprecation>true</showDeprecation>
</configuration>
</plugin>
<!-- Source Plug-in -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.0.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Scala Format Plug-in -->
<plugin>
<groupId>org.antipathy</groupId>
<artifactId>mvn-scalafmt_${scala.binary.version}</artifactId>
<version>1.0.3</version>
<configuration>
<skipSources>${scalafmt.skip}</skipSources>
<skipTestSources>${scalafmt.skip}</skipTestSources>
<sourceDirectories> <!-- (Optional) Paths to source-directories. Overrides ${project.build.sourceDirectory} -->
<sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
</sourceDirectories>
<testSourceDirectories> <!-- (Optional) Paths to test-source-directories. Overrides ${project.build.testSourceDirectory} -->
<param>${project.basedir}/src/test/scala</param>
</testSourceDirectories>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.pingcap.tispark
/*
* Copyright 2022 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.spark.sql.catalyst.expressions.{
Alias,
AliasHelper,
ExprId,
Expression,
SortOrder
}

/**
 * Version-specific shim exposing a stable API over Spark 3.4 catalyst
 * internals, so core TiSpark code can stay Spark-version agnostic.
 */
object SparkWrapper {

  /** Identifier of this wrapper implementation. */
  def getVersion: String = "SparkWrapper-3.4"

  /** Builds an [[Alias]] for `child`, letting Spark assign the expression id. */
  def newAlias(child: Expression, name: String): Alias =
    Alias(child, name)()

  /** Builds an [[Alias]] for `child` that reuses the supplied expression id. */
  def newAlias(child: Expression, name: String, exprId: ExprId): Alias =
    Alias(child, name)(exprId = exprId)

  /** Removes aliases below the top level of `e` (see [[TiCleanupAliases]]). */
  def trimNonTopLevelAliases(e: Expression): Expression =
    TiCleanupAliases.trimNonTopLevelAliases2(e)

  /** Returns `sortOrder` with its child expression swapped for `child`. */
  def copySortOrder(sortOrder: SortOrder, child: Expression): SortOrder =
    sortOrder.copy(child = child)
}

// Forwarder object: exposes AliasHelper.trimNonTopLevelAliases publicly —
// presumably the inherited method is protected in Spark 3.4's AliasHelper,
// making this indirection necessary; confirm against the catalyst source.
object TiCleanupAliases extends AliasHelper {
  // Delegates directly to the trait implementation; the "2" suffix avoids
  // shadowing the inherited name.
  def trimNonTopLevelAliases2[T <: Expression](e: T): T = {
    super.trimNonTopLevelAliases(e)
  }
}
Loading