@@ -6,13 +6,13 @@ import java.nio.file.attribute.BasicFileAttributes
66
77import net .sansa_stack .query .spark .semantic .QuerySystem
88import net .sansa_stack .rdf .spark .io ._
9- import net .sansa_stack .rdf .spark .partition .semantic .RdfPartition
9+ import net .sansa_stack .rdf .spark .partition ._
10+ import net .sansa_stack .rdf .common .partition .utils .Symbols
1011import org .apache .jena .graph .Triple
1112import org .apache .jena .riot .Lang
1213import org .apache .spark .rdd .RDD
1314import org .apache .spark .sql .SparkSession
1415
15-
1616/**
1717 * Run SPARQL queries over Spark using Semantic partitioning approach.
1818 *
@@ -23,83 +23,47 @@ object Semantic {
2323 def main (args : Array [String ]) {
2424 parser.parse(args, Config ()) match {
2525 case Some (config) =>
26- run(config.in, config.queries, config.partitions, config. out)
26+ run(config.in, config.queries, config.out)
2727 case None =>
2828 println(parser.usage)
2929 }
3030 }
3131
32- def run (input : String , queries : String , partitions : String , output : String ): Unit = {
32+ def run (input : String , queries : String , output : String ): Unit = {
3333
3434 println(" ===========================================" )
3535 println(" | SANSA - Semantic Partioning example |" )
3636 println(" ===========================================" )
3737
38- // variables initialization
39- val numOfFilesPartition : Int = 1
40- // val queryInputPath: String = queries //args(1)
41- // val partitionedDataPath: String = "src/main/resources/output/partitioned-data/"
42- // val queryResultPath: String = "src/main/resources/output/query-result/"
43- val symbol = Map (
44- " space" -> " " * 5 ,
45- " blank" -> " " ,
46- " tabs" -> " \t " ,
47- " newline" -> " \n " ,
48- " colon" -> " :" ,
49- " comma" -> " ," ,
50- " hash" -> " #" ,
51- " slash" -> " /" ,
52- " question-mark" -> " ?" ,
53- " exclamation-mark" -> " !" ,
54- " curly-bracket-left" -> " {" ,
55- " curly-bracket-right" -> " }" ,
56- " round-bracket-left" -> " (" ,
57- " round-bracket-right" -> " )" ,
58- " less-than" -> " <" ,
59- " greater-than" -> " >" ,
60- " at" -> " @" ,
61- " dot" -> " ." ,
62- " dots" -> " ..." ,
63- " asterisk" -> " *" ,
64- " up-arrows" -> " ^^" )
65-
6638 val spark = SparkSession .builder
6739 .master(" local[*]" )
6840 .config(" spark.serializer" , " org.apache.spark.serializer.KryoSerializer" )
6941 .appName(" SANSA - Semantic Partioning" )
7042 .getOrCreate()
7143
72- // N-Triples reader
7344 val lang = Lang .NTRIPLES
74- val nTriplesRDD = spark.rdf(lang)(input)
45+ val triples = spark.rdf(lang)(input)
7546
7647 println(" ----------------------" )
7748 println(" Phase 1: RDF Partition" )
7849 println(" ----------------------" )
7950
80- // class instance: Class RDFPartition and set the partition data
81- val partitionData = new RdfPartition (
82- symbol,
83- nTriplesRDD,
84- partitions,
85- numOfFilesPartition).partitionGraph
51+ val partitionData = triples.partitionGraphAsSemantic()
8652
8753 // count total number of N-Triples
88- countNTriples(Left (nTriplesRDD ))
54+ countNTriples(Left (triples ))
8955 countNTriples(Right (partitionData))
9056
91- println(symbol(" newline" ))
9257 println(" ----------------------" )
9358 println(" Phase 2: SPARQL System" )
9459 println(" ----------------------" )
9560
96- // class instance: Class QuerySystem
9761 val qs = new QuerySystem (
98- symbol,
62+ symbol = Symbols .symbol ,
9963 partitionData,
10064 input,
10165 output,
102- numOfFilesPartition)
66+ numOfFilesPartition = 1 )
10367 qs.run()
10468
10569 spark.close()
@@ -130,7 +94,7 @@ object Semantic {
13094 case Right (x) => println(s " Number of N-Triples after partition: ${x.distinct.count}" )
13195 }
13296 }
133- case class Config (in : String = " " , queries : String = " " , partitions : String = " " , out : String = " " )
97+ case class Config (in : String = " " , queries : String = " " , out : String = " " )
13498
13599 val parser = new scopt.OptionParser [Config ](" SANSA - Semantic Partioning example" ) {
136100
@@ -148,10 +112,6 @@ object Semantic {
148112 action((x, c) => c.copy(out = x)).
149113 text(" the output directory" )
150114
151- opt[String ]('p' , " partitions" ).required().valueName(" <directory>" ).
152- action((x, c) => c.copy(partitions = x)).
153- text(" the partitions directory" )
154-
155115 help(" help" ).text(" prints this usage text" )
156116 }
157117
0 commit comments