package net.sansa_stack.examples.spark.rdf

import java.io.File
import java.net.URI

import scala.collection.mutable

import net.sansa_stack.rdf.spark.io.NTripleReader
import net.sansa_stack.rdf.spark.stats.RDFStatistics
import org.apache.spark.sql.SparkSession
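/**
 * Computes dataset statistics for an RDF file given in N-Triples syntax
 * and writes them out as a VoID description.
 *
 * Illustrative invocation (the jar name and paths are placeholders):
 *   spark-submit --class net.sansa_stack.examples.spark.rdf.RDFStats \
 *     sansa-examples-spark.jar src/main/resources/rdf.nt stats-out/
 */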
object RDFStats {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(
        "Usage: RDFStats <input> <output>")
      System.exit(1)
    }
    val input = args(0) // "src/main/resources/rdf.nt"
    val rdf_stats_file = new File(input).getName
    val output = args(1)
    // Parse any remaining --key=value arguments; the first two arguments are <input> and <output>.
    val optionsList = args.drop(2).map { arg =>
      arg.dropWhile(_ == '-').split('=') match {
        case Array(opt, v) => opt -> v
        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
      }
    }
    val options = mutable.Map(optionsList: _*)

    // This example supports no options, so anything parsed above is rejected.
    options.foreach {
      case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
    }
    println("======================================")
    println("|       RDF Statistics example       |")
    println("======================================")
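    // Start a local Spark session with Kryo serialization enabled.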
    val sparkSession = SparkSession.builder
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .appName("RDF Dataset Statistics example (" + rdf_stats_file + ")")
      .getOrCreate()
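    // Read the N-Triples input into an RDD of triples.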
    val triples = NTripleReader.load(sparkSession, URI.create(input))
    // compute the statistical criteria over the dataset
    val rdf_statistics = RDFStatistics(triples, sparkSession)
    val stats = rdf_statistics.run()
    // serialize the collected statistics as a VoID description in the output location
    rdf_statistics.voidify(stats, rdf_stats_file, output)

    sparkSession.stop()
  }
}