
Commit 4bdda74

Added RDFGraphPIClustering example for Spark
1 parent e8a9111 commit 4bdda74

2 files changed, +65 -0 lines


sansa-examples-spark/pom.xml

Lines changed: 5 additions & 0 deletions

@@ -31,6 +31,11 @@
 		<artifactId>spark-sql_${scala.binary.version}</artifactId>
 		<version>${spark.version}</version>
 	</dependency>
+	<dependency>
+		<groupId>org.apache.spark</groupId>
+		<artifactId>spark-mllib_${scala.binary.version}</artifactId>
+		<version>${spark.version}</version>
+	</dependency>
 
 	<!-- Apache JENA 3.x -->
 	<dependency>
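
For context, a minimal sketch (not part of this commit) of the MLlib API that the new spark-mllib dependency brings in: org.apache.spark.mllib.clustering.PowerIterationClustering, which the SANSA RDFGraphPICClustering wrapper presumably builds on. The similarity triples and the k/iteration values below are illustrative assumptions.

// Illustrative sketch only; assumes MLlib's PowerIterationClustering backs the SANSA wrapper.
import org.apache.spark.mllib.clustering.PowerIterationClustering
import org.apache.spark.sql.SparkSession

object PlainPICSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("PIC sketch").getOrCreate()

    // PIC takes a similarity graph as (srcId, dstId, similarity) triples;
    // similarities must be non-negative and symmetric.
    val similarities = spark.sparkContext.parallelize(Seq(
      (0L, 1L, 1.0), (1L, 2L, 1.0), (2L, 3L, 0.1), (3L, 4L, 1.0)))

    val model = new PowerIterationClustering()
      .setK(2)              // number of clusters (illustrative)
      .setMaxIterations(10) // number of power iterations (illustrative)
      .run(similarities)

    model.assignments.collect().foreach(a => println(s"${a.id} -> ${a.cluster}"))
    spark.stop()
  }
}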
Lines changed: 60 additions & 0 deletions

New file: RDFGraphPIClustering.scala

package net.sansa_stack.examples.spark.ml.clustering

import scala.collection.mutable
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{ Level, Logger }
import org.apache.spark.graphx.GraphLoader
import net.sansa_stack.ml.spark.clustering.{ RDFGraphPICClustering => RDFGraphPICClusteringAlg }

object RDFGraphPIClustering {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println(
        "Usage: RDFGraphPIClustering <input> <k> <numIterations>")
      System.exit(1)
    }
    val input = args(0) // e.g. "src/main/resources/BorderFlow_Sample1.txt"
    val k = args(1).toInt
    val numIterations = args(2).toInt

    // Parse any remaining "--opt=value" arguments.
    val optionsList = args.drop(3).map { arg =>
      arg.dropWhile(_ == '-').split('=') match {
        case Array(opt, v) => (opt -> v)
        case _             => throw new IllegalArgumentException("Invalid argument: " + arg)
      }
    }
    val options = mutable.Map(optionsList: _*)

    // No further options are supported, so anything left over is an error.
    options.foreach {
      case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
    }

    println("============================================")
    println("|    Power Iteration Clustering example    |")
    println("============================================")

    val sparkSession = SparkSession.builder
      .master("local[*]")
      .appName("Power Iteration Clustering example (" + input + ")")
      .getOrCreate()
    Logger.getRootLogger.setLevel(Level.ERROR)

    // Load the graph from an edge-list file.
    val graph = GraphLoader.edgeListFile(sparkSession.sparkContext, input)

    // Run SANSA's Power Iteration Clustering on the graph.
    val model = RDFGraphPICClusteringAlg(sparkSession, graph, k, numIterations).run()

    // Group the vertex ids by their assigned cluster and print the result.
    val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
    val assignments = clusters.toList.sortBy { case (k, v) => v.length }
    val assignmentsStr = assignments
      .map {
        case (k, v) =>
          s"$k -> ${v.sorted.mkString("[", ",", "]")}"
      }.mkString(",")
    val sizesStr = assignments.map {
      _._2.size
    }.sorted.mkString("(", ",", ")")
    println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")

    sparkSession.stop()
  }
}
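
A possible way to run the new example locally (not part of the commit): GraphX's GraphLoader.edgeListFile expects one edge per line as whitespace-separated source and target vertex ids, with lines starting with # ignored. The input path below comes from the comment in the example source; k = 2 and numIterations = 15 are illustrative assumptions.

// Hypothetical local driver; the argument values are assumptions, not defaults from the commit.
object RunRDFGraphPIClusteringLocally {
  def main(args: Array[String]): Unit = {
    // BorderFlow_Sample1.txt would hold edges such as:
    //   1 2
    //   2 3
    //   3 1
    net.sansa_stack.examples.spark.ml.clustering.RDFGraphPIClustering.main(
      Array("src/main/resources/BorderFlow_Sample1.txt", "2", "15"))
  }
}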
