Skip to content

Commit dc5383d

Browse files
SPARKC-458: Fix for 2.12.10 Build Issues and Review
The Spark Packages plugin was automatically adding a default spark version of 1.4.0 to all projects which didn't inherit the sparkPackagesSettings group. With this setting added to the cassandra-embedded project the Scala 2.12 integration test can compile. Additionally the SparkRepl class exists in Scala Specific source directories, there was no Scala 2.12 version so that is added as well. The Cassandra Java Util test also required a small fix because it was doing a strict string comparison of typetag's string representations. In Scala 2.12 java.lang is no longer added to Java types (getSimpleName) vs (getName) causing a bunch of mismatches. We remove all "java.lang." from the strings before comparing to fix the mismatch and be compatible with both Scala versions. In Scala 2.12 there seems to be a sensitivity to untyped implicits and their position in the code. Previously, the unannotated implicit connector in the CassandraSourceRelation was declared after its usage in "saveToCassandra"; in Scala 2.12 this caused the implicit to be ignored and the default of (CassandraConnector(sparkConf)) to be used instead. This broke configuring the DataSource using an alternative connection conf. In Scala 2.11 there is no problem with the ordering. To fix this we move and explicitly type the implicit val declarations to the top of the file. Adds travis configuration for running 2.12 Integration Tests
1 parent be21845 commit dc5383d

File tree

5 files changed

+82
-21
lines changed

5 files changed

+82
-21
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ sudo: required
77
dist: trusty
88
scala:
99
- 2.11.12
10+
- 2.12.10
1011

1112
env:
1213
- CASSANDRA_VERSION=2.1.15

project/SparkCassandraConnectorBuild.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ object CassandraSparkBuild extends Build {
3939
lazy val cassandraServerProject = Project(
4040
id = "cassandra-server",
4141
base = file(namespace),
42-
settings = defaultSettings ++ Seq(
42+
settings = defaultSettings ++ sparkPackageSettings ++ Seq(
4343
libraryDependencies ++= Seq(Artifacts.cassandraServer % "it", Artifacts.airlift),
4444
Testing.cassandraServerClasspath := {
4545
(fullClasspath in IntegrationTest).value.map(_.data.getAbsoluteFile).mkString(File.pathSeparator)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
package com.datastax.spark.connector.embedded

import java.io._
import java.net.URLClassLoader

import org.apache.spark.SparkConf
import org.apache.spark.repl.{Main, SparkILoop}

import scala.tools.nsc.GenericRunnerSettings

/**
 * Test helper that runs a snippet of Scala code through an embedded Spark REPL
 * (Scala 2.12 variant of the version-specific SparkRepl sources).
 */
object SparkRepl {

  /**
   * Feeds `input` to a fresh SparkILoop configured from `conf` and returns
   * everything the interpreter printed.
   *
   * @param input Scala statements to interpret; a trailing newline is appended
   *              so the last statement is evaluated.
   * @param conf  Spark settings copied into the REPL's global `Main.conf`.
   * @return the REPL's full console output as a single string.
   */
  def runInterpreter(input: String, conf: SparkConf): String = {
    val in = new BufferedReader(new StringReader(input + "\n"))
    val out = new StringWriter()

    // Rebuild the interpreter classpath from the current classloader so that
    // classes of this test JVM are visible inside the REPL. Only file:// URLs
    // can be placed on a -classpath entry.
    val paths: Seq[String] = getClass.getClassLoader match {
      case urlLoader: URLClassLoader =>
        urlLoader.getURLs.toSeq.collect {
          case url if url.getProtocol == "file" => url.getFile
        }
      case _ => Seq.empty
    }

    Main.conf.setAll(conf.getAll)
    val interp = new SparkILoop(Some(in), new PrintWriter(out))
    Main.interp = interp
    val settings = new GenericRunnerSettings(s => throw new RuntimeException(s"Scala options error: $s"))
    // File.pathSeparator is the platform-correct separator (":" / ";"),
    // equivalent to System.getProperty("path.separator") but type-safe.
    settings.processArguments(List("-classpath", paths.mkString(File.pathSeparator)), true)
    interp.process(settings) // REPL consumes `in` until EOF, echoing to `out`
    Main.interp = null
    // Stop any SparkContext the interpreted code created and release the
    // driver port so subsequent tests can start their own context.
    Option(Main.sparkContext).foreach(_.stop())
    System.clearProperty("spark.driver.port")
    out.toString
  }

}

spark-cassandra-connector/src/it/java/com/datastax/spark/connector/CassandraJavaUtilTest.java

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
import java.util.Map;
66
import java.util.Set;
77

8+
import com.datastax.spark.connector.japi.CassandraJavaUtil;
9+
import org.hamcrest.BaseMatcher;
10+
import org.hamcrest.Description;
11+
import org.hamcrest.Matcher;
812
import scala.reflect.api.TypeTags;
913

1014
import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;
@@ -31,61 +35,71 @@
3135
@SuppressWarnings("unchecked")
3236
public class CassandraJavaUtilTest {
3337

38+
/**
39+
* Scala refelection type tags change the string reprsentation of some types, in scala 2.11 java.lang
40+
* is included, in scala 2.12 it is removed. To remove this conflict we just always remove the java.lang
41+
* portion
42+
*/
43+
private String removeJavaLang(String target) {
44+
return target.replaceAll("java.lang.", "");
45+
}
46+
47+
private final String STRING = removeJavaLang(String.class.getName());
48+
private final String LIST_STRING =
49+
removeJavaLang(String.format("%s[%s]", List.class.getName(), String.class.getName()));
50+
private final String MAP_STRING_INT =
51+
removeJavaLang(String.format("%s[%s,%s]", Map.class.getName(), String.class.getName(), Integer.class.getName()));
52+
private final String LIST_SET_MAP_STRING_INT =
53+
removeJavaLang(String.format("%s[%s[%s[%s,%s]]]", List.class.getName(), Set.class.getName(), Map.class.getName(), String.class.getName(), Integer.class.getName()));
54+
3455
@Test
3556
public void testTypeTag1() throws Exception {
3657
TypeTags.TypeTag<String> tt = typeTag(String.class);
37-
assertThat(tt.tpe().toString(), is(String.class.getName()));
58+
assertThat(removeJavaLang(tt.tpe().toString()), is(STRING));
3859
}
3960

4061
@Test
4162
public void testTypeTag2() throws Exception {
4263
TypeTags.TypeTag<List> tt1 = typeTag(List.class, String.class);
43-
assertThat(tt1.tpe().toString(), is(String.format("%s[%s]",
44-
List.class.getName(), String.class.getName())));
64+
assertThat(removeJavaLang(removeJavaLang(tt1.tpe().toString())), is(LIST_STRING));
4565

4666
TypeTags.TypeTag<Map> tt2 = typeTag(Map.class, String.class, Integer.class);
47-
assertThat(tt2.tpe().toString(), is(String.format("%s[%s,%s]",
48-
Map.class.getName(), String.class.getName(), Integer.class.getName())));
67+
assertThat(removeJavaLang(removeJavaLang(tt2.tpe().toString())), is(MAP_STRING_INT));
4968
}
5069

5170
@Test
5271
public void testTypeTag3() throws Exception {
5372
TypeTags.TypeTag<List> tt = typeTag(List.class, typeTag(Set.class, typeTag(Map.class, typeTag(String.class), typeTag(Integer.class))));
54-
assertThat(tt.tpe().toString(), is(String.format("%s[%s[%s[%s,%s]]]",
55-
List.class.getName(), Set.class.getName(), Map.class.getName(), String.class.getName(), Integer.class.getName())));
73+
assertThat(removeJavaLang(tt.tpe().toString()), is(LIST_SET_MAP_STRING_INT));
5674
}
5775

5876
@Test
5977
public void testTypeConverter1() throws Exception {
6078
TypeConverter<List<String>> tc = typeConverter(String.class);
61-
assertThat(tc.targetTypeName(), is(String.class.getSimpleName()));
79+
assertThat(removeJavaLang(tc.targetTypeName()), is(STRING));
6280
}
6381

6482
@Test
6583
public void testTypeConverter2() throws Exception {
6684
TypeConverter<List<String>> tc1 = typeConverter(List.class, String.class);
67-
assertThat(tc1.targetTypeName(), is(String.format("%s[%s]",
68-
List.class.getName(), String.class.getSimpleName())));
85+
assertThat(removeJavaLang(tc1.targetTypeName()), is(LIST_STRING));
6986

7087
TypeConverter<Map<String, Integer>> tc2 = typeConverter(Map.class, String.class, Integer.class);
71-
assertThat(tc2.targetTypeName(), is(String.format("%s[%s,%s]",
72-
Map.class.getName(), String.class.getSimpleName(), Integer.class.getName())));
88+
assertThat(removeJavaLang(tc2.targetTypeName()), is(MAP_STRING_INT));
7389

7490
}
7591

7692
@Test
7793
public void testTypeConverter3() throws Exception {
7894
TypeConverter<List> tc = typeConverter(List.class, typeTag(Set.class, typeTag(Map.class, typeTag(String.class), typeTag(Integer.class))));
79-
assertThat(tc.targetTypeName(), is(String.format("%s[%s[%s[%s,%s]]]",
80-
List.class.getName(), Set.class.getName(), Map.class.getName(), String.class.getSimpleName(), Integer.class.getName())));
95+
assertThat(removeJavaLang(tc.targetTypeName()), is(LIST_SET_MAP_STRING_INT));
8196
}
8297

8398
@Test
8499
public void testTypeConverter4() throws Exception {
85100
TypeTags.TypeTag<List> tt = typeTag(List.class, typeTag(Set.class, typeTag(Map.class, typeTag(String.class), typeTag(Integer.class))));
86101
TypeConverter<List> tc = typeConverter(tt);
87-
assertThat(tc.targetTypeName(), is(String.format("%s[%s[%s[%s,%s]]]",
88-
List.class.getName(), Set.class.getName(), Map.class.getName(), String.class.getSimpleName(), Integer.class.getName())));
102+
assertThat(removeJavaLang(tc.targetTypeName()), is(LIST_SET_MAP_STRING_INT));
89103
}
90104

91105
@Test

spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSourceRelation.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import com.datastax.spark.connector.rdd.partitioner.dht.TokenFactory.forSystemLo
1010
import com.datastax.spark.connector.types.{InetType, UUIDType, VarIntType}
1111
import com.datastax.spark.connector.util.Quote._
1212
import com.datastax.spark.connector.util.{ConfigParameter, Logging, ReflectionUtil}
13-
import com.datastax.spark.connector.writer.{SqlRowWriter, WriteConf}
13+
import com.datastax.spark.connector.writer.{RowWriterFactory, SqlRowWriter, WriteConf}
1414
import com.datastax.spark.connector.{SomeColumns, _}
1515
import org.apache.spark.SparkConf
1616
import org.apache.spark.rdd.RDD
@@ -43,6 +43,10 @@ private[cassandra] class CassandraSourceRelation(
4343
with PrunedFilteredScan
4444
with Logging {
4545

46+
implicit val readconf: ReadConf = readConf
47+
implicit val rwf: RowWriterFactory[Row] = SqlRowWriter.Factory
48+
implicit val cassandraConnector: CassandraConnector = connector
49+
4650
private[this] val tableDef = Schema.tableFromCassandra(
4751
connector,
4852
tableRef.keyspace,
@@ -52,6 +56,7 @@ private[cassandra] class CassandraSourceRelation(
5256
userSpecifiedSchema.getOrElse(StructType(tableDef.columns.map(toStructField)))
5357
}
5458

59+
5560
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
5661
if (overwrite) {
5762
if (confirmTruncate) {
@@ -71,18 +76,17 @@ private[cassandra] class CassandraSourceRelation(
7176

7277
}
7378

74-
implicit val rwf = SqlRowWriter.Factory
7579
val columns = SomeColumns(data.columns.map(x => x: ColumnRef): _*)
7680
data.rdd.saveToCassandra(tableRef.keyspace, tableRef.table, columns, writeConf)
7781
}
7882

83+
84+
7985
override def sizeInBytes: Long = {
8086
// If it's not found, use SQLConf default setting
8187
tableSizeInBytes.getOrElse(sqlContext.conf.defaultSizeInBytes)
8288
}
8389

84-
implicit val cassandraConnector = connector
85-
implicit val readconf = readConf
8690
private[this] val baseRdd =
8791
sqlContext.sparkContext.cassandraTable[CassandraSQLRow](tableRef.keyspace, tableRef.table)
8892

0 commit comments

Comments
 (0)