@@ -6,6 +6,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.sql.SparkSession
 import org.semanticweb.owlapi.apibinding.OWLManager
+import org.semanticweb.owlapi.model.AxiomType
 import org.semanticweb.owlapi.model._
 
 import scala.collection.JavaConverters._
@@ -22,24 +23,24 @@ import scala.collection.JavaConverters._
  * @param parallelism The degree of parallelism
  * @author Heba Mohamed
  */
-class ForwardRuleReasonerOWLHorst (sc: SparkContext, parallelism: Int = 2) extends TransitiveReasoner {
+class ForwardRuleReasonerOWLHorst (sc: SparkContext, parallelism: Int = 30) extends TransitiveReasoner {
 
   def this (sc: SparkContext) = this(sc, sc.defaultParallelism)
 
   def apply (axioms: RDD[OWLAxiom]): RDD[OWLAxiom] = {
 
-    val startTime = System.currentTimeMillis()
+    val axiomsRDD = axioms.cache() // cache this RDD because it will be used quite often
 
     val manager = OWLManager.createOWLOntologyManager()
     val dataFactory = manager.getOWLDataFactory
 
-    val axiomsRDD = axioms.cache() // cache this RDD because it will be used quite often
+    val startTime = System.currentTimeMillis()
 
     // ------------ extract the schema elements -------------------
-    val classes: RDD[OWLClass] = axiomsRDD.flatMap {
-      case axiom: HasClassesInSignature => axiom.classesInSignature().iterator().asScala
-      case _ => null
-    }.filter(_ != null).distinct()
+    // val classes: RDD[OWLClass] = axiomsRDD.flatMap {
+    //   case axiom: HasClassesInSignature => axiom.classesInSignature().iterator().asScala
+    //   case _ => null
+    // }.filter(_ != null).distinct()
 
     var subClassof = extractAxiom(axiomsRDD, AxiomType.SUBCLASS_OF)
     var subDataProperty = extractAxiom(axiomsRDD, AxiomType.SUB_DATA_PROPERTY)
@@ -84,15 +85,16 @@ class ForwardRuleReasonerOWLHorst (sc: SparkContext, parallelism: Int = 2) exten
     subClassof = sc.union(subClassof,
       subC1.asInstanceOf[RDD[OWLAxiom]],
       subC2.asInstanceOf[RDD[OWLAxiom]])
-      .distinct(parallelism)
+        .distinct(parallelism)
 
-     // 2. we compute the transitive closure of rdfs:subPropertyOf and rdfs:subClassOf
+    // 2. we compute the transitive closure of rdfs:subPropertyOf and rdfs:subClassOf
     // R1: (x rdfs:subClassOf y), (y rdfs:subClassOf z) -> (x rdfs:subClassOf z)
+
     val tr = new TransitiveReasoner()
 
     val subClassOfAxiomsTrans = tr.computeTransitiveClosure(subClassof, AxiomType.SUBCLASS_OF)
-      .asInstanceOf[RDD[OWLSubClassOfAxiom]]
-      .filter(a => a.getSubClass != a.getSuperClass) // to exclude axioms with (C owl:subClassOf C)
+        .asInstanceOf[RDD[OWLSubClassOfAxiom]]
+        .filter(a => a.getSubClass != a.getSuperClass) // to exclude axioms with (C owl:subClassOf C)
 
     // Convert all RDDs into maps which should be more efficient later on
     val subClassMap = CollectionUtils
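
Aside on rule R1 above: the commit delegates the rdfs:subClassOf closure to TransitiveReasoner.computeTransitiveClosure. As a rough sketch of the underlying fixed-point idea (not the SANSA implementation; all names and the use of plain IRI strings are illustrative assumptions):

// Hedged sketch: naive fixed-point transitive closure over (subClass, superClass) pairs.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object SubClassClosureSketch {

  // Repeatedly join (x, y) with (y, z) to derive (x, z) until no new pairs appear.
  def closure(pairs: RDD[(String, String)]): RDD[(String, String)] = {
    var tc = pairs.distinct().cache()
    var oldCount = 0L
    var newCount = tc.count()
    while (newCount != oldCount) {
      oldCount = newCount
      val keyedBySuper = tc.map { case (sub, sup) => (sup, sub) } // (y, x)
      val derived = keyedBySuper.join(tc)                         // (y, (x, z))
        .map { case (_, (x, z)) => (x, z) }
      tc = tc.union(derived).distinct().cache()
      newCount = tc.count()
    }
    tc
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("ClosureSketch").getOrCreate()
    val input = spark.sparkContext.parallelize(Seq(("A", "B"), ("B", "C"), ("C", "D")))
    closure(input).collect().foreach(println) // adds (A,C), (A,D), (B,D) to the input pairs
    spark.stop()
  }
}
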
@@ -117,9 +119,6 @@ class ForwardRuleReasonerOWLHorst (sc: SparkContext, parallelism: Int = 2) exten
     val equClassMap = equClass_Pairs
       .map(a => (a.getOperandsAsList.get(0), a.getOperandsAsList.get(1))).collect.toMap
 
-    val equClassSwapMap = equClass_Pairs
-      .map(a => (a.getOperandsAsList.get(1), a.getOperandsAsList.get(0))).collect.toMap
-
     // distribute the schema data structures by means of shared variables
     // Assume that the schema data is less than the instance data
 
@@ -130,7 +129,7 @@ class ForwardRuleReasonerOWLHorst (sc: SparkContext, parallelism: Int = 2) exten
     val dataRangeMapBC = sc.broadcast(dataRangeMap)
     val objRangeMapBC = sc.broadcast(objRangeMap)
     val equClassMapBC = sc.broadcast(equClassMap)
-    val equClassSwapMapBC = sc.broadcast(equClassSwapMap)
+    // val equClassSwapMapBC = sc.broadcast(equClassSwapMap)
 
     // Compute the equivalence of classes and properties
     // O11c: (C rdfs:subClassOf D), (D rdfs:subClassOf C) -> (C owl:equivalentClass D)
@@ -176,16 +175,10 @@ class ForwardRuleReasonerOWLHorst (sc: SparkContext, parallelism: Int = 2) exten
       .asInstanceOf[RDD[OWLSubDataPropertyOfAxiom]]
       .filter(a => a.getSubProperty != a.getSuperProperty) // to exclude axioms with (C owl:subDataPropertyOf C)
 
-    // println("\n Transitive subDataPropOfAxiom closures: \n----------------\n")
-    // subDataPropOfAxiomsTrans.collect().foreach(println)
-
     val subObjPropOfAxiomsTrans = tr.computeTransitiveClosure(subObjProperty, AxiomType.SUB_OBJECT_PROPERTY)
       .asInstanceOf[RDD[OWLSubObjectPropertyOfAxiom]]
       .filter(a => a.getSubProperty != a.getSuperProperty) // to exclude axioms with (C owl:subObjectPropertyOf C)
 
-    // println("\n Transitive subObjPropOfAxiom closures: \n----------------\n")
-    // subObjPropOfAxiomsTrans.collect().foreach(println)
-
     val subAnnPropOfAxiomsTrans = tr.computeTransitiveClosure(subAnnProp, AxiomType.SUB_ANNOTATION_PROPERTY_OF)
       .asInstanceOf[RDD[OWLSubAnnotationPropertyOfAxiom]]
       .filter(a => a.getSubProperty != a.getSuperProperty) // to exclude axioms with (C owl:subAnnotationPropertyOf C)
@@ -212,10 +205,6 @@ class ForwardRuleReasonerOWLHorst (sc: SparkContext, parallelism: Int = 2) exten
       .filter(a => subDataPropertyBC.value.getOrElse(a.getSuperProperty, Set.empty).contains(a.getSubProperty))
       .map(a => dataFactory.getOWLEquivalentDataPropertiesAxiom(a.getSubProperty, a.getSuperProperty))
 
-    // val equivDP = equDataProp.union(eqDP.asInstanceOf[RDD[OWLAxiom]]).distinct(parallelism)
-    // println("\n O12c : \n----------------\n")
-    // equivDP.collect().foreach(println)
-
     val eqOP = subObjPropOfAxiomsTrans
       .filter(a => subObjectPropertyBC.value.getOrElse(a.getSuperProperty, Set.empty).contains(a.getSubProperty))
       .map(a => dataFactory.getOWLEquivalentObjectPropertiesAxiom(a.getSubProperty, a.getSuperProperty))
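
Aside on rules O11c/O12c above: a subsumption pair is turned into an equivalence axiom whenever the reverse subsumption also holds. A minimal, self-contained sketch of that pattern on plain string pairs, assuming a broadcast super-property map analogous to subDataPropertyBC/subObjectPropertyBC (whose construction lies outside this hunk):

// Hedged sketch of the O12c pattern: (p sub q) and (q sub p) -> (p equivalent q).
// Plain strings stand in for OWL properties; all names here are illustrative.
import org.apache.spark.sql.SparkSession

object EquivalenceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("EquivalenceSketch").getOrCreate()
    val sc = spark.sparkContext

    val subPropertyOf = sc.parallelize(Seq(("p", "q"), ("q", "p"), ("q", "r")))

    // property -> set of its super-properties, shipped to the workers as a broadcast variable
    val supersOf = subPropertyOf.groupByKey().mapValues(_.toSet).collect().toMap
    val supersOfBC = sc.broadcast(supersOf)

    // keep a pair only if the reverse subsumption also exists
    val equivalent = subPropertyOf.filter { case (sub, sup) =>
      supersOfBC.value.getOrElse(sup, Set.empty[String]).contains(sub)
    }
    equivalent.collect().foreach(println) // (p,q) and (q,p)
    spark.stop()
  }
}
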
@@ -429,12 +418,13 @@ class ForwardRuleReasonerOWLHorst (sc: SparkContext, parallelism: Int = 2) exten
     val subOperands = subClassof.asInstanceOf[RDD[OWLSubClassOfAxiom]]
       .map(a => (a.getSubClass, a.getSuperClass))
 
-    val sd = subOperands.filter(s => s._2.isInstanceOf[OWLDataHasValue])
-      .map(s => (s._1, s._2.asInstanceOf[OWLDataHasValue]))
+    val sd: RDD[(OWLClassExpression, OWLDataHasValue)] = subOperands.filter(s => s._2.isInstanceOf[OWLDataHasValue])
+      .map(s => (s._1, s._2.asInstanceOf[OWLDataHasValue])) // (S, OWLDataHasValue(P, w))
 
     val O14_data_b = typeAxioms.filter(a => subClassOfBC.value.contains(a.getClassExpression))
-      .map(a => (a.getClassExpression, a.getIndividual))
-      .join(sd).filter(x => x._2._2.isInstanceOf[OWLDataHasValue])
+      .map(a => (a.getClassExpression, a.getIndividual)) // (S, i)
+      .join(sd) // (S, (i, OWLDataHasValue(P, w)))
+      .filter(x => x._2._2.isInstanceOf[OWLDataHasValue])
       .map(a => dataFactory.getOWLDataPropertyAssertionAxiom(a._2._2.getProperty, a._2._1, a._2._2.getFiller))
 
     // case c: OWLEquivelantClass(E, OWLDataHasValue(P, w)), OWLClassAssertion(E, i)
@@ -487,16 +477,16 @@ class ForwardRuleReasonerOWLHorst (sc: SparkContext, parallelism: Int = 2) exten
     // O13: (R owl:hasValue V), (R owl:onProperty P), (U P V) -> (U rdf:type R)
     // case a: OWLEquivalentClasses(E, OWLDataHasValue(P, w)), OWLDataPropertyAssertion(P, i, w) --> OWLClassAssertion(E, i)
 
-    val e_swap = e.map(a => (a._2.getProperty, a._1))
+    val e_swap = e.map(a => (a._2.getProperty, a._1)) // (P, E)
     val O13_data_a = dataPropAssertion.filter(a => dataHasValBC.value.contains(a.getProperty))
-      .map(a => (a.getProperty, a.getSubject))
-      .join(e_swap)
+      .map(a => (a.getProperty, a.getSubject)) // (P, i)
+      .join(e_swap) // (P, (i, E))
       .map(a => dataFactory.getOWLClassAssertionAxiom(a._2._2, a._2._1))
 
     // case b: OWLSubClassOf(S, OWLDataHasValue(P, w)), OWLClassAssertion(S, i)
-    val s_swap = sd.map(a => (a._2.getProperty, a._1))
-    val O13_data_b = dataPropAssertion.map(a => (a.getProperty, a.getSubject))
-      .join(s_swap)
+    val s_swap = sd.map(a => (a._2.getProperty, a._1)) // (P, S)
+    val O13_data_b = dataPropAssertion.map(a => (a.getProperty, a.getSubject)) // (P, U)
+      .join(s_swap) // (P, (U, S))
       .map(a => dataFactory.getOWLClassAssertionAxiom(a._2._2, a._2._1))
 
     // case a: OWLEquivalentClasses(E, OWLObjectHasValue(P, w)), OWLObjectPropertyAssertion(P, i, w) --> OWLClassAssertion(E, i)
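
Aside on the O14/O13 blocks above: the hasValue restrictions are re-keyed by property and joined against the assertion RDDs. Below is a standalone illustration of that join pattern on plain tuples; the tuple encodings, the value-equality check, and all names are assumptions for illustration, not the exact code above:

// Hedged sketch of rule O13 on plain tuples:
//   schema:    (E, (P, w))  -- E is defined by hasValue(P, w)
//   instance:  (P, (i, w))  -- individual i has value w for property P
//   inference: (E, i)       -- i is an instance of E
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object O13Sketch {

  def o13(hasValueDefs: RDD[(String, (String, String))],
          assertions: RDD[(String, (String, String))]): RDD[(String, String)] = {
    // re-key the schema by property, in the spirit of e_swap/s_swap above
    val byProp = hasValueDefs.map { case (e, (p, w)) => (p, (e, w)) }
    assertions.join(byProp)                                // (P, ((i, w1), (E, w2)))
      .filter { case (_, ((_, w1), (_, w2))) => w1 == w2 } // asserted value matches the restriction
      .map { case (_, ((i, _), (e, _))) => (e, i) }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("O13Sketch").getOrCreate()
    val sc = spark.sparkContext
    val schema = sc.parallelize(Seq(("Adult", ("hasAge", "18"))))
    val facts = sc.parallelize(Seq(("hasAge", ("alice", "18")), ("hasAge", ("bob", "12"))))
    o13(schema, facts).collect().foreach(println) // (Adult, alice)
    spark.stop()
  }
}
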
@@ -603,47 +593,51 @@ class ForwardRuleReasonerOWLHorst (sc: SparkContext, parallelism: Int = 2) exten
 
     val time = System.currentTimeMillis() - startTime
 
-    val inferedAxioms = sc.union(typeAxioms.asInstanceOf[RDD[OWLAxiom]], sameAsAxioms.asInstanceOf[RDD[OWLAxiom]], SPOAxioms)
+    val inferredAxioms = sc.union(typeAxioms.asInstanceOf[RDD[OWLAxiom]], sameAsAxioms.asInstanceOf[RDD[OWLAxiom]], SPOAxioms)
       .subtract(axioms)
       .distinct(parallelism)
 
-    // println("\n Finish with " + inferedAxioms.count + " Inferred Axioms after adding SameAs rules")
+    println("\n Finish with " + inferredAxioms.count + " Inferred Axioms after adding SameAs rules")
     println("\n...finished materialization in " + (time/1000) + " sec.")
 
-    inferedAxioms
+    val inferredGraph = inferredAxioms.union(axioms).distinct(parallelism)
+
+    // inferredAxioms
+    inferredGraph
   }
 
   def extractAxiom (axiom: RDD[OWLAxiom], T: AxiomType[_]): RDD[OWLAxiom] = {
     axiom.filter(a => a.getAxiomType.equals(T))
   }
-}
+}
 
-object ForwardRuleReasonerOWLHorst {
+//
+object ForwardRuleReasonerOWLHorst {
 
   def main (args: Array[String]): Unit = {
 
-    val input = getClass.getResource("/ont_functional.owl").getPath
-
-    println("=====================================")
-    println("| OWLAxioms Forward Rule Reasoner |")
-    println("=====================================")
-
     val sparkSession = SparkSession.builder
+      // .master("spark://172.18.160.16:3090")
       .master("local[*]")
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      // .config("spark.kryo.registrator", "net.sansa_stack.inference.spark.forwardchaining.axioms.Registrator")
-      .appName("OWL Axiom Forward Chaining Rule Reasoner")
+      .appName("OWLAxiom Forward Chaining Rule Reasoner")
      .getOrCreate()
 
     val sc: SparkContext = sparkSession.sparkContext
+    // val sparkConf = new SparkConf().setMaster("spark://172.18.160.16:3077")
+
+    val input = getClass.getResource("/ont_functional.owl").getPath
+    // val input = args(0)
+
+    println("=====================================")
+    println("| OWLAxioms Forward Rule Reasoner |")
+    println("=====================================")
 
     // Call the functional syntax OWLAxiom builder
     val owlAxiomsRDD: OWLAxiomsRDD = FunctionalSyntaxOWLAxiomsRDDBuilder.build(sparkSession, input)
-    // OWLAxiomsRDD.collect().foreach(println)
 
     val ruleReasoner = new ForwardRuleReasonerOWLHorst(sc, 2)
     val res: RDD[OWLAxiom] = ruleReasoner(owlAxiomsRDD)
-    // res.collect().foreach(println)
 
     sparkSession.stop
   }
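
Worth noting: with this commit, apply no longer returns only the newly inferred axioms but the full materialized graph (input plus inferences). A tiny hedged illustration of the difference, with Ints standing in for OWLAxiom and all names chosen for illustration:

// Hedged illustration of the changed return value.
import org.apache.spark.sql.SparkSession

object ReturnValueSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("ReturnValueSketch").getOrCreate()
    val sc = spark.sparkContext

    val input = sc.parallelize(Seq(1, 2, 3))
    val derived = sc.parallelize(Seq(2, 3, 4, 5))               // stands in for the union of all rule outputs

    val inferredAxioms = derived.subtract(input).distinct()     // only the new facts: 4, 5 (old return value)
    val inferredGraph = inferredAxioms.union(input).distinct()  // full graph: 1..5 (new return value)

    println(inferredGraph.collect().sorted.mkString(", "))
    spark.stop()
  }
}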