Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

68 changes: 66 additions & 2 deletions src/main/scala/com/singlestore/spark/SQLGen.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
import org.slf4j.{Logger, LoggerFactory}
import com.singlestore.spark.JdbcHelpers.getDMLConnProperties
import org.apache.spark.sql.execution.datasources.LogicalRelation

import scala.collection.immutable.HashMap
import scala.collection.mutable
Expand Down Expand Up @@ -229,8 +230,71 @@ object SQLGen extends LazyLogging {

object Relation {
def unapply(source: LogicalPlan): Option[Relation] = {
val versionSpecificRelation = VersionSpecificRelationExtractor
versionSpecificRelation.unapply(source)
source match {
case lr: LogicalRelation if lr.relation.isInstanceOf[SinglestoreReader] => {
val reader = lr.relation.asInstanceOf[SinglestoreReader]
def convertBack(output: Seq[AttributeReference],
sql: String,
variables: VariableList,
isFinal: Boolean,
context: SQLGenContext): LogicalPlan = {
LogicalRelationCompat.copyWithReader(lr,
reader.copy(query = sql,
variables = variables,
isFinal = isFinal,
expectedOutput = output,
context = context))
}

Some(Relation(lr.output, reader, reader.context.nextAlias(), convertBack))
}
case _ => None
}
}
}

// In some versions of Spark, LogicalRelation has an extra `stream`
// argument, while in others it does not. This extractor abstracts away the differences
// and safely retrieves the common fields. It will also extract the `stream` attribute only if it exists
// in the current runtime class, avoiding compile-time errors and runtime crashes.
object LogicalRelationCompat {
def copyWithReader(lr: LogicalRelation, reader: SinglestoreReader): LogicalRelation = {
val cls = lr.getClass
val copies = cls.getMethods.iterator
.filter(m => m.getName == "copy")
.toList
.sortBy(_.getParameterCount)

// pick the most specific/longest copy; we’ll fill defaults for trailing params
val copyM = copies.lastOption.getOrElse {
throw new NoSuchMethodError("LogicalRelation.copy not found.")
}

// The first four params are stable across versions:
// relation: BaseRelation
// output: Seq[AttributeReference]
// catalogTable: Option[CatalogTable]
// isStreaming: Boolean
val baseArgs: Array[AnyRef] = Array(
reader.asInstanceOf[AnyRef],
reader.expectedOutput,
lr.catalogTable.asInstanceOf[AnyRef],
java.lang.Boolean.valueOf(lr.isStreaming)
)

val paramCount = copyM.getParameterCount
val args =
if (paramCount <= 4) baseArgs
else {
// Fill in extra params with their default values: copy$default$5, copy$default$6, ...
val extras = (5 to paramCount).map { i =>
val dm = cls.getMethod(s"copy$$default$$$i")
dm.invoke(lr)
}
baseArgs ++ extras
}

copyM.invoke(lr, args: _*).asInstanceOf[LogicalRelation]
}
}

Expand Down
Loading