
Commit ffacc86

kevinhartmanc-w authored and committed
Add test coverage for de-duplication logic (#49)
1 parent eceda8c commit ffacc86

2 files changed: 91 additions & 4 deletions

project-fortis-spark/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/CassandraExtensions.scala

Lines changed: 8 additions & 4 deletions
@@ -1,8 +1,8 @@
 package com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra
 
-import com.datastax.spark.connector.cql.{CassandraConnector, Schema}
+import com.datastax.spark.connector.cql.{CassandraConnector, Schema, TableDef}
 import com.datastax.spark.connector.writer._
-import org.apache.spark.rdd.{PairRDDFunctions, RDD}
+import org.apache.spark.rdd.RDD
 
 import scala.reflect.ClassTag
 
@@ -12,11 +12,15 @@ object CassandraExtensions {
     (implicit connector: CassandraConnector = CassandraConnector(rdd.sparkContext), rwf: RowWriterFactory[V], kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K]): RDD[(K, V)] =
   {
     val tableDef = Schema.tableFromCassandra(connector, keyspaceName, tableName)
+    rdd.deDupValuesByCassandraTable(tableDef)
+  }
+
+  def deDupValuesByCassandraTable(tableDef: TableDef)
+    (implicit rwf: RowWriterFactory[V], kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K]): RDD[(K, V)] =
+  {
     val rowWriter = implicitly[RowWriterFactory[V]].rowWriter(tableDef, tableDef.primaryKey.map(_.ref))
     val primaryKeySize = tableDef.primaryKey.length
 
-    //import org.apache.spark.rdd.PairRDDFunctions
-
     rdd.groupByKey().mapValues(eventRows => {
       eventRows.groupBy(value => {
         // Group by an ordered list of primary key column values.
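
The hunk's context ends mid-method, so the tail of deDupValuesByCassandraTable isn't visible in this diff. As a rough sketch only (not the repository's actual code), assuming the connector's RowWriter.readColumnValues is used to materialize each value's primary-key columns, the grouping presumably completes along these lines:

    // Sketch (assumed, not from the commit): group each key's values on their
    // primary-key column values and keep one representative per group.
    rdd.groupByKey().mapValues(eventRows => {
      eventRows.groupBy(value => {
        // Group by an ordered list of primary key column values.
        val keyColumns = new Array[Any](primaryKeySize)
        rowWriter.readColumnValues(value, keyColumns)  // rowWriter was built over primary-key columns only
        keyColumns.toList
      }).values.map(_.head)  // one representative per distinct primary key
    }).flatMapValues(identity)  // flatten back to RDD[(K, V)]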
project-fortis-spark/src/test/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/CassandraExtensionsTests.scala

Lines changed: 83 additions & 0 deletions
@@ -0,0 +1,83 @@
+package com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra
+
+import com.datastax.spark.connector.cql._
+import com.datastax.spark.connector.types.{DoubleType, IntType, TextType}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.{BeforeAndAfter, FlatSpec}
+import com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra.CassandraExtensions._
+import com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra.CassandraExtensionsTests.TestInstance
+
+class CassandraExtensionsTests extends FlatSpec with BeforeAndAfter {
+  private val tableDef = TableDef(
+    keyspaceName = "fortis",
+    tableName = "test",
+    partitionKey = List(
+      ColumnDef("a", PartitionKeyColumn, IntType)
+    ),
+    clusteringColumns = List(
+      ColumnDef("b", ClusteringColumn(0), TextType)
+    ),
+
+    // Note: regular columns aren't part of Primary Key / uniqueness
+    regularColumns = List(
+      ColumnDef("c", RegularColumn, DoubleType)
+    )
+  )
+
+  private val conf = new SparkConf()
+    .setAppName(this.getClass.getSimpleName)
+    .setMaster("local[*]")
+    .set("output.consistency.level", "LOCAL_ONE")
+
+  private var sc: SparkContext = _
+
+  before {
+    sc = new SparkContext(conf)
+  }
+
+  after {
+    sc.stop()
+  }
+
+  it should "remove duplicates only among pairs with the same key" in {
+    val duplicateRow = TestInstance(1, "foo", 1.23)
+    val testRdd = sc.makeRDD(Seq(
+      ("key1", duplicateRow),
+      ("key1", duplicateRow),
+      ("key2", duplicateRow),
+      ("key2", duplicateRow)
+    ))
+
+    val deDupedRdd = testRdd.deDupValuesByCassandraTable(tableDef)
+    val rows = deDupedRdd.collect()
+
+    assert(rows.length == 2
+      && rows.exists(_._1 == "key1")
+      && rows.exists(_._1 == "key2")
+    )
+  }
+
+  it should "consider duplicates without respect to regular columns (non-primary key)" in {
+    val testRdd = sc.makeRDD(Seq(
+      ("key", TestInstance(a = 1, b = "foo", 1.23)),
+      ("key", TestInstance(a = 1, b = "foo", 4.56))
+    ))
+
+    val deDupedRdd = testRdd.deDupValuesByCassandraTable(tableDef)
+    val rows = deDupedRdd.collect()
+
+    assert(rows.length == 1)
+  }
+
+  it should "handle empty and single-element RDDs" in {
+    val emptyRdd = sc.makeRDD[(String, TestInstance)](Seq.empty)
+    val testRdd = sc.makeRDD(Seq(("key", TestInstance(a = 1, b = "foo", c = 1.23))))
+
+    assert(emptyRdd.deDupValuesByCassandraTable(tableDef).collect().isEmpty)
+    assert(testRdd.deDupValuesByCassandraTable(tableDef).collect().length == 1)
+  }
+}
+
+object CassandraExtensionsTests {
+  private case class TestInstance(a: Int, b: String, c: Double)
+}
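
For orientation, here is a hypothetical call site; the Event type, the events RDD, and the "fortis"/"events" keyspace and table names are illustrative and not from this commit. It mirrors the tests' shape: fetch the table definition once, de-duplicate keyed rows against its primary key, then write the values.

    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.{CassandraConnector, Schema}
    import org.apache.spark.rdd.RDD
    import com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra.CassandraExtensions._

    // Hypothetical event type and keyed RDD (sc is a live SparkContext).
    case class Event(a: Int, b: String, c: Double)
    val events: RDD[(String, Event)] = sc.makeRDD(Seq(
      ("key", Event(1, "foo", 1.23)),
      ("key", Event(1, "foo", 4.56))  // same primary key (a, b); only one row survives
    ))

    // Look up the table definition once, de-duplicate, then write the values.
    val tableDef = Schema.tableFromCassandra(CassandraConnector(sc.getConf), "fortis", "events")
    val deDuped = events.deDupValuesByCassandraTable(tableDef)
    deDuped.values.saveToCassandra("fortis", "events")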

0 commit comments