diff --git a/CHANGELOG b/CHANGELOG index 571beea7..a4f2bc70 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,6 @@ +2025-08-15 Version 4.2.1 + * Fixed compatibility issue with Databricks 16.4 + 2025-07-22 Version 4.2.0 * Added support of Spark 4.0.0 * Changed type conversion for TINYINT to ByteType diff --git a/README.md b/README.md index 0b47ced4..ec445877 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # SingleStoreDB Spark Connector -## Version: 4.2.0 [![License](http://img.shields.io/:license-Apache%202-brightgreen.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) +## Version: 4.2.1 [![License](http://img.shields.io/:license-Apache%202-brightgreen.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) ## Getting Started @@ -13,13 +13,13 @@ spark-packages.org. The group is `com.singlestore` and the artifact is You can add the connector to your Spark application using: spark-shell, pyspark, or spark-submit ``` -$SPARK_HOME/bin/spark-shell --packages com.singlestore:singlestore-spark-connector_2.12:4.2.0-spark-4.0.0 +$SPARK_HOME/bin/spark-shell --packages com.singlestore:singlestore-spark-connector_2.12:4.2.1-spark-4.0.0 ``` We release multiple versions of the `singlestore-spark-connector`, one for each supported Spark version. The connector follows the `x.x.x-spark-y.y.y` naming convention, where `x.x.x` represents the connector version and `y.y.y` represents the corresponding Spark version. -For example, in connector `4.2.0-spark-4.0.0`, 4.1.11 is the version of the connector, +For example, in connector `4.2.1-spark-4.0.0`, 4.2.1 is the version of the connector, compiled and tested against Spark version 4.0.0. It is critical to select the connector version that corresponds to the Spark version in use. 
diff --git a/build.sbt b/build.sbt index 68931b37..1af0eb74 100644 --- a/build.sbt +++ b/build.sbt @@ -39,7 +39,7 @@ lazy val root = project case "3.5.0" => "scala-sparkv3.5" case "4.0.0" => "scala-sparkv4.0" }), - version := s"4.2.0-spark-${sparkVersion}", + version := s"4.2.1-spark-${sparkVersion}", licenses += "Apache-2.0" -> url( "http://opensource.org/licenses/Apache-2.0" ), diff --git a/demo/notebook/pyspark-singlestore-demo_2F8XQUKFG.zpln b/demo/notebook/pyspark-singlestore-demo_2F8XQUKFG.zpln index 6e078805..8c3ed1e5 100644 --- a/demo/notebook/pyspark-singlestore-demo_2F8XQUKFG.zpln +++ b/demo/notebook/pyspark-singlestore-demo_2F8XQUKFG.zpln @@ -45,7 +45,7 @@ }, { "title": "Configure Spark", - "text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.2.0-spark-4.0.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password", + "text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include 
on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.2.1-spark-4.0.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password", "user": "anonymous", "dateUpdated": "2022-07-06 11:26:15.232", "progress": 0, diff --git a/demo/notebook/scala-singlestore-demo_2F6Y3APTX.zpln b/demo/notebook/scala-singlestore-demo_2F6Y3APTX.zpln index bd324020..c5471d9f 100644 --- a/demo/notebook/scala-singlestore-demo_2F6Y3APTX.zpln +++ b/demo/notebook/scala-singlestore-demo_2F6Y3APTX.zpln @@ -45,7 +45,7 @@ }, { "title": "Configure Spark", - "text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.2.0-spark-4.0.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on 
which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password", + "text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.2.1-spark-4.0.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password", "user": "anonymous", "dateUpdated": "2022-07-06 11:31:08.311", "progress": 0, diff --git 
a/demo/notebook/spark-sql-singlestore-demo_2F7PZ81H6.zpln b/demo/notebook/spark-sql-singlestore-demo_2F7PZ81H6.zpln index 43a618fe..b1e1114f 100644 --- a/demo/notebook/spark-sql-singlestore-demo_2F7PZ81H6.zpln +++ b/demo/notebook/spark-sql-singlestore-demo_2F7PZ81H6.zpln @@ -45,7 +45,7 @@ }, { "title": "Configure Spark", - "text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.2.0-spark-4.0.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password", + "text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.2.1-spark-4.0.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by 
https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password", "user": "anonymous", "dateUpdated": "2022-07-06 11:32:22.885", "progress": 0, diff --git a/src/main/scala-sparkv3.1/spark/VersionSpecificRelationExtractor.scala b/src/main/scala-sparkv3.1/spark/VersionSpecificRelationExtractor.scala deleted file mode 100644 index bb284437..00000000 --- a/src/main/scala-sparkv3.1/spark/VersionSpecificRelationExtractor.scala +++ /dev/null @@ -1,36 +0,0 @@ -package com.singlestore.spark - -import com.singlestore.spark.SQLGen.{Relation, SQLGenContext, VariableList} -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.datasources.LogicalRelation - -object VersionSpecificRelationExtractor { - def unapply(source: LogicalPlan): Option[Relation] = - source match { - case LogicalRelation(reader: SinglestoreReader, - output, - catalogTable, - isStreaming) => { - def convertBack(output: Seq[AttributeReference], - sql: String, - variables: VariableList, - isFinal: Boolean, - context: SQLGenContext): LogicalPlan = { - new LogicalRelation( - reader.copy(query = sql, - variables = variables, - isFinal = isFinal, - expectedOutput = output, - context = context), - 
output, - catalogTable, - isStreaming, - ) - } - - Some(Relation(output, reader, reader.context.nextAlias(), convertBack)) - } - case _ => None - } -} diff --git a/src/main/scala-sparkv3.2/spark/VersionSpecificRelationExtractor.scala b/src/main/scala-sparkv3.2/spark/VersionSpecificRelationExtractor.scala deleted file mode 100644 index bb284437..00000000 --- a/src/main/scala-sparkv3.2/spark/VersionSpecificRelationExtractor.scala +++ /dev/null @@ -1,36 +0,0 @@ -package com.singlestore.spark - -import com.singlestore.spark.SQLGen.{Relation, SQLGenContext, VariableList} -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.datasources.LogicalRelation - -object VersionSpecificRelationExtractor { - def unapply(source: LogicalPlan): Option[Relation] = - source match { - case LogicalRelation(reader: SinglestoreReader, - output, - catalogTable, - isStreaming) => { - def convertBack(output: Seq[AttributeReference], - sql: String, - variables: VariableList, - isFinal: Boolean, - context: SQLGenContext): LogicalPlan = { - new LogicalRelation( - reader.copy(query = sql, - variables = variables, - isFinal = isFinal, - expectedOutput = output, - context = context), - output, - catalogTable, - isStreaming, - ) - } - - Some(Relation(output, reader, reader.context.nextAlias(), convertBack)) - } - case _ => None - } -} diff --git a/src/main/scala-sparkv3.3/spark/VersionSpecificRelationExtractor.scala b/src/main/scala-sparkv3.3/spark/VersionSpecificRelationExtractor.scala deleted file mode 100644 index bb284437..00000000 --- a/src/main/scala-sparkv3.3/spark/VersionSpecificRelationExtractor.scala +++ /dev/null @@ -1,36 +0,0 @@ -package com.singlestore.spark - -import com.singlestore.spark.SQLGen.{Relation, SQLGenContext, VariableList} -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan 
-import org.apache.spark.sql.execution.datasources.LogicalRelation - -object VersionSpecificRelationExtractor { - def unapply(source: LogicalPlan): Option[Relation] = - source match { - case LogicalRelation(reader: SinglestoreReader, - output, - catalogTable, - isStreaming) => { - def convertBack(output: Seq[AttributeReference], - sql: String, - variables: VariableList, - isFinal: Boolean, - context: SQLGenContext): LogicalPlan = { - new LogicalRelation( - reader.copy(query = sql, - variables = variables, - isFinal = isFinal, - expectedOutput = output, - context = context), - output, - catalogTable, - isStreaming, - ) - } - - Some(Relation(output, reader, reader.context.nextAlias(), convertBack)) - } - case _ => None - } -} diff --git a/src/main/scala-sparkv3.4/spark/VersionSpecificRelationExtractor.scala b/src/main/scala-sparkv3.4/spark/VersionSpecificRelationExtractor.scala deleted file mode 100644 index bb284437..00000000 --- a/src/main/scala-sparkv3.4/spark/VersionSpecificRelationExtractor.scala +++ /dev/null @@ -1,36 +0,0 @@ -package com.singlestore.spark - -import com.singlestore.spark.SQLGen.{Relation, SQLGenContext, VariableList} -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.datasources.LogicalRelation - -object VersionSpecificRelationExtractor { - def unapply(source: LogicalPlan): Option[Relation] = - source match { - case LogicalRelation(reader: SinglestoreReader, - output, - catalogTable, - isStreaming) => { - def convertBack(output: Seq[AttributeReference], - sql: String, - variables: VariableList, - isFinal: Boolean, - context: SQLGenContext): LogicalPlan = { - new LogicalRelation( - reader.copy(query = sql, - variables = variables, - isFinal = isFinal, - expectedOutput = output, - context = context), - output, - catalogTable, - isStreaming, - ) - } - - Some(Relation(output, reader, reader.context.nextAlias(), convertBack)) - } 
- case _ => None - } -} diff --git a/src/main/scala-sparkv3.5/spark/VersionSpecificRelationExtractor.scala b/src/main/scala-sparkv3.5/spark/VersionSpecificRelationExtractor.scala deleted file mode 100644 index bb284437..00000000 --- a/src/main/scala-sparkv3.5/spark/VersionSpecificRelationExtractor.scala +++ /dev/null @@ -1,36 +0,0 @@ -package com.singlestore.spark - -import com.singlestore.spark.SQLGen.{Relation, SQLGenContext, VariableList} -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.datasources.LogicalRelation - -object VersionSpecificRelationExtractor { - def unapply(source: LogicalPlan): Option[Relation] = - source match { - case LogicalRelation(reader: SinglestoreReader, - output, - catalogTable, - isStreaming) => { - def convertBack(output: Seq[AttributeReference], - sql: String, - variables: VariableList, - isFinal: Boolean, - context: SQLGenContext): LogicalPlan = { - new LogicalRelation( - reader.copy(query = sql, - variables = variables, - isFinal = isFinal, - expectedOutput = output, - context = context), - output, - catalogTable, - isStreaming, - ) - } - - Some(Relation(output, reader, reader.context.nextAlias(), convertBack)) - } - case _ => None - } -} diff --git a/src/main/scala-sparkv4.0/spark/VersionSpecificRelationExtractor.scala b/src/main/scala-sparkv4.0/spark/VersionSpecificRelationExtractor.scala deleted file mode 100644 index 59982d01..00000000 --- a/src/main/scala-sparkv4.0/spark/VersionSpecificRelationExtractor.scala +++ /dev/null @@ -1,38 +0,0 @@ -package com.singlestore.spark - -import com.singlestore.spark.SQLGen.{Relation, SQLGenContext, VariableList} -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.datasources.LogicalRelation - -object VersionSpecificRelationExtractor { - def 
unapply(source: LogicalPlan): Option[Relation] = - source match { - case LogicalRelation(reader: SinglestoreReader, - output, - catalogTable, - isStreaming, - stream) => { - def convertBack(output: Seq[AttributeReference], - sql: String, - variables: VariableList, - isFinal: Boolean, - context: SQLGenContext): LogicalPlan = { - new LogicalRelation( - reader.copy(query = sql, - variables = variables, - isFinal = isFinal, - expectedOutput = output, - context = context), - output, - catalogTable, - isStreaming, - stream - ) - } - - Some(Relation(output, reader, reader.context.nextAlias(), convertBack)) - } - case _ => None - } -} diff --git a/src/main/scala/com/singlestore/spark/SQLGen.scala b/src/main/scala/com/singlestore/spark/SQLGen.scala index b72269fb..ffead0db 100644 --- a/src/main/scala/com/singlestore/spark/SQLGen.scala +++ b/src/main/scala/com/singlestore/spark/SQLGen.scala @@ -7,6 +7,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types._ import org.slf4j.{Logger, LoggerFactory} import com.singlestore.spark.JdbcHelpers.getDMLConnProperties +import org.apache.spark.sql.execution.datasources.LogicalRelation import scala.collection.immutable.HashMap import scala.collection.mutable @@ -229,8 +230,71 @@ object SQLGen extends LazyLogging { object Relation { def unapply(source: LogicalPlan): Option[Relation] = { - val versionSpecificRelation = VersionSpecificRelationExtractor - versionSpecificRelation.unapply(source) + source match { + case lr: LogicalRelation if lr.relation.isInstanceOf[SinglestoreReader] => { + val reader = lr.relation.asInstanceOf[SinglestoreReader] + def convertBack(output: Seq[AttributeReference], + sql: String, + variables: VariableList, + isFinal: Boolean, + context: SQLGenContext): LogicalPlan = { + LogicalRelationCompat.copyWithReader(lr, + reader.copy(query = sql, + variables = variables, + isFinal = isFinal, + expectedOutput = output, + context = context)) + } + + Some(Relation(lr.output, reader, 
reader.context.nextAlias(), convertBack)) + } + case _ => None + } + } + } + + // In some versions of Spark, LogicalRelation has an extra `stream` + argument, while in others it does not. This helper abstracts away the differences + and copies the relation with a new reader, filling any extra `copy` parameters (such as `stream`) only if they exist + in the current runtime class, avoiding compile-time errors and runtime crashes. + object LogicalRelationCompat { + def copyWithReader(lr: LogicalRelation, reader: SinglestoreReader): LogicalRelation = { + val cls = lr.getClass + val copies = cls.getMethods.iterator + .filter(m => m.getName == "copy") + .toList + .sortBy(_.getParameterCount) + + // pick the most specific/longest copy; we’ll fill defaults for trailing params + val copyM = copies.lastOption.getOrElse { + throw new NoSuchMethodError("LogicalRelation.copy not found.") + } + + // The first four params are stable across versions: + // relation: BaseRelation + // output: Seq[AttributeReference] + // catalogTable: Option[CatalogTable] + // isStreaming: Boolean + val baseArgs: Array[AnyRef] = Array( + reader.asInstanceOf[AnyRef], + reader.expectedOutput, + lr.catalogTable.asInstanceOf[AnyRef], + java.lang.Boolean.valueOf(lr.isStreaming) + ) + + val paramCount = copyM.getParameterCount + val args = + if (paramCount <= 4) baseArgs + else { + // Fill in extra params with their default values: copy$default$5, copy$default$6, ... + val extras = (5 to paramCount).map { i => + val dm = cls.getMethod(s"copy$$default$$$i") + dm.invoke(lr) + } + baseArgs ++ extras + } + + copyM.invoke(lr, args: _*).asInstanceOf[LogicalRelation] } }