Skip to content

Commit da52698

Browse files
huonw authored and srowen committed
[SPARK-26757][GRAPHX] Return 0 for count on empty Edge/Vertex RDDs
## What changes were proposed in this pull request?

Previously, calling `count` on an empty Edge or Vertex RDD threw a "java.lang.UnsupportedOperationException: empty collection" exception, because the implementation used `reduce` rather than `fold` (or a similar operation) that can tolerate empty RDDs.

This behaviour has existed for the Vertex RDDs since it was introduced in b30e0ae. It seems this behaviour was inherited by the Edge RDDs via copy-paste in ee29ef3.

## How was this patch tested?

Two new unit tests.

Closes apache#23681 from huonw/empty-graphx.

Authored-by: Huon Wilson <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
1 parent 2514163 commit da52698

File tree

6 files changed, with 33 additions and 3 deletions.

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -87,7 +87,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
8787

8888
/** The number of edges in the RDD. */
8989
override def count(): Long = {
90-
partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
90+
partitionsRDD.map(_._2.size.toLong).fold(0)(_ + _)
9191
}
9292

9393
override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDDImpl[ED2, VD] =

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -87,7 +87,7 @@ class VertexRDDImpl[VD] private[graphx] (
8787

8888
/** The number of vertices in the RDD. */
8989
override def count(): Long = {
90-
partitionsRDD.map(_.size.toLong).reduce(_ + _)
90+
partitionsRDD.map(_.size.toLong).fold(0)(_ + _)
9191
}
9292

9393
override private[graphx] def mapVertexPartitions[VD2: ClassTag](

graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -72,7 +72,7 @@ object SVDPlusPlus {
7272

7373
// calculate global rating mean
7474
edges.cache()
75-
val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
75+
val (rs, rc) = edges.map(e => (e.attr, 1L)).fold((0, 0))((a, b) => (a._1 + b._1, a._2 + b._2))
7676
val u = rs / rc
7777

7878
// construct graph

graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -60,4 +60,14 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
6060
}
6161
}
6262

63+
test("count") {
64+
withSpark { sc =>
65+
val empty = EdgeRDD.fromEdges(sc.emptyRDD[Edge[Int]])
66+
assert(empty.count === 0)
67+
68+
val edges = List(Edge(0, 1, ()), Edge(1, 2, ()), Edge(2, 0, ()))
69+
val nonempty = EdgeRDD.fromEdges(sc.parallelize(edges))
70+
assert(nonempty.count === edges.size)
71+
}
72+
}
6373
}

graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -223,4 +223,15 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
223223
assert(verts.collect().toSeq === data) // test checkpointed RDD
224224
}
225225
}
226+
227+
test("count") {
228+
withSpark { sc =>
229+
val empty = VertexRDD(sc.emptyRDD[(Long, Unit)])
230+
assert(empty.count === 0)
231+
232+
val n = 100
233+
val nonempty = vertices(sc, n)
234+
assert(nonempty.count === n + 1)
235+
}
236+
}
226237
}

graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,13 @@ class SVDPlusPlusSuite extends SparkFunSuite with LocalSparkContext {
4040
}
4141
}
4242

43+
test("Test SVD++ with no edges") {
44+
withSpark { sc =>
45+
val edges = sc.emptyRDD[Edge[Double]]
46+
val conf = new SVDPlusPlus.Conf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015) // 2 iterations
47+
val (graph, _) = SVDPlusPlus.run(edges, conf)
48+
assert(graph.vertices.count == 0)
49+
assert(graph.edges.count == 0)
50+
}
51+
}
4352
}

0 commit comments

Comments
 (0)