This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 79a4dab

travishegner authored and Nick Pentreath committed
[SPARK-21958][ML] Word2VecModel save: transform data in the cluster
## What changes were proposed in this pull request?

Change a data transformation while saving a Word2VecModel to happen with distributed data instead of local driver data.

## How was this patch tested?

Unit tests for the ML sub-component still pass. Running this patch against v2.2.0 in a fully distributed production cluster allows a 4.0G model to save and load correctly, where it would not do so without the patch.

Author: Travis Hegner <[email protected]>

Closes apache#19191 from travishegner/master.
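The driver-memory problem the description alludes to is easy to ballpark: before this patch, every word vector was materialized in a local `Seq` on the driver before being serialized out. A back-of-the-envelope sketch (the vocabulary size and vector dimension below are illustrative assumptions, not figures from the patch):

```scala
object ModelSizeEstimate {
  // Approximate raw size of the vectors alone: words × dims × 4 bytes per Float.
  // Ignores JVM object headers and the words themselves, so this is a lower bound.
  def approxVectorBytes(numWords: Long, vectorSize: Long): Long =
    numWords * vectorSize * 4L

  def main(args: Array[String]): Unit = {
    // e.g. a 10M-word vocabulary with 100-dimensional vectors
    val bytes = approxVectorBytes(10000000L, 100L)
    println(f"~${bytes / 1e9}%.1f GB held on the driver before the fix")
  }
}
```

At that scale the driver must hold several gigabytes of vectors at once, which matches the reported failure saving a 4.0G model.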
1 parent 3c6198c commit 79a4dab

1 file changed: +5 additions, −2 deletions


mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala

Lines changed: 5 additions & 2 deletions

```diff
@@ -337,14 +337,17 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)

       val wordVectors = instance.wordVectors.getVectors
-      val dataSeq = wordVectors.toSeq.map { case (word, vector) => Data(word, vector) }
       val dataPath = new Path(path, "data").toString
       val bufferSizeInBytes = Utils.byteStringAsBytes(
         sc.conf.get("spark.kryoserializer.buffer.max", "64m"))
       val numPartitions = Word2VecModelWriter.calculateNumberOfPartitions(
         bufferSizeInBytes, instance.wordVectors.wordIndex.size, instance.getVectorSize)
-      sparkSession.createDataFrame(dataSeq)
+      val spark = sparkSession
+      import spark.implicits._
+      spark.createDataset[(String, Array[Float])](wordVectors.toSeq)
         .repartition(numPartitions)
+        .map { case (word, vector) => Data(word, vector) }
+        .toDF()
         .write
         .parquet(dataPath)
     }
```
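The repartitioning in the patch relies on `Word2VecModelWriter.calculateNumberOfPartitions`, which sizes partitions so that no single task's serialized records exceed `spark.kryoserializer.buffer.max`. A minimal standalone sketch of that kind of calculation (the 15-byte average word length is an assumed constant, and this body is an approximation for illustration, not the verbatim Spark source):

```scala
object PartitionEstimate {
  /** Estimate how many partitions keep each partition's serialized data
    * under the serializer buffer. Assumes 4 bytes per Float element plus
    * an assumed average serialized word length. */
  def calculateNumberOfPartitions(bufferSizeInBytes: Long,
                                  numWords: Int,
                                  vectorSize: Int): Int = {
    val floatSize = 4L          // bytes per Float
    val averageWordSize = 15L   // assumed average serialized word length
    val approxSizeInBytes = (floatSize * vectorSize + averageWordSize) * numWords
    ((approxSizeInBytes / bufferSizeInBytes) + 1).toInt
  }
}
```

For a 1M-word vocabulary of 100-dimensional vectors and the default 64m buffer, this estimate yields 7 partitions, so no single task attempts to serialize the whole model at once.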
