@@ -1,11 +1,13 @@
 package net.sansa_stack.inference.spark
 
-import net.sansa_stack.inference.utils.{NTriplesStringToJenaTriple, NTriplesStringToRDFTriple}
 import org.apache.jena.graph.{Node, Triple}
 import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
 import org.apache.spark.sql.{Encoder, Encoders, Row, SparkSession}
-// import org.apache.spark.groupon.metrics.{SparkMeter, SparkTimer, UserMetricsSystem}
+
+import net.sansa_stack.inference.utils.{NTriplesStringToJenaTriple, NTriplesStringToRDFTriple}
+import net.sansa_stack.rdf.spark.io.NTripleReader
+// import org.apache.spark.groupon.metrics.{SparkMeter, SparkTimer, UserMetricsSystem}
 
 import scala.reflect.ClassTag
 
@@ -30,7 +32,7 @@ object DatastructureSerializationPerformanceTests {
   conf.registerKryoClasses(Array(classOf[org.apache.jena.graph.Triple], classOf[org.apache.jena.graph.Node]))
   conf.set("spark.extraListeners", "net.sansa_stack.inference.spark.utils.CustomSparkListener")
 
-  val parallelism = 4
+  val parallelism = 20
 
   class JobListener extends SparkListener {
     override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
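Aside: the Kryo registrations above only take effect when Kryo is also selected as the serializer, which this hunk does not show. A minimal sketch of the assumed surrounding configuration:

    val conf = new SparkConf()
    // Kryo must be the active serializer for the registered classes to be used
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // registering the Jena classes lets Kryo emit compact class ids instead of
    // full class names for every serialized Triple and Node
    conf.registerKryoClasses(Array(classOf[org.apache.jena.graph.Triple], classOf[org.apache.jena.graph.Node]))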
@@ -40,7 +42,7 @@ object DatastructureSerializationPerformanceTests {
 
   // the SPARK config
   val session = SparkSession.builder
-    .appName(s"SPARK RDFS Reasoning")
+    .appName(s"RDF Triple Encoder Performance")
     .master("local[4]")
     .config("spark.eventLog.enabled", "true")
     .config("spark.hadoop.validateOutputSpecs", "false") // override output files
@@ -74,10 +76,8 @@ object DatastructureSerializationPerformanceTests {
     .getOrCreate()
 
 
-  def loadAndDictinctJena(path: String): Unit = {
-    val triples = session.sparkContext
-      .textFile(path, 4) // read the text file
-      .map(new NTriplesStringToJenaTriple())
+  def loadAndDistinctJena(path: String): Unit = {
+    val triples = NTripleReader.load(session, path)
 
     triples.cache()
 
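The rewritten loader drops the manual textFile-plus-parser pipeline in favour of SANSA's NTripleReader. A sketch of what the call is assumed to be equivalent to, based on the code it replaces:

    import net.sansa_stack.rdf.spark.io.NTripleReader
    import org.apache.jena.graph.Triple
    import org.apache.spark.rdd.RDD

    // read the N-Triples file and parse each line into a Jena Triple;
    // partitioning is now left to the reader instead of the hard-coded 4
    val triples: RDD[Triple] = NTripleReader.load(session, path)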
@@ -88,11 +88,12 @@ object DatastructureSerializationPerformanceTests {
     val pair = triples.map(t => (t.getSubject, (t.getPredicate, t.getObject))) // map to PairRDD
     val joinCount = pair.join(pair).count()
 
-    logger.info(distinctCount)
-    logger.info(joinCount)
+    logger.info("Jena RDD[Triple]")
+    logger.info(s"#triples: $distinctCount")
+    logger.info(s"#joined triples(s-s): $joinCount")
   }
 
-  def loadAndDictinctPlain(path: String): Unit = {
+  def loadAndDistinctPlain(path: String): Unit = {
     val triples = session.sparkContext
       .textFile(path, 4) // read the text file
       .flatMap(line => new NTriplesStringToRDFTriple().apply(line))
@@ -124,10 +125,9 @@ object DatastructureSerializationPerformanceTests {
     implicit def tuple3[A1, A2, A3](implicit e1: Encoder[A1], e2: Encoder[A2], e3: Encoder[A3]): Encoder[(A1, A2, A3)] =
       Encoders.tuple[A1, A2, A3](e1, e2, e3)
 
-    val triples = session.sparkContext
-      .textFile(path, 4) // read the text file
-      .map(new NTriplesStringToJenaTriple())
-      .map(t => (t.getSubject, t.getPredicate, t.getObject))
+    val triplesRDD = NTripleReader.load(session, path)
+
+    val tripleNodesRDD = triplesRDD.map(t => (t.getSubject, t.getPredicate, t.getObject))
 
     val conv = new NTriplesStringToJenaTriple()
     var tripleDS =
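The tuple3 helper only composes per-field encoders into an encoder for the node triple; since Jena's Node is not a case class, the field encoders presumably have to be Kryo-based. A sketch of how the implicits are assumed to fit together:

    // Node has no Product structure Spark can analyze, so fall back to a
    // binary Kryo-backed encoder (each field becomes a binary column)
    implicit val nodeEncoder: Encoder[Node] = Encoders.kryo[Node]

    // tuple3 from the hunk above then derives Encoder[(Node, Node, Node)],
    // whose three columns are later renamed to s, p, o via toDF
    implicit val tripleEncoder: Encoder[(Node, Node, Node)] =
      Encoders.tuple(nodeEncoder, nodeEncoder, nodeEncoder)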
@@ -136,29 +136,38 @@ object DatastructureSerializationPerformanceTests {
 //        val t = conv.apply(row.getString(0))
 //        (t.getSubject, t.getPredicate, t.getObject)
 //      })
-      session.createDataset(triples)
+      session.createDataset(tripleNodesRDD)
         .toDF("s", "p", "o")
         .as[JenaTripleEncoded]
 
+    tripleDS.printSchema()
     tripleDS.cache()
 
+    // show 10 triples
+    tripleDS.show()
+
     // DISTINCT and COUNT
     val distinctCount = tripleDS.distinct().count()
 
-    // self JOIN on subject and COUNT
-    val joinCount = tripleDS.alias("A").join(tripleDS.alias("B"), $"A.s" === $"B.s", "inner").count()
 
-    logger.info(distinctCount)
-    logger.info(joinCount)
+    // self JOIN on subject and COUNT
+    val triplesA = tripleDS.alias("A")
+    val triplesB = tripleDS.alias("B")
+    val triplesJoined = triplesA.joinWith(triplesB, $"A.s" === $"B.s")
+    val joinCount = triplesJoined.count()
+
+    logger.info("DataFrame[(Node, Node, Node)]")
+    logger.info(s"#triples: $distinctCount")
+    logger.info(s"#joined triples(s-s): $joinCount")
   }
 
   def main(args: Array[String]): Unit = {
 
     val path = args(0)
-
-    loadAndDictinctJena(path)
-
-    loadAndDictinctPlain(path)
+//
+//    loadAndDistinctJena(path)
+//
+//    loadAndDistinctPlain(path)
 
     loadAndDistinctDatasetJena(path)
 
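Note the switch from join to joinWith in the self-join: join returns an untyped DataFrame with the columns of both sides flattened into one Row, while joinWith keeps the element type and yields a Dataset of pairs. A sketch of the difference, reusing the aliases from this hunk (result types assumed from JenaTripleEncoded):

    // untyped: one flat Row carrying the s/p/o columns of both sides
    val joinedDF: DataFrame = triplesA.join(triplesB, $"A.s" === $"B.s", "inner")

    // typed: each element is the pair of matching records
    val joinedDS: Dataset[(JenaTripleEncoded, JenaTripleEncoded)] =
      triplesA.joinWith(triplesB, $"A.s" === $"B.s")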