 package net.sansa_stack.examples.spark.query
 
 import net.sansa_stack.query.spark.graph.jena.SparqlParser
-import net.sansa_stack.query.spark.graph.jena.model.{IntermediateResult, SparkExecutionModel, Config => modelConfig}
+import net.sansa_stack.query.spark.graph.jena.model.{ IntermediateResult, SparkExecutionModel, Config => modelConfig }
 import net.sansa_stack.rdf.spark.partition.graph.algo._
 import org.apache.jena.graph.Node
 import org.apache.jena.riot.Lang
@@ -16,7 +16,7 @@ object GraphQuery {
 
     parser.parse(args, Config()) match {
       case Some(config) => run(config)
-      case None => println(parser.usage)
+      case None         => println(parser.usage)
     }
   }
 
@@ -50,85 +50,85 @@ object GraphQuery {
 
     // Set number of partitions (if config.numParts is 0, number of partitions equals to that of previous graph)
     config.numParts match {
-      case 0 => numParts = prevG.edges.partitions.length
+      case 0     => numParts = prevG.edges.partitions.length
       case other => numParts = other
     }
 
     config.numIters match {
-      case 0 =>
+      case 0     =>
       case other => numIters = other
     }
 
     var partAlgo: PartitionAlgo[Node, Node] = null
 
     config.algo match {
       case "SSHP" =>
-        if (numIters == 0){
+        if (numIters == 0) {
           // Partition algorithm will use default number of iterations
           partAlgo = new SubjectHashPartition[Node, Node](prevG, session, numParts)
         } else {
           partAlgo = new SubjectHashPartition[Node, Node](prevG, session, numParts).setNumIterations(numIters)
         }
         msg = "Start to execute subject semantic hash partitioning"
       case "OSHP" =>
-        if (numIters == 0){
+        if (numIters == 0) {
           partAlgo = new ObjectHashPartition[Node, Node](prevG, session, numParts)
         } else {
           partAlgo = new ObjectHashPartition[Node, Node](prevG, session, numParts).setNumIterations(numIters)
         }
         msg = "Start to execute object semantic hash partitioning"
       case "SOSHP" =>
-        if (numIters == 0){
+        if (numIters == 0) {
           partAlgo = new SOHashPartition[Node, Node](prevG, session, numParts)
         } else {
           partAlgo = new SOHashPartition[Node, Node](prevG, session, numParts).setNumIterations(numIters)
         }
         msg = "Start to execute subject-object semantic hash partitioning"
       case "PP" =>
-        if (numIters == 0){
+        if (numIters == 0) {
           partAlgo = new PathPartition[Node, Node](prevG, session, numParts)
         } else {
           partAlgo = new PathPartition[Node, Node](prevG, session, numParts).setNumIterations(numIters)
         }
         msg = "Start to execute path partitioning"
-      case "" =>
+      case ""    =>
       case other => println(s"the input $other doesn't match any options, no algorithm will be applied.")
     }
 
     var start = 0L
     var end = 0L
 
-    if (partAlgo != null ) {
+    if (partAlgo != null) {
       log.info(msg)
       start = System.currentTimeMillis()
       g = partAlgo.partitionBy().cache()
-      // SparkExecutionModel.loadGraph(g)
+      SparkExecutionModel.loadGraph(g)
       end = System.currentTimeMillis()
-      log.info("Graph partitioning execution time: " + Duration(end - start, "millis").toMillis+ " ms")
+      log.info("Graph partitioning execution time: " + Duration(end - start, "millis").toMillis + " ms")
     }
 
     // query executing
     log.info("Start to execute queries")
 
-    config.query.foreach{ path =>
-      log.info("Query file: "+ path)
+    config.query.foreach { path =>
+      log.info("Query file: " + path)
       modelConfig.setInputQueryFile(path)
       val sp = new SparqlParser(modelConfig.getInputQueryFile)
-      sp.getOps.foreach{ ops =>
+      sp.getOps.foreach { ops =>
         val tag = ops.getTag
-        log.info("Operation " + tag+ " start")
+        log.info("Operation " + tag + " start")
         start = System.currentTimeMillis()
         ops.execute()
         end = System.currentTimeMillis()
-        log.info(tag+ " execution time: " + Duration(end - start, "millis").toMillis+ " ms")
+        log.info(tag + " execution time: " + Duration(end - start, "millis").toMillis + " ms")
       }
     }
 
     // print results to console
-    if (config.print){
+    if (config.print) {
       log.info("print final result")
       val results = IntermediateResult.getFinalResult.cache()
-      if (results.count() >= 10){
+      if (results.count() >= 10) {
         log.info("Too long results(more than 10)")
       } else {
         results.collect().foreach(println(_))
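Beyond the formatting cleanup, the one substantive change in this hunk is re-enabling SparkExecutionModel.loadGraph(g), so the parsed query operators run against the freshly partitioned graph. The sketch below condenses that partition-then-query flow using the same SANSA classes that appear in the diff; prevG, session and queryPath are assumed to exist (hypothetical names), and the call signatures are taken from the usages shown above rather than verified against the library.

// Minimal sketch, assuming a running SparkSession and a GraphX graph of Jena
// Nodes already loaded from an RDF source (names here are placeholders).
import net.sansa_stack.query.spark.graph.jena.SparqlParser
import net.sansa_stack.query.spark.graph.jena.model.{ IntermediateResult, SparkExecutionModel, Config => modelConfig }
import net.sansa_stack.rdf.spark.partition.graph.algo.SubjectHashPartition
import org.apache.jena.graph.Node
import org.apache.spark.graphx.Graph
import org.apache.spark.sql.SparkSession

def partitionAndQuery(prevG: Graph[Node, Node], session: SparkSession, queryPath: String): Unit = {
  // 1. Repartition the graph with one of the semantic hash strategies (SSHP here),
  //    keeping the previous number of edge partitions, as the example does by default.
  val numParts = prevG.edges.partitions.length
  val g = new SubjectHashPartition[Node, Node](prevG, session, numParts).partitionBy().cache()

  // 2. Register the partitioned graph with the execution model so the query
  //    operators see it (the call the diff above un-comments).
  SparkExecutionModel.loadGraph(g)

  // 3. Parse the SPARQL file into operators and execute them in order.
  modelConfig.setInputQueryFile(queryPath)
  new SparqlParser(modelConfig.getInputQueryFile).getOps.foreach(_.execute())

  // 4. Collect and print the final result set.
  IntermediateResult.getFinalResult.collect().foreach(println)
}

The other strategies shown in the diff (ObjectHashPartition, SOHashPartition, PathPartition) can be swapped in the same way, optionally chained with setNumIterations when a non-default iteration count is wanted.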