Skip to content

Commit ad26072

Browse files
gowa and Igor Kamyshnikov authored
SPARKC-647 UDTValue performance fix - reuse CassandraRowMetadata instead of building it for each row (b2.5) (#1309)
* UDTValue to have CassandraRowMetadata in constructor, similar to japi.UDTValue
* Preserve compatibility for the UDTValue case class constructor/apply/unapply by introducing a new field metaData_ that can keep the precalculated CassandraRowMetadata (cherry picked from commit 39e55cc)
* lazy val metadata (cherry picked from commit 3a62237)
* Change case class default constructor, add unapply method

Co-authored-by: Igor Kamyshnikov <[email protected]>
Co-authored-by: Igor Kamyshnikov <[email protected]>
1 parent 510f2d4 commit ad26072

File tree

3 files changed

+19
-8
lines changed

3 files changed

+19
-8
lines changed

connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ object CassandraSQLRow {
8585
case set: Set[_] => set.map(toSparkSqlType).toSeq
8686
case list: List[_] => list.map(toSparkSqlType)
8787
case map: Map[_, _] => map map { case(k, v) => (toSparkSqlType(k), toSparkSqlType(v))}
88-
case udt: UDTValue => UDTValue(udt.columnNames, udt.columnValues.map(toSparkSqlType))
88+
case udt: UDTValue => UDTValue(udt.metaData, udt.columnValues.map(toSparkSqlType))
8989
case tupleValue: TupleValue => TupleValue(tupleValue.values.map(toSparkSqlType): _*)
9090
case dateRange: DateRange => dateRange.toString
9191
case _ => value.asInstanceOf[AnyRef]
@@ -106,7 +106,7 @@ object CassandraSQLRow {
106106
case set: Set[_] => set.map(toUnsafeSqlType).toSeq
107107
case list: List[_] => list.map(toUnsafeSqlType)
108108
case map: Map[_, _] => map map { case(k, v) => (toUnsafeSqlType(k), toUnsafeSqlType(v))}
109-
case udt: UDTValue => UDTValue(udt.columnNames, udt.columnValues.map(toUnsafeSqlType))
109+
case udt: UDTValue => UDTValue(udt.metaData, udt.columnValues.map(toUnsafeSqlType))
110110
case tupleValue: TupleValue => TupleValue(tupleValue.values.map(toUnsafeSqlType): _*)
111111
case dateRange: DateRange => UTF8String.fromString(dateRange.toString)
112112
case instant: Instant => java.sql.Timestamp.from(instant)

driver/src/main/scala/com/datastax/spark/connector/UDTValue.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,19 @@ import com.datastax.spark.connector.util.DriverUtil.toName
77
import scala.collection.JavaConversions._
88
import scala.reflect.runtime.universe._
99

10-
final case class UDTValue(columnNames: IndexedSeq[String], columnValues: IndexedSeq[AnyRef])
10+
final case class UDTValue(metaData: CassandraRowMetadata, columnValues: IndexedSeq[AnyRef])
1111
extends ScalaGettableData {
12+
13+
def this(columnNames: IndexedSeq[String], columnValues: IndexedSeq[AnyRef]) =
14+
this(CassandraRowMetadata.fromColumnNames(columnNames), columnValues)
15+
16+
def columnNames: IndexedSeq[String] = metaData.columnNames
17+
1218
override def productArity: Int = columnValues.size
1319
override def productElement(i: Int) = columnValues(i)
1420

15-
override def metaData = CassandraRowMetadata.fromColumnNames(columnNames)
21+
def unapply(t: UDTValue): Some[(IndexedSeq[String],IndexedSeq[AnyRef])] =
22+
Some((t.metaData.columnNames,t.columnValues))
1623
}
1724

1825
object UDTValue {
@@ -35,4 +42,7 @@ object UDTValue {
3542
case x: UDTValue => x
3643
}
3744
}
45+
46+
def apply(columnNames: IndexedSeq[String], columnValues: IndexedSeq[AnyRef]): UDTValue =
47+
new UDTValue(columnNames, columnValues)
3848
}

driver/src/main/scala/com/datastax/spark/connector/types/UserDefinedType.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import com.datastax.oss.driver.api.core.data.{UdtValue => DriverUDTValue}
77
import com.datastax.spark.connector.cql.{FieldDef, StructDef}
88
import com.datastax.spark.connector.types.ColumnType.fromDriverType
99
import com.datastax.spark.connector.types.TypeAdapters.ValueByNameAdapter
10-
import com.datastax.spark.connector.{ColumnName, UDTValue}
10+
import com.datastax.spark.connector.{CassandraRowMetadata, ColumnName, UDTValue}
1111

1212
import scala.collection.JavaConversions._
1313
import scala.reflect.runtime.universe._
@@ -37,6 +37,7 @@ case class UserDefinedType(
3737
def cqlTypeName = name
3838

3939
val fieldConvereters = columnTypes.map(_.converterToCassandra)
40+
private lazy val metadata = CassandraRowMetadata.fromColumnNames(columnNames)
4041

4142
private lazy val valueByNameConverter = scala.util.Try(TypeConverter.forType[ValueByNameAdapter]).toOption
4243

@@ -51,7 +52,7 @@ case class UserDefinedType(
5152
val columnValue = columnConverter.convert(udtValue.getRaw(columnName))
5253
columnValue
5354
}
54-
new UDTValue(columnNames, columnValues)
55+
new UDTValue(metadata, columnValues)
5556
case value if valueByNameConverter.exists(_.convertPF.isDefinedAt(value)) =>
5657
val valuesByName = valueByNameConverter.get.convert(value)
5758
val columnValues =
@@ -61,14 +62,14 @@ case class UserDefinedType(
6162
val columnValue = columnConverter.convert(valuesByName.getByName(columnName))
6263
columnValue
6364
}
64-
new UDTValue(columnNames, columnValues)
65+
new UDTValue(metadata, columnValues)
6566
}
6667
}
6768

6869
override type ValueRepr = UDTValue
6970

7071
override def newInstance(columnValues: Any*): UDTValue = {
71-
UDTValue(columnNames, columnValues.map(_.asInstanceOf[AnyRef]).toIndexedSeq)
72+
UDTValue(metadata, columnValues.map(_.asInstanceOf[AnyRef]).toIndexedSeq)
7273
}
7374
}
7475

0 commit comments

Comments (0)