Skip to content

Commit 83295fc

Browse files
SPARKC-673 Fix invalid call to CodecRegistry (#1334) (#1339)
Fix regression introduced in f1f666c. Co-authored-by: Adam Dąbrowski <[email protected]>
1 parent 9f9473e commit 83295fc

File tree

4 files changed

+166
-2
lines changed

4 files changed

+166
-2
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package com.datastax.spark.connector.types
2+
3+
import com.datastax.oss.driver.api.core.CqlSession
4+
import com.datastax.spark.connector.SparkCassandraITFlatSpecBase
5+
import com.datastax.spark.connector.cluster.DefaultCluster
6+
import com.datastax.spark.connector.cql.CassandraConnector
7+
import com.datastax.spark.connector._
8+
import org.apache.spark.sql.cassandra._
9+
import org.scalatest.matchers.{MatchResult, Matcher}
10+
11+
case class Ingredients(id: Int, ingredient: (String, Array[Byte]))
12+
13+
case class Recipes(id: Int, ingredients: ((String, Array[Byte]), (String, Array[Byte])))
14+
15+
class TupleTypeSpec extends SparkCassandraITFlatSpecBase with DefaultCluster {
16+
17+
override lazy val conn = CassandraConnector(sparkConf)
18+
19+
val IngredientsTable = "ingredients"
20+
val RecipesTable = "recipes"
21+
22+
def makeTupleTables(session: CqlSession): Unit = {
23+
session.execute(
24+
s"""CREATE TABLE IF NOT EXISTS $ks.$IngredientsTable
25+
|(id int PRIMARY KEY, ingredient tuple<text, blob>);""".stripMargin)
26+
27+
session.execute(
28+
s"""CREATE TABLE IF NOT EXISTS $ks.$RecipesTable
29+
|(id int PRIMARY KEY, ingredients tuple<tuple<text, blob>, tuple<text, blob>>)""".stripMargin)
30+
}
31+
32+
override def beforeClass {
33+
conn.withSessionDo { session =>
34+
session.execute(
35+
s"""CREATE KEYSPACE IF NOT EXISTS $ks
36+
|WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }"""
37+
.stripMargin)
38+
makeTupleTables(session)
39+
}
40+
}
41+
42+
private val beTheSameIngredientAs = (expected: (String, Array[Byte])) =>
43+
Matcher { (left: (String, Array[Byte])) =>
44+
MatchResult(
45+
(left._1 equals expected._1) && (left._2 sameElements expected._2),
46+
s"$left equals $expected",
47+
s"$left does not equal $expected"
48+
)
49+
}
50+
51+
"SparkSql" should "write tuples with BLOB elements" in {
52+
val expected = ("fish", "><>".getBytes)
53+
spark.createDataFrame(Seq(Ingredients(1, expected)))
54+
.write
55+
.cassandraFormat(IngredientsTable, ks)
56+
.mode("append")
57+
.save()
58+
val row = spark.sparkContext
59+
.cassandraTable[Ingredients](ks, IngredientsTable)
60+
.collect()
61+
.head
62+
row.ingredient should beTheSameIngredientAs(expected)
63+
}
64+
65+
it should "write nested tuples" in {
66+
val expected = (("fish", "><>".getBytes), ("poisson", "»<>".getBytes))
67+
spark.createDataFrame(Seq(Recipes(1, expected)))
68+
.write
69+
.cassandraFormat(RecipesTable, ks)
70+
.mode("append")
71+
.save()
72+
val row = spark.sparkContext
73+
.cassandraTable[Recipes](ks, RecipesTable)
74+
.collect()
75+
.head
76+
row.ingredients._1 should beTheSameIngredientAs(expected._1)
77+
row.ingredients._2 should beTheSameIngredientAs(expected._2)
78+
}
79+
80+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package com.datastax.spark.connector.types
2+
3+
import com.datastax.oss.driver.api.core.CqlSession
4+
import com.datastax.spark.connector.SparkCassandraITFlatSpecBase
5+
import com.datastax.spark.connector.cluster.DefaultCluster
6+
import com.datastax.spark.connector.cql.CassandraConnector
7+
import com.datastax.spark.connector._
8+
import org.apache.spark.sql.cassandra._
9+
10+
// UDTs
11+
case class File(data: Array[Byte])
12+
13+
case class Profile(name: String, picture: File)
14+
15+
// Tables
16+
case class Files(id: Int, file: File)
17+
18+
case class Profiles(id: Int, profile: Profile)
19+
20+
class UserDefinedTypeSpec extends SparkCassandraITFlatSpecBase with DefaultCluster {
21+
22+
override lazy val conn = CassandraConnector(sparkConf)
23+
24+
val FilesTable = "files"
25+
val ProfilesTable = "profiles"
26+
27+
def makeUdtTables(session: CqlSession): Unit = {
28+
session.execute(s"""CREATE TYPE IF NOT EXISTS $ks.file (data blob);""")
29+
session.execute(
30+
s"""CREATE TABLE IF NOT EXISTS $ks.$FilesTable
31+
|(id int PRIMARY KEY, file frozen<file>);""".stripMargin)
32+
33+
session.execute(s"""CREATE TYPE IF NOT EXISTS $ks.profile (name text, picture frozen<file>)""")
34+
session.execute(
35+
s"""CREATE TABLE IF NOT EXISTS $ks.$ProfilesTable
36+
|(id int PRIMARY KEY, profile frozen<profile>)""".stripMargin)
37+
}
38+
39+
override def beforeClass {
40+
conn.withSessionDo { session =>
41+
session.execute(
42+
s"""CREATE KEYSPACE IF NOT EXISTS $ks
43+
|WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }"""
44+
.stripMargin)
45+
makeUdtTables(session)
46+
}
47+
}
48+
49+
"SparkSql" should "write UDTs with BLOB fields" in {
50+
val expected = File(":)".getBytes)
51+
spark.createDataFrame(Seq(Files(1, expected)))
52+
.write
53+
.cassandraFormat(FilesTable, ks)
54+
.mode("append")
55+
.save()
56+
val row = spark.sparkContext
57+
.cassandraTable[Files](ks, FilesTable)
58+
.collect()
59+
.head
60+
row.file.data shouldEqual expected.data
61+
}
62+
63+
it should "write nested UDTs" in {
64+
val expected = Profile("John Smith", File(":)".getBytes))
65+
spark.createDataFrame(Seq(Profiles(1, expected)))
66+
.write
67+
.cassandraFormat(ProfilesTable, ks)
68+
.mode("append")
69+
.save()
70+
val row = spark.sparkContext
71+
.cassandraTable[Profiles](ks, ProfilesTable)
72+
.collect()
73+
.head
74+
row.profile.name shouldEqual expected.name
75+
row.profile.picture.data shouldEqual expected.picture.data
76+
}
77+
78+
}

driver/src/main/scala/com/datastax/spark/connector/types/TupleType.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package com.datastax.spark.connector.types
44
import java.io.ObjectOutputStream
55

66
import com.datastax.oss.driver.api.core.`type`.{DataType, TupleType => DriverTupleType}
7+
import com.datastax.oss.driver.api.core.`type`.codec.registry.CodecRegistry
78
import com.datastax.oss.driver.api.core.data.{TupleValue => DriverTupleValue}
89
import com.datastax.spark.connector.cql.{FieldDef, StructDef}
910
import com.datastax.spark.connector.types.ColumnType.fromDriverType
@@ -116,7 +117,11 @@ object TupleType {
116117
for (i <- 0 until fieldTypes.size) {
117118
val fieldConverter = fieldConverters(i)
118119
val fieldValue = fieldConverter.convert(tupleValue.getRaw(i))
119-
toSave.set(i, fieldValue, fieldValue.getClass.asInstanceOf[Class[AnyRef]])
120+
if (fieldValue == null) {
121+
toSave.setToNull(i)
122+
} else {
123+
toSave.set(i, fieldValue, CodecRegistry.DEFAULT.codecFor(fieldTypes(i), fieldValue))
124+
}
120125
}
121126
toSave
122127
}

driver/src/main/scala/com/datastax/spark/connector/types/UserDefinedType.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com.datastax.spark.connector.types
33
import java.io.ObjectOutputStream
44

55
import com.datastax.oss.driver.api.core.`type`.{DataType, UserDefinedType => DriverUserDefinedType}
6+
import com.datastax.oss.driver.api.core.`type`.codec.registry.CodecRegistry
67
import com.datastax.oss.driver.api.core.data.{UdtValue => DriverUDTValue}
78
import com.datastax.spark.connector.cql.{FieldDef, StructDef}
89
import com.datastax.spark.connector.types.ColumnType.fromDriverType
@@ -96,7 +97,7 @@ object UserDefinedType {
9697
if (fieldValue == null) {
9798
toSave.setToNull(i)
9899
} else {
99-
toSave.set(i, fieldValue, fieldValue.getClass.asInstanceOf[Class[AnyRef]])
100+
toSave.set(i, fieldValue, CodecRegistry.DEFAULT.codecFor(fieldTypes(i), fieldValue))
100101
}
101102
}
102103
toSave

0 commit comments

Comments
 (0)