Skip to content
This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit cd51c92

Browse files
committed
Add main method for sparql query on GraphX
1 parent a32e5b6 commit cd51c92

File tree

1 file changed

+107
-0
lines changed
  • sansa-examples-spark/src/main/scala/net/sansa_stack/examples/spark/query

1 file changed

+107
-0
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package net.sansa_stack.examples.spark.query
2+
3+
import net.sansa_stack.query.spark.graph.jena.SparqlParser
4+
import net.sansa_stack.query.spark.graph.jena.model.{IntermediateResult, SparkExecutionModel, Config => modelConfig}
5+
import org.apache.jena.riot.Lang
6+
import org.apache.log4j.Logger
7+
8+
import scala.concurrent.duration._
9+
10+
object graph {
11+
12+
def main(args: Array[String]): Unit = {
13+
14+
parser.parse(args, Config()) match {
15+
case Some(config) => run(config)
16+
case None =>
17+
}
18+
}
19+
20+
def run(config: Config): Unit = {
21+
22+
println("===========================================")
23+
println("| SANSA - Graph query example |")
24+
println("===========================================")
25+
26+
val log = Logger.getLogger(graph.getClass)
27+
28+
// set configures for query engine model
29+
modelConfig.setAppName("SANSA Graph Query")
30+
.setInputGraphFile(config.input)
31+
.setInputQueryFile(config.query.head)
32+
.setLang(Lang.NTRIPLES)
33+
.setMaster("local[*]")
34+
35+
// load graph
36+
log.info("Start to load graph")
37+
38+
SparkExecutionModel.createSparkSession()
39+
40+
// apply graph partitioning algorithm
41+
config.algo match {
42+
case "SSHP" => //compiler here
43+
case "OSHP" => //compiler here
44+
case "SOSHP" => //compiler here
45+
case "PP" => //compiler here
46+
case "" =>
47+
case other => println("the input of algorithm doesn't any options, no algorithm will be applied.")
48+
}
49+
50+
// query executing
51+
log.info("Start to execute queries")
52+
53+
var start = 0L
54+
var end = 0L
55+
56+
config.query.foreach{ path =>
57+
log.info("Query file: "+path)
58+
modelConfig.setInputQueryFile(path)
59+
val sp = new SparqlParser(modelConfig.getInputQueryFile)
60+
sp.getOps.foreach{ ops =>
61+
val tag = ops.getTag
62+
log.info("Operation "+tag+" start")
63+
start = System.currentTimeMillis()
64+
ops.execute()
65+
end = System.currentTimeMillis()
66+
log.info(tag+" execution time: "+Duration(end - start, "millis").toMillis+" ms")
67+
}
68+
}
69+
70+
// print results to console
71+
if(config.print){
72+
log.info("print final result")
73+
val results = IntermediateResult.getFinalResult.cache()
74+
if(results.count() >= 10){
75+
log.info("Too long results(more than 10)")
76+
} else {
77+
results.foreach(println(_))
78+
}
79+
results.unpersist()
80+
}
81+
}
82+
83+
case class Config(input: String = "", query: Seq[String] = null, print: Boolean = false, algo: String = "")
84+
85+
val parser: scopt.OptionParser[Config] = new scopt.OptionParser[Config]("Spark-Graph-Example") {
86+
87+
head("SANSA-Query-Graph-Example")
88+
89+
opt[String]('i', "input").required().valueName("<path>").
90+
action((x, c) => c.copy(input = x)).
91+
text("path to file that contains the data (in N-Triples format)")
92+
93+
opt[Seq[String]]('q', "query").required().valueName("<query1>, <query2>...").
94+
action((x, c) => c.copy(query = x)).
95+
text("files that contain the SPARQL query")
96+
97+
opt[Boolean]('p', "print").optional().valueName("Boolean").
98+
action((_, c) => c.copy(print = true)).
99+
text("print the result to the console, default: false")
100+
101+
opt[String]('a', "algorithm").optional().valueName("SSHP | OSHP | SOSHP | PP").
102+
action((x, c) => c.copy(algo = x)).
103+
text("choose one graph partitioning algorithm (default no algorithm applied)")
104+
105+
help("help").text("prints this usage text")
106+
}
107+
}

0 commit comments

Comments
 (0)