@@ -10,9 +10,10 @@ import org.apache.jena.graph.Triple
 import org.apache.jena.vocabulary.{RDF, RDFS}
 import org.apache.spark.SparkContext
 import org.slf4j.LoggerFactory
-
 import scala.collection.mutable
 
+import org.apache.spark.rdd.RDD
+
 /**
  * A forward chaining implementation of the RDFS entailment regime.
  *
@@ -39,8 +40,10 @@ class ForwardRuleReasonerRDFS(sc: SparkContext, parallelism: Int = 2) extends Tr
 
     // as an optimization, we can extract all schema triples first, which avoids running over
     // the whole dataset for each schema triple later
-    val schemaTriples = if (extractSchemaTriplesInAdvance) new RDFSSchemaExtractor().extract(triplesRDD)
+    val schemaTriples = if (extractSchemaTriplesInAdvance) new RDFSSchemaExtractor().extract(triplesRDD).cache()
     else triplesRDD
+    schemaTriples.setName("schema triples")
+    // println(s"#schema: ${schemaTriples.count()}")
 
 
     // 1. we first compute the transitive closure of rdfs:subPropertyOf and rdfs:subClassOf
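The new `cache()` + `setName(...)` pair is a common Spark pattern: the schema triples are reused by several rules below, so caching avoids re-running the extraction, and the name makes the RDD identifiable in the Spark web UI. A minimal sketch of the idea, with a hypothetical `extractSchema` standing in for `RDFSSchemaExtractor` (whose actual selection logic may differ):

```scala
import org.apache.jena.graph.Triple
import org.apache.jena.vocabulary.RDFS
import org.apache.spark.rdd.RDD

// Hypothetical stand-in for RDFSSchemaExtractor: keep only triples whose
// predicate belongs to the RDFS schema vocabulary. Comparing URIs as strings
// keeps the closure free of Jena Node instances.
def extractSchema(triples: RDD[Triple]): RDD[Triple] = {
  val schemaProperties = Set(
    RDFS.subClassOf.getURI, RDFS.subPropertyOf.getURI,
    RDFS.domain.getURI, RDFS.range.getURI)
  triples
    .filter(t => t.getPredicate.isURI && schemaProperties.contains(t.getPredicate.getURI))
    .setName("schema triples") // label shown in the Spark web UI
    .cache()                   // reused by several rules, so keep it in memory
}
```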
@@ -49,14 +52,14 @@ class ForwardRuleReasonerRDFS(sc: SparkContext, parallelism: Int = 2) extends Tr
     * rdfs11:  xxx rdfs:subClassOf yyy .
     *          yyy rdfs:subClassOf zzz .   =>   xxx rdfs:subClassOf zzz .
     */
-    val subClassOfTriples = extractTriples(schemaTriples, RDFS.subClassOf.asNode()) // extract rdfs:subClassOf triples
+    val subClassOfTriples = extractTriples(schemaTriples, RDFS.subClassOf.asNode()).cache() // extract rdfs:subClassOf triples
     val subClassOfTriplesTrans = computeTransitiveClosure(subClassOfTriples, RDFS.subClassOf.asNode()).setName("rdfs11") // mutable.Set()++subClassOfTriples.collect())
 
     /*
     * rdfs5:  xxx rdfs:subPropertyOf yyy .
     *         yyy rdfs:subPropertyOf zzz .   =>   xxx rdfs:subPropertyOf zzz .
     */
-    val subPropertyOfTriples = extractTriples(schemaTriples, RDFS.subPropertyOf.asNode()) // extract rdfs:subPropertyOf triples
+    val subPropertyOfTriples = extractTriples(schemaTriples, RDFS.subPropertyOf.asNode()).cache() // extract rdfs:subPropertyOf triples
     val subPropertyOfTriplesTrans = computeTransitiveClosure(subPropertyOfTriples, RDFS.subPropertyOf.asNode()).setName("rdfs5") // extractTriples(mutable.Set()++subPropertyOfTriples.collect(), RDFS.subPropertyOf.getURI))
 
     // a map structure should be more efficient
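`computeTransitiveClosure` comes from the surrounding reasoner infrastructure; this hunk only caches its inputs. For readers unfamiliar with the operation, a naive fixpoint formulation over an RDD might look as follows — a sketch, not the project's implementation, and it assumes Jena `Node` instances are serializable in the cluster configuration (e.g. via Kryo registration):

```scala
import org.apache.jena.graph.{Node, Triple}
import org.apache.spark.rdd.RDD

// Naive fixpoint computation of the transitive closure of one property,
// e.g. rdfs:subClassOf (rdfs11) or rdfs:subPropertyOf (rdfs5).
def transitiveClosure(triples: RDD[Triple], property: Node): RDD[Triple] = {
  // work on (subject, object) pairs of the given property only
  var closure = triples
    .filter(_.getPredicate == property)
    .map(t => (t.getSubject, t.getObject))
    .cache()
  var oldCount = 0L
  var count = closure.count()
  while (count != oldCount) {
    oldCount = count
    // join (x, y) with (y, z) to derive the new pair (x, z)
    val derived = closure.map(_.swap) // (y, x)
      .join(closure)                  // (y, (x, z))
      .map { case (_, (x, z)) => (x, z) }
    closure = closure.union(derived).distinct().cache()
    count = closure.count()
  }
  closure.map { case (s, o) => Triple.create(s, property, o) }
}
```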
@@ -71,7 +74,9 @@ class ForwardRuleReasonerRDFS(sc: SparkContext, parallelism: Int = 2) extends Tr
     // split by rdf:type
     val split = triplesRDD.partitionBy(t => t.p == RDF.`type`.asNode)
     var typeTriples = split._1
+    typeTriples.setName("rdf:type triples")
     var otherTriples = split._2
+    otherTriples.setName("other triples")
 
 //    val formatter = java.text.NumberFormat.getIntegerInstance
 //    println("triples" + formatter.format(triplesRDD.count()))
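The `partitionBy(t => ...)` used here appears to be a helper on the project's triple RDDs that splits one RDD into a (matching, non-matching) pair; it is not Spark's `partitionBy` on pair RDDs, which takes a `Partitioner`. A plain-Spark equivalent as a sketch:

```scala
import org.apache.jena.graph.Triple
import org.apache.jena.vocabulary.RDF
import org.apache.spark.rdd.RDD

// Split an RDD of triples into (rdf:type triples, all other triples).
// Each filter is a separate pass over the data, which is why naming and
// caching the inputs pays off when both halves are reused.
def splitByType(triples: RDD[Triple]): (RDD[Triple], RDD[Triple]) = {
  val typeURI = RDF.`type`.getURI
  // capture only the URI string in the closures, not a Jena Node
  val isType = (t: Triple) => t.getPredicate.isURI && t.getPredicate.getURI == typeURI
  val typeTriples  = triples.filter(isType).setName("rdf:type triples")
  val otherTriples = triples.filter(t => !isType(t)).setName("other triples")
  (typeTriples, otherTriples)
}
```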
@@ -92,7 +97,7 @@ class ForwardRuleReasonerRDFS(sc: SparkContext, parallelism: Int = 2) extends Tr
       .setName("rdfs7")
 
     // add triples
-    otherTriples = otherTriples.union(triplesRDFS7)
+    otherTriples = otherTriples.union(triplesRDFS7).setName("other triples with rdfs7")
 
     // 3. Domain and Range inheritance according to rdfs2 and rdfs3 is computed
 
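`triplesRDFS7` above implements rdfs7: from `(s p o)` and `(p rdfs:subPropertyOf q)` derive `(s q o)`. In line with the earlier note that "a map structure should be more efficient", a common realization collects the schema-sized subPropertyOf closure into a broadcast map rather than joining. A sketch under the same Jena-node serialization assumption as above:

```scala
import org.apache.jena.graph.{Node, Triple}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// rdfs7: (s p o), (p rdfs:subPropertyOf q)  =>  (s q o)
def applyRDFS7(sc: SparkContext,
               triples: RDD[Triple],
               subPropertyOfTrans: RDD[Triple]): RDD[Triple] = {
  // p -> all (transitive) super-properties of p, broadcast to every executor
  val superProperties = sc.broadcast(
    subPropertyOfTrans
      .map(t => (t.getSubject, t.getObject))
      .groupByKey()
      .mapValues(_.toSet)
      .collect()
      .toMap)
  triples.flatMap { t =>
    superProperties.value
      .getOrElse(t.getPredicate, Set.empty[Node])
      .map(q => Triple.create(t.getSubject, q, t.getObject))
  }.setName("rdfs7")
}
```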
@@ -125,10 +130,10 @@ class ForwardRuleReasonerRDFS(sc: SparkContext, parallelism: Int = 2) extends Tr
       .setName("rdfs3")
 
     // rdfs2 and rdfs3 generated rdf:type triples which we'll add to the existing ones
-    val triples23 = triplesRDFS2.union(triplesRDFS3)
+    val triples23 = triplesRDFS2.union(triplesRDFS3).setName("rdfs2 + rdfs3")
 
     // all rdf:type triples here as an intermediate result
-    typeTriples = typeTriples.union(triples23)
+    typeTriples = typeTriples.union(triples23).setName("rdf:type + rdfs2 + rdfs3")
 
 
     // 4. SubClass inheritance according to rdfs9
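rdfs2 and rdfs3, computed just above this hunk, derive `rdf:type` triples from `rdfs:domain` and `rdfs:range` declarations. A sketch of the two rules, simplified to a single domain/range per property and taking hypothetical driver-side maps `domainMap`/`rangeMap` built from the schema triples (a broadcast would be preferable for large schemas; same serialization caveat as above):

```scala
import org.apache.jena.graph.{Node, Triple}
import org.apache.jena.vocabulary.RDF
import org.apache.spark.rdd.RDD

// rdfs2: (p rdfs:domain C), (s p o)  =>  (s rdf:type C)
// rdfs3: (p rdfs:range  C), (s p o)  =>  (o rdf:type C), for non-literal o
def applyDomainRange(triples: RDD[Triple],
                     domainMap: Map[Node, Node],
                     rangeMap: Map[Node, Node]): RDD[Triple] = {
  val rdfType = RDF.`type`.asNode
  val triplesRDFS2 = triples.flatMap(t =>
    domainMap.get(t.getPredicate)
      .map(c => Triple.create(t.getSubject, rdfType, c)))
  val triplesRDFS3 = triples
    .filter(!_.getObject.isLiteral) // literals cannot be typed subjects
    .flatMap(t => rangeMap.get(t.getPredicate)
      .map(c => Triple.create(t.getObject, rdfType, c)))
  triplesRDFS2.union(triplesRDFS3).setName("rdfs2 + rdfs3")
}
```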
@@ -168,8 +173,9 @@ class ForwardRuleReasonerRDFS(sc: SparkContext, parallelism: Int = 2) extends Tr
       subClassOfTriplesTrans,
       subPropertyOfTriplesTrans,
       typeTriples,
-      triplesRDFS7,
+      // triplesRDFS7,
       triplesRDFS9))
+      .setName("rdf:type + other + rdfs2 + rdfs3 + rdfs5 + rdfs7 + rdfs9 + rdfs11")
       .distinct(parallelism)
 
     // we also apply additional rules if enabled
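This hunk also drops `triplesRDFS7` from the final union: those triples were already merged into `otherTriples` in the rdfs7 hunk above, so keeping them here would only create duplicates for `distinct` to eliminate. The assembly step itself reduces to a sketch like:

```scala
import org.apache.jena.graph.Triple
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Union all partial results once, then deduplicate with the configured
// parallelism. Naming the union before distinct() makes the (typically
// expensive) deduplication stage easy to spot in the Spark UI.
def assemble(sc: SparkContext, parallelism: Int,
             parts: Seq[RDD[Triple]]): RDD[Triple] =
  sc.union(parts)
    .setName("rdf:type + other + rdfs2 + rdfs3 + rdfs5 + rdfs7 + rdfs9 + rdfs11")
    .distinct(parallelism)
```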
@@ -180,7 +186,7 @@ class ForwardRuleReasonerRDFS(sc: SparkContext, parallelism: Int = 2) extends Tr
 
     // rdfs4a: (s p o) => (s rdf:type rdfs:Resource)
     // rdfs4b: (s p o) => (o rdf:type rdfs:Resource) // filter by literals
-    // TODO not sure which version is more effcient, using a FILTER + UNION, or doing it via faltMap but creating Set objects
+    // TODO not sure which version is more efficient: using a FILTER + UNION, or doing it via flatMap but creating Set objects
 //    val rdfs4 = allTriples.map(t => Triple.create(t.s, RDF.`type`.asNode(), RDFS.Resource.asNode()))
 //      .union(
 //        allTriples.filter(!_.getObject.isLiteral).map(t => Triple.create(t.o, RDF.`type`.asNode(), RDFS.Resource.asNode())))
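For the TODO above, both candidates can be written down directly: the FILTER + UNION version scans `allTriples` twice (unless it is cached), while the `flatMap` version makes a single pass but allocates a small `Set` per input triple. A sketch of the two variants (hypothetical helper names, same serialization caveat as above):

```scala
import org.apache.jena.graph.Triple
import org.apache.jena.vocabulary.{RDF, RDFS}
import org.apache.spark.rdd.RDD

// rdfs4a/rdfs4b via FILTER + UNION: two passes over allTriples.
def rdfs4FilterUnion(allTriples: RDD[Triple]): RDD[Triple] = {
  val rdfType = RDF.`type`.asNode
  val resource = RDFS.Resource.asNode
  allTriples
    .map(t => Triple.create(t.getSubject, rdfType, resource))
    .union(allTriples
      .filter(!_.getObject.isLiteral)
      .map(t => Triple.create(t.getObject, rdfType, resource)))
}

// rdfs4a/rdfs4b via flatMap: one pass, but a Set is built per triple
// (the Set also deduplicates when subject and object coincide).
def rdfs4FlatMap(allTriples: RDD[Triple]): RDD[Triple] = {
  val rdfType = RDF.`type`.asNode
  val resource = RDFS.Resource.asNode
  allTriples.flatMap { t =>
    val derived = Set(Triple.create(t.getSubject, rdfType, resource))
    if (t.getObject.isLiteral) derived
    else derived + Triple.create(t.getObject, rdfType, resource)
  }
}
```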