package net.sansa_stack.examples.spark.inference.axioms

import net.sansa_stack.inference.rules.{ RDFSLevel, ReasoningProfile }
import net.sansa_stack.inference.rules.ReasoningProfile._
import net.sansa_stack.inference.spark.forwardchaining.axioms.{ ForwardRuleReasonerOWLHorst, ForwardRuleReasonerRDFS, TransitiveReasoner }
import net.sansa_stack.owl.spark.owl._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.semanticweb.owlapi.model.OWLAxiom

object RDFGraphInference {

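  // entry point: parse the CLI arguments and run the materialization with the parsed settings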
  def main(args: Array[String]): Unit = {
    parser.parse(args, Config()) match {
      case Some(config) =>
        run(config.in, config.profile, config.parallelism)
      case None =>
        println(parser.usage)
    }
  }

  def run(input: String, profile: ReasoningProfile, parallelism: Int): Unit = {

    // the Spark session config
    val spark = SparkSession.builder
      .appName(s"SPARK $profile Reasoning")
      .master("local[*]")
      .config("spark.hadoop.validateOutputSpecs", "false") // override output files
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.default.parallelism", parallelism)
      .config("spark.ui.showConsoleProgress", "false")
      .config("spark.sql.shuffle.partitions", parallelism)
      .getOrCreate()

    // load the axioms from disk (OWL Functional Syntax)
    val owlAxioms = spark.owl(Syntax.FUNCTIONAL)(input)
    println(s"|G| = ${owlAxioms.count()}")
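    // note: count() is a Spark action, so printing |G| forces the input file to actually be parsed here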

    // create reasoner and compute inferred graph
    val inferredGraph = profile match {
      case RDFS => new ForwardRuleReasonerRDFS(spark.sparkContext, parallelism)(owlAxioms)
      case OWL_HORST => new ForwardRuleReasonerOWLHorst(spark.sparkContext, parallelism)(owlAxioms)
      case _ =>
        throw new RuntimeException(s"Invalid profile: '$profile'")
    }

    println(s"|G_inf| = ${inferredGraph.count()}")

    spark.stop()
  }

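  // CLI defaults: empty input path, RDFS reasoning profile, parallelism of 4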
  case class Config(
    in: String = "",
    profile: ReasoningProfile = ReasoningProfile.RDFS,
    parallelism: Int = 4)

  // read ReasoningProfile enum
  implicit val profilesRead: scopt.Read[ReasoningProfile.Value] =
    scopt.Read.reads(ReasoningProfile forName _.toLowerCase())
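  // e.g. the CLI value "owl-horst" is lower-cased and resolved to the OWL_HORST profile via forName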

  // the CLI parser
  val parser = new scopt.OptionParser[Config]("RDFGraphMaterializer") {

    head("RDFGraphMaterializer (axioms)", "0.5.0")

    opt[String]('i', "input").required().valueName("<path>").
      action((x, c) => c.copy(in = x)).
      text("path to file or directory that contains the input files")

    opt[ReasoningProfile]('p', "profile").required().valueName("{rdfs | owl-horst}").
      action((x, c) => c.copy(profile = x)).
      text("the reasoning profile")

    opt[Int]("parallelism").optional().
      action((x, c) => c.copy(parallelism = x)).
      text("the degree of parallelism, i.e. the number of Spark partitions used in the Spark operations")

    help("help").text("prints this usage text")
  }
}
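
// Example invocation (a minimal sketch; the input path below is hypothetical and must point to an
// ontology in OWL Functional Syntax, the syntax hard-coded in run() above):
//
//   RDFGraphInference.main(Array(
//     "--input", "/data/university.owl",
//     "--profile", "rdfs",
//     "--parallelism", "4"))
//
// which is equivalent to calling RDFGraphInference.run("/data/university.owl", ReasoningProfile.RDFS, 4).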