This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit 903a5ef

Merge pull request #26 from CescWang1991/spark-graph
Spark graph
2 parents 750f3fe + cabb9db commit 903a5ef

File tree

1 file changed: +173 −0 lines changed
  • sansa-examples-spark/src/main/scala/net/sansa_stack/examples/spark/query

@@ -0,0 +1,173 @@
package net.sansa_stack.examples.spark.query

import net.sansa_stack.query.spark.graph.jena.SparqlParser
import net.sansa_stack.query.spark.graph.jena.model.{IntermediateResult, SparkExecutionModel, Config => modelConfig}
import net.sansa_stack.rdf.spark.partition.graph.algo._
import org.apache.jena.graph.Node
import org.apache.jena.riot.Lang
import org.apache.log4j.Logger
import org.apache.spark.graphx.Graph

import scala.concurrent.duration.Duration

object GraphQuery {

  def main(args: Array[String]): Unit = {

    parser.parse(args, Config()) match {
      case Some(config) => run(config)
      case None => println(parser.usage)
    }
  }

  def run(config: Config): Unit = {

    println("===========================================")
    println("| SANSA - Graph query example |")
    println("===========================================")

    val log = Logger.getLogger(GraphQuery.getClass)

    // set the configuration for the query engine model
    modelConfig.setAppName("SANSA Graph Query")
      .setInputGraphFile(config.input)
      .setInputQueryFile(config.query.head)
      .setLang(Lang.NTRIPLES)
      .setMaster("local[*]")

    // load the graph
    log.info("Start to load graph")

    SparkExecutionModel.createSparkSession()
    val session = SparkExecutionModel.getSession

    // apply a graph partitioning algorithm
    val prevG = SparkExecutionModel.getGraph
    var g: Graph[Node, Node] = null
    var msg: String = null

    // set the number of partitions (if config.numParts is 0, keep the number of partitions of the previous graph)
    val numParts = config.numParts match {
      case 0 => prevG.edges.partitions.length
      case other => other
    }

    // set the number of iterations (0 lets each algorithm fall back to its default)
    val numIters = config.numIters

    var partAlgo: PartitionAlgo[Node, Node] = null

    config.algo match {
      case "SSHP" =>
        partAlgo = if (numIters == 0) {
          // the algorithm uses its default number of iterations
          new SubjectHashPartition[Node, Node](prevG, session, numParts)
        } else {
          new SubjectHashPartition[Node, Node](prevG, session, numParts).setNumIterations(numIters)
        }
        msg = "Start to execute subject semantic hash partitioning"
      case "OSHP" =>
        partAlgo = if (numIters == 0) {
          new ObjectHashPartition[Node, Node](prevG, session, numParts)
        } else {
          new ObjectHashPartition[Node, Node](prevG, session, numParts).setNumIterations(numIters)
        }
        msg = "Start to execute object semantic hash partitioning"
      case "SOSHP" =>
        partAlgo = if (numIters == 0) {
          new SOHashPartition[Node, Node](prevG, session, numParts)
        } else {
          new SOHashPartition[Node, Node](prevG, session, numParts).setNumIterations(numIters)
        }
        msg = "Start to execute subject-object semantic hash partitioning"
      case "PP" =>
        partAlgo = if (numIters == 0) {
          new PathPartition[Node, Node](prevG, session, numParts)
        } else {
          new PathPartition[Node, Node](prevG, session, numParts).setNumIterations(numIters)
        }
        msg = "Start to execute path partitioning"
      case "" =>
      case other => println(s"The input $other doesn't match any option; no algorithm will be applied.")
    }

    var start = 0L
    var end = 0L

    if (partAlgo != null) {
      log.info(msg)
      start = System.currentTimeMillis()
      g = partAlgo.partitionBy().cache()
      // SparkExecutionModel.loadGraph(g)
      end = System.currentTimeMillis()
      log.info("Graph partitioning execution time: " + Duration(end - start, "millis").toMillis + " ms")
    }

    // execute the queries
    log.info("Start to execute queries")

    config.query.foreach { path =>
      log.info("Query file: " + path)
      modelConfig.setInputQueryFile(path)
      val sp = new SparqlParser(modelConfig.getInputQueryFile)
      sp.getOps.foreach { ops =>
        val tag = ops.getTag
        log.info("Operation " + tag + " start")
        start = System.currentTimeMillis()
        ops.execute()
        end = System.currentTimeMillis()
        log.info(tag + " execution time: " + Duration(end - start, "millis").toMillis + " ms")
      }
    }

    // print the results to the console
    if (config.print) {
      log.info("Print final results")
      val results = IntermediateResult.getFinalResult.cache()
      if (results.count() > 10) {
        log.info("Results too long to print (more than 10 rows)")
      } else {
        results.collect().foreach(println(_))
      }
      results.unpersist()
    }
  }

  case class Config(input: String = "", query: Seq[String] = Seq(), print: Boolean = false, algo: String = "",
                    numParts: Int = 0, numIters: Int = 0)

  val parser: scopt.OptionParser[Config] = new scopt.OptionParser[Config]("Spark-Graph-Example") {

    head("SANSA-Query-Graph-Example")

    opt[String]('i', "input").required().valueName("<path>").
      action((x, c) => c.copy(input = x)).
      text("path to the file that contains the data (in N-Triples format).")

    opt[Seq[String]]('q', "query").required().valueName("<query1>,<query2>...").
      action((x, c) => c.copy(query = x)).
      text("files that contain the SPARQL queries.")

    opt[Boolean]('p', "print").optional().valueName("<Boolean>").
      action((_, c) => c.copy(print = true)).
      text("print the result to the console (maximum 10 rows), default: false.")

    opt[String]('a', "algorithm").optional().valueName("<SSHP | OSHP | SOSHP | PP>").
      action((x, c) => c.copy(algo = x)).
      text("choose a graph partitioning algorithm, default: no algorithm applied.")

    opt[Int]('n', "numPartitions").optional().valueName("<Int>")
      .action((x, c) => c.copy(numParts = x))
      .text("set the number of partitions.")

    opt[Int]('t', "numIterations").optional().valueName("<Int>")
      .action((x, c) => c.copy(numIters = x))
      .text("set the number of iterations.")

    help("help").text("prints this usage text")
  }
}
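
For quick local testing, here is a minimal sketch of how the example might be driven programmatically; the demo object name, the file paths, and the choice of SSHP are illustrative assumptions, not part of this commit:

// Hypothetical driver (not part of this commit); the paths below are placeholders.
object GraphQueryDemo {
  def main(args: Array[String]): Unit = {
    GraphQuery.run(GraphQuery.Config(
      input = "src/main/resources/rdf.nt",             // placeholder N-Triples input file
      query = Seq("src/main/resources/query1.sparql"), // placeholder SPARQL query file
      print = true,                                    // print up to 10 result rows
      algo = "SSHP",                                   // subject semantic hash partitioning
      numParts = 4,                                    // repartition the graph into 4 partitions
      numIters = 0                                     // 0 = use the algorithm's default iterations
    ))
  }
}

The same run corresponds to the command-line flags -i, -q, -p, -a, -n, and -t defined by the parser above.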
