This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit 8de2064

Merge pull request #14 from SANSA-Stack/feature/flink-jena

Feature/flink jena

2 parents 0f9741f + 50260fe

File tree

32 files changed: +1024, -523 lines
Lines changed: 5 additions & 17 deletions

@@ -1,33 +1,21 @@
 package net.sansa_stack.inference.utils
 
 import org.apache.jena.graph.Triple
+import org.apache.jena.shared.PrefixMapping
+import org.apache.jena.sparql.util.FmtUtils
 
 
 /**
   * Convert a Jena Triple to an N-Triples string.
   *
+  * @note it turns out that it might be more efficient to use the Jena stream-based writer API per partition.
+  *
   * @author Lorenz Buehmann
   */
 class JenaTripleToNTripleString
   extends Function[Triple, String]
     with java.io.Serializable {
-  override def apply(t: Triple): String = {
-    val subStr =
-      if (t.getSubject.isBlank) {
-        s"_:${t.getSubject.getBlankNodeLabel}"
-      } else {
-        s"<${t.getSubject.getURI}>"
-      }
 
-    val objStr =
-      if (t.getObject.isLiteral) {
-        t.getObject
-      } else if (t.getObject.isBlank) {
-        s"_:${t.getObject}"
-      } else {
-        s"<${t.getObject}>"
-      }
-    s"$subStr <${t.getPredicate}> $objStr ."
-  }
+  override def apply(t: Triple): String = s"${FmtUtils.stringForTriple(t, null.asInstanceOf[PrefixMapping])} ."
 }
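For reference, a minimal usage sketch of the rewritten converter (not part of the commit; the example triple and the object name are illustrative). With a null PrefixMapping, FmtUtils.stringForTriple leaves URIs unabbreviated, so appending " ." yields an N-Triples line:

import net.sansa_stack.inference.utils.JenaTripleToNTripleString
import org.apache.jena.graph.{NodeFactory, Triple}

object JenaTripleToNTripleStringExample {
  def main(args: Array[String]): Unit = {
    // build a sample triple with a language-tagged literal as object
    val t = Triple.create(
      NodeFactory.createURI("http://example.org/s"),
      NodeFactory.createURI("http://example.org/p"),
      NodeFactory.createLiteral("hello", "en"))

    val toNTriplesString = new JenaTripleToNTripleString()
    // expected output: <http://example.org/s> <http://example.org/p> "hello"@en .
    println(toNTriplesString(t))
  }
}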

sansa-inference-common/src/main/scala/net/sansa_stack/inference/utils/NTriplesStringToRDFTriple.scala

Lines changed: 1 addition & 2 deletions

@@ -2,7 +2,6 @@ package net.sansa_stack.inference.utils
 
 import java.io.ByteArrayInputStream
 
-import org.apache.jena.graph.Triple
 import org.apache.jena.riot.{Lang, RDFDataMgr}
 
 import net.sansa_stack.inference.data.RDFTriple
@@ -13,7 +12,7 @@ import net.sansa_stack.inference.data.RDFTriple
   * @author Lorenz Buehmann
   */
 class NTriplesStringToRDFTriple
-  extends Function1[String, Option[RDFTriple]]
+  extends ((String) => Option[RDFTriple])
     with java.io.Serializable {
   override def apply(s: String): Option[RDFTriple] = {
     val t = RDFDataMgr.createIteratorTriples(new ByteArrayInputStream(s.getBytes), Lang.NTRIPLES, null).next()
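The two supertypes are equivalent; ((String) => Option[RDFTriple]) is just syntactic sugar for Function1[String, Option[RDFTriple]]. Because the class stays a serializable function, it can be passed straight to collection or DataSet transformations. A minimal sketch (not from the commit; the sample line is illustrative):

import net.sansa_stack.inference.utils.NTriplesStringToRDFTriple

val parseTriple = new NTriplesStringToRDFTriple()

val lines = Seq("<http://example.org/s> <http://example.org/p> <http://example.org/o> .")

// flatMap flattens the Option results, keeping only parsed triples
val triples = lines.flatMap(parseTriple)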
Lines changed: 40 additions & 0 deletions

@@ -0,0 +1,40 @@
+package net.sansa_stack.inference.utils
+
+/**
+  * Some utils for logical combinations of boolean functions.
+  */
+object PredicateUtils {
+
+  implicit class RichPredicate[A](f: A => Boolean) extends (A => Boolean) {
+    def apply(v: A): Boolean = f(v)
+
+    /**
+      * Logical 'and'.
+      *
+      * @param g
+      * @return
+      */
+    def &&(g: A => Boolean): A => Boolean = { x: A =>
+      f(x) && g(x)
+    }
+
+    /**
+      * Logical 'or'.
+      *
+      * @param g
+      * @return
+      */
+    def ||(g: A => Boolean): A => Boolean = { x: A =>
+      f(x) || g(x)
+    }
+
+    /**
+      * Logical 'not'
+      *
+      * @return
+      */
+    def unary_! : A => Boolean = { x: A =>
+      !f(x)
+    }
+  }
+}
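A short usage sketch of the new utility (not part of the commit; the predicates are illustrative). Importing PredicateUtils._ brings the implicit RichPredicate conversion into scope, so plain A => Boolean functions compose with &&, || and !:

import net.sansa_stack.inference.utils.PredicateUtils._

val isPositive: Int => Boolean = _ > 0
val isEven: Int => Boolean = _ % 2 == 0

val isPositiveEven: Int => Boolean = isPositive && isEven // both must hold
val isNotPositiveEven: Int => Boolean = !isPositiveEven   // negation via unary_!

assert(isPositiveEven(4))
assert(!isPositiveEven(3))
assert(isNotPositiveEven(3))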

sansa-inference-flink/pom.xml

Lines changed: 20 additions & 13 deletions

@@ -45,19 +45,26 @@ under the License.
       <scope>test</scope>
     </dependency>
 
-    <!-- Apache Flink -->
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala_${scala.binary.version}</artifactId>
-    </dependency>
+    <!-- RDF Layer -->
     <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+      <groupId>net.sansa-stack</groupId>
+      <artifactId>sansa-rdf-flink_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
     </dependency>
+
+    <!-- Apache Flink -->
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients_${scala.binary.version}</artifactId>
+      <artifactId>flink-scala_${scala.binary.version}</artifactId>
     </dependency>
+    <!-- <dependency>-->
+    <!--   <groupId>org.apache.flink</groupId>-->
+    <!--   <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>-->
+    <!-- </dependency>-->
+    <!-- <dependency>-->
+    <!--   <groupId>org.apache.flink</groupId>-->
+    <!--   <artifactId>flink-clients_${scala.binary.version}</artifactId>-->
+    <!-- </dependency>-->
 
     <!-- Apache JENA 3.x-->
     <dependency>
@@ -517,12 +524,12 @@ under the License.
             <exclude>org.apache.commons:commons-math</exclude>
             <exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
             <exclude>commons-logging:commons-logging</exclude>
-            <exclude>org.apache.httpcomponents:httpclient</exclude>
-            <exclude>org.apache.httpcomponents:httpcore</exclude>
+            <!-- <exclude>org.apache.httpcomponents:httpclient</exclude>-->
+            <!-- <exclude>org.apache.httpcomponents:httpcore</exclude>-->
            <exclude>commons-codec:commons-codec</exclude>
-            <exclude>com.fasterxml.jackson.core:jackson-core</exclude>
-            <exclude>com.fasterxml.jackson.core:jackson-databind</exclude>
-            <exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
+            <!--<exclude>com.fasterxml.jackson.core:jackson-core</exclude>-->
+            <!--<exclude>com.fasterxml.jackson.core:jackson-databind</exclude>-->
+            <!--<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>-->
             <exclude>org.codehaus.jettison:jettison</exclude>
             <exclude>stax:stax-api</exclude>
             <exclude>com.typesafe:config</exclude>

sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/RDFGraphMaterializer.scala

Lines changed: 14 additions & 8 deletions

@@ -21,7 +21,8 @@ import net.sansa_stack.inference.rules.ReasoningProfile._
 import net.sansa_stack.inference.rules.{RDFSLevel, ReasoningProfile}
 
 /**
-  * The class to compute the RDFS materialization of a given RDF graph.
+  * A class to compute the materialization of a given RDF graph for a given reasoning profile.
+  * Basically, used as the main class for inference.
   *
   * @author Lorenz Buehmann
   *
@@ -66,6 +67,7 @@ object RDFGraphMaterializer {
 
     // set up the execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
+    // and disable logging to standard out
     env.getConfig.disableSysoutLogging()
     // env.setParallelism(4)
 
@@ -74,7 +76,6 @@ object RDFGraphMaterializer {
 
     // load triples from disk
     val graph = RDFGraphLoader.loadFromDisk(input, env)
-    // println(s"|G| = ${graph.size()}")
 
     // create reasoner
     val reasoner = profile match {
@@ -90,17 +91,22 @@ object RDFGraphMaterializer {
 
     // compute inferred graph
     val inferredGraph = reasoner.apply(graph)
-    println(s"|G_inf| = ${inferredGraph.size()}")
+    // println(s"|G_inf| = ${inferredGraph.size}")
+
+    // println(env.getExecutionPlan())
 
     // write triples to disk
-    // RDFGraphWriter.writeToDisk(inferredGraph, output, writeToSingleFile, sortedOutput)
+    RDFGraphWriter.writeToDisk(inferredGraph, output, writeToSingleFile, sortedOutput)
 
-    // println(env.getExecutionPlan())
+    // println(env.getExecutionPlan())
 
-    val jn = if (jobName.isEmpty) s"${profile} Reasoning" else jobName
+    val jn = if (jobName.isEmpty) s"$profile Reasoning" else jobName
 
     // run the program
     env.execute(jn)
+
+
+
   }
 
   // the config object
@@ -119,7 +125,7 @@ object RDFGraphMaterializer {
 
     // the CLI parser
     val parser = new scopt.OptionParser[Config]("RDFGraphMaterializer") {
-      head("RDFGraphMaterializer", "0.1.0")
+      head("RDFGraphMaterializer", "0.6.0")
 
      // opt[Seq[File]]('i', "input").required().valueName("<path1>,<path2>,...").
      // action((x, c) => c.copy(in = x)).
@@ -128,7 +134,7 @@ object RDFGraphMaterializer {
       .required()
       .valueName("<path>")
       .action((x, c) => c.copy(in = x))
-      .text("path to file or directory that contains the input files (in N-Triple format)")
+      .text("path to file or directory that contains the input files (in N-Triples format)")
 
     opt[URI]('o', "out")
       .required()
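For orientation, a minimal sketch of the scopt pattern used above (assumptions: the object name, the simplified Config, and the reduced option set are illustrative, not the project's actual CLI). parse returns Some(config) when the arguments are valid and None after printing the usage text:

import java.net.URI

object MaterializerCliSketch {

  case class Config(in: Seq[URI] = Seq())

  val parser = new scopt.OptionParser[Config]("RDFGraphMaterializer") {
    head("RDFGraphMaterializer", "0.6.0")

    opt[Seq[URI]]('i', "input")
      .required()
      .valueName("<path1>,<path2>,...")
      .action((x, c) => c.copy(in = x))
      .text("path to file or directory that contains the input files (in N-Triples format)")
  }

  def main(args: Array[String]): Unit = {
    parser.parse(args, Config()) match {
      case Some(config) => println(s"inputs: ${config.in.mkString(",")}")
      case None => // invalid arguments; scopt has already printed the usage
    }
  }
}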

sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/data/RDFGraph.scala

Lines changed: 12 additions & 15 deletions

@@ -1,9 +1,8 @@
 package net.sansa_stack.inference.flink.data
 
-import net.sansa_stack.inference.flink.utils.DataSetUtils
 import org.apache.flink.api.scala.{DataSet, _}
-import org.apache.jena.graph.Triple
-import net.sansa_stack.inference.data.RDFTriple
+import org.apache.jena.graph.{Node, Triple}
+
 import net.sansa_stack.inference.flink.utils.DataSetUtils.DataSetOps
 
 /**
@@ -12,7 +11,7 @@ import net.sansa_stack.inference.flink.utils.DataSetUtils.DataSetOps
   * @author Lorenz Buehmann
   *
   */
-case class RDFGraph(triples: DataSet[RDFTriple]) {
+case class RDFGraph(triples: DataSet[Triple]) {
 
   /**
     * Returns a DataSet of triples that match with the given input.
@@ -22,11 +21,11 @@ case class RDFGraph(triples: DataSet[RDFTriple]) {
     * @param o the object
     * @return DataSet of triples
     */
-  def find(s: Option[String] = None, p: Option[String] = None, o: Option[String] = None): DataSet[RDFTriple] = {
+  def find(s: Option[Node] = None, p: Option[Node] = None, o: Option[Node] = None): DataSet[Triple] = {
     triples.filter(t =>
-      (s == None || t.s == s.get) &&
-      (p == None || t.p == p.get) &&
-      (o == None || t.o == o.get)
+      (s.isEmpty || t.subjectMatches(s.get)) &&
+      (p.isEmpty || t.predicateMatches(p.get)) &&
+      (o.isEmpty || t.objectMatches(o.get))
     )
   }
 
@@ -35,11 +34,11 @@ case class RDFGraph(triples: DataSet[RDFTriple]) {
     *
     * @return DataSet of triples
     */
-  def find(triple: Triple): DataSet[RDFTriple] = {
+  def find(triple: Triple): DataSet[Triple] = {
     find(
-      if (triple.getSubject.isVariable) None else Option(triple.getSubject.toString),
-      if (triple.getPredicate.isVariable) None else Option(triple.getPredicate.toString),
-      if (triple.getObject.isVariable) None else Option(triple.getObject.toString)
+      if (triple.getSubject.isVariable) None else Option(triple.getSubject),
+      if (triple.getPredicate.isVariable) None else Option(triple.getPredicate),
+      if (triple.getObject.isVariable) None else Option(triple.getObject)
    )
  }
 
@@ -68,7 +67,5 @@ case class RDFGraph(triples: DataSet[RDFTriple]) {
     *
     * @return the number of triples
     */
-  def size(): Long = {
-    triples.count()
-  }
+  lazy val size: Long = triples.count()
 }
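Two notes on the new API. First, because size is now a lazy val, triples.count() (which triggers a Flink job) runs at most once per RDFGraph instance instead of on every call. Second, a brief usage sketch of the Node-based find (not part of the commit; graph is assumed to be an RDFGraph in scope):

import org.apache.jena.vocabulary.RDF

// select all rdf:type triples; subject and object stay unbound
val typeTriples = graph.find(p = Some(RDF.`type`.asNode()))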
Lines changed: 17 additions & 28 deletions

@@ -1,15 +1,12 @@
 package net.sansa_stack.inference.flink.data
 
-import java.io.File
 import java.net.URI
 
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-
-import net.sansa_stack.inference.data.RDFTriple
-import org.apache.flink.configuration.Configuration
 import scala.language.implicitConversions
 
-import net.sansa_stack.inference.utils.NTriplesStringToRDFTriple
+import net.sansa_stack.rdf.flink.io.ntriples.NTriplesReader
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+
 
 /**
   * @author Lorenz Buehmann
@@ -18,39 +15,31 @@ object RDFGraphLoader {
 
   implicit def pathURIsConverter(uris: Seq[URI]): String = uris.map(p => p.toString).mkString(",")
 
-  def loadFromFile(path: String, env: ExecutionEnvironment): RDFGraph = {
-    val triples = env.readTextFile(path)
-      .map(line => line.replace(">", "").replace("<", "").split("\\s+")) // line to tokens
-      .map(tokens => RDFTriple(tokens(0), tokens(1), tokens(2))) // tokens to triple
 
-    RDFGraph(triples)
+  def loadFromDisk(path: String, env: ExecutionEnvironment): RDFGraph = {
+    loadFromDisk(URI.create(path), env)
   }
 
   def loadFromDisk(path: URI, env: ExecutionEnvironment): RDFGraph = {
-    // create a configuration object
-    val parameters = new Configuration
-
-    // set the recursive enumeration parameter
-    parameters.setBoolean("recursive.file.enumeration", true)
-
-    // pass the configuration to the data source
-    val triples = env.readTextFile(path.toString).withParameters(parameters)
-      .map(line => line.replace(">", "").replace("<", "").split("\\s+")) // line to tokens
-      .map(tokens => RDFTriple(tokens(0), tokens(1), tokens(2)))
-      .name("triples") // tokens to triple
-
-    RDFGraph(triples)
+    loadFromDisk(Seq(path), env)
   }
 
   def loadFromDisk(paths: Seq[URI], env: ExecutionEnvironment): RDFGraph = {
+    RDFGraph(NTriplesReader.load(env, paths).name("triples"))
+  }
+
+  def main(args: Array[String]): Unit = {
+    if (args.length == 0) println("Usage: RDFGraphLoader <PATH_TO_FILE>")
 
-    val tmp: List[String] = paths.map(path => path.toString).toList
+    val path = args(0)
 
-    val converter = new NTriplesStringToRDFTriple()
+    // val env = ExecutionEnvironment.getExecutionEnvironment
+    val env = ExecutionEnvironment.createLocalEnvironment(parallelism = 2)
 
-    val triples = tmp.map(f => env.readTextFile(f).flatMap(line => converter.apply(line))).reduce(_ union _).name("triples")
+    val ds = RDFGraphLoader.loadFromDisk(path, env).triples
 
-    RDFGraph(triples)
+    println(s"size:${ds.count}")
+    println("sample data:\n" + ds.first(10).map { _.toString.replaceAll("[\\x00-\\x1f]", "???")}.collect().mkString("\n"))
   }
 
 }
