
Commit d81d95a

uzadude authored and srowen committed
[SPARK-19368][MLLIB] BlockMatrix.toIndexedRowMatrix() optimization for sparse matrices
## What changes were proposed in this pull request?

Optimization [SPARK-12869] was made for dense matrices, but it caused a serious performance regression for sparse matrices because manipulating them this way is very inefficient. When manipulating sparse vectors in Breeze, it is better to use a VectorBuilder.

## How was this patch tested?

Checked against an internal use case that took about 20 minutes before Spark 2, rose to 6.5 hours after moving to Spark 2, and is back to about 20 minutes with this change.

Closes apache#16732 from uzadude/SparseVector_optimization.

Authored-by: oraviv <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
1 parent dd8c179 commit d81d95a
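
For context on the Breeze behavior the commit message refers to, here is a minimal sketch (not part of this commit; the object and method names are invented for illustration) contrasting writing entries one by one into a Breeze `SparseVector` with collecting them first and building the vector in one pass, e.g. through `VectorBuilder`:

```scala
import breeze.linalg.{SparseVector => BSV, VectorBuilder}

object SparseAssemblySketch {

  // Slow pattern: each write into an existing SparseVector may have to shift
  // its internal index/value arrays, so filling a row entry by entry (or via
  // slice assignment, as the old code did) degrades badly as nnz grows.
  def assembleByUpdate(size: Int, entries: Seq[(Int, Double)]): BSV[Double] = {
    val v = BSV.zeros[Double](size)
    entries.foreach { case (i, x) => v(i) = x }
    v
  }

  // Faster pattern: accumulate (index, value) pairs and build the sparse
  // vector once with VectorBuilder.
  def assembleByBuilder(size: Int, entries: Seq[(Int, Double)]): BSV[Double] = {
    val builder = new VectorBuilder[Double](size)
    entries.foreach { case (i, x) => builder.add(i, x) }
    builder.toSparseVector
  }

  def main(args: Array[String]): Unit = {
    val entries = Seq(0 -> 1.0, 5 -> 2.0, 9 -> 3.0)
    println(assembleByUpdate(10, entries))
    println(assembleByBuilder(10, entries))
  }
}
```

The patched code below uses plain `ArrayBuffer`s of indices and values plus `Vectors.sparse` rather than `VectorBuilder`, but the idea is the same: avoid repeated insertions into sparse storage.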

1 file changed: +29 −16 lines changed


mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala

Lines changed: 29 additions & 16 deletions
@@ -17,17 +17,17 @@
 
 package org.apache.spark.mllib.linalg.distributed
 
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Matrix => BM}
 import scala.collection.mutable.ArrayBuffer
 
-import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Matrix => BM, SparseVector => BSV, Vector => BV}
-
 import org.apache.spark.{Partitioner, SparkException}
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
 import org.apache.spark.mllib.linalg._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
+
 /**
  * A grid partitioner, which uses a regular grid to partition coordinates.
  *
@@ -273,24 +273,37 @@ class BlockMatrix @Since("1.3.0") (
     require(cols < Int.MaxValue, s"The number of columns should be less than Int.MaxValue ($cols).")
 
     val rows = blocks.flatMap { case ((blockRowIdx, blockColIdx), mat) =>
-      mat.rowIter.zipWithIndex.map {
+      mat.rowIter.zipWithIndex.filter(_._1.size > 0).map {
         case (vector, rowIdx) =>
-          blockRowIdx * rowsPerBlock + rowIdx -> ((blockColIdx, vector.asBreeze))
+          blockRowIdx * rowsPerBlock + rowIdx -> ((blockColIdx, vector))
       }
     }.groupByKey().map { case (rowIdx, vectors) =>
-      val numberNonZeroPerRow = vectors.map(_._2.activeSize).sum.toDouble / cols.toDouble
-
-      val wholeVector = if (numberNonZeroPerRow <= 0.1) { // Sparse at 1/10th nnz
-        BSV.zeros[Double](cols)
-      } else {
-        BDV.zeros[Double](cols)
-      }
+      val numberNonZero = vectors.map(_._2.numActives).sum
+      val numberNonZeroPerRow = numberNonZero.toDouble / cols.toDouble
+
+      val wholeVector =
+        if (numberNonZeroPerRow <= 0.1) { // Sparse at 1/10th nnz
+          val arrBufferIndices = new ArrayBuffer[Int](numberNonZero)
+          val arrBufferValues = new ArrayBuffer[Double](numberNonZero)
+
+          vectors.foreach { case (blockColIdx: Int, vec: Vector) =>
+            val offset = colsPerBlock * blockColIdx
+            vec.foreachActive { case (colIdx: Int, value: Double) =>
+              arrBufferIndices += offset + colIdx
+              arrBufferValues += value
+            }
+          }
+          Vectors.sparse(cols, arrBufferIndices.toArray, arrBufferValues.toArray)
+        } else {
+          val wholeVectorBuf = BDV.zeros[Double](cols)
+          vectors.foreach { case (blockColIdx: Int, vec: Vector) =>
+            val offset = colsPerBlock * blockColIdx
+            wholeVectorBuf(offset until Math.min(cols, offset + colsPerBlock)) := vec.asBreeze
+          }
+          Vectors.fromBreeze(wholeVectorBuf)
+        }
 
-      vectors.foreach { case (blockColIdx: Int, vec: BV[_]) =>
-        val offset = colsPerBlock * blockColIdx
-        wholeVector(offset until Math.min(cols, offset + colsPerBlock)) := vec
-      }
-      new IndexedRow(rowIdx, Vectors.fromBreeze(wholeVector))
+      IndexedRow(rowIdx, wholeVector)
     }
     new IndexedRowMatrix(rows)
   }
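
Since the change is confined to `BlockMatrix.toIndexedRowMatrix()`, a conversion chain like the hypothetical one below exercises the optimized path (the matrix shape, block size, and object/application names are invented for illustration, not taken from the commit):

```scala
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.sql.SparkSession

object BlockMatrixSparseDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("BlockMatrixSparseDemo")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    // A very sparse matrix: one nonzero entry per thousand rows.
    val entries = sc.parallelize(0L until 1000L).map { i =>
      MatrixEntry(i * 1000L, i % 1000L, 1.0)
    }

    // CoordinateMatrix -> BlockMatrix -> IndexedRowMatrix runs the
    // toIndexedRowMatrix() code path changed in this commit.
    val blockMatrix = new CoordinateMatrix(entries).toBlockMatrix(1024, 1024)
    val indexedRowMatrix = blockMatrix.toIndexedRowMatrix()

    println(s"rows: ${indexedRowMatrix.numRows()}, cols: ${indexedRowMatrix.numCols()}")
    spark.stop()
  }
}
```

For a matrix this sparse, every assembled row falls well under the 0.1 nonzeros-per-column threshold in the patch, so the sparse branch that builds the row with `Vectors.sparse` is taken.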
