1+ package net .sansa_stack .examples .flink .rdf
2+
3+ import scala .collection .mutable
4+ import java .io .File
5+ import org .apache .flink .api .scala .ExecutionEnvironment
6+ import net .sansa_stack .rdf .flink .data .RDFGraphLoader
7+ import net .sansa_stack .rdf .flink .stats .RDFStatistics
8+
9+ object RDFStats {
10+ def main (args : Array [String ]) = {
11+ if (args.length < 2 ) {
12+ System .err.println(
13+ " Usage: RDF Statistics <input> <output>" )
14+ System .exit(1 )
15+ }
16+ val input = args(0 ) // "src/main/resources/rdf.nt"
17+ val rdf_stats_file = new File (input).getName
18+ val output = args(1 )
19+ val optionsList = args.drop(1 ).map { arg =>
20+ arg.dropWhile(_ == '-' ).split('=' ) match {
21+ case Array (opt, v) => (opt -> v)
22+ case _ => throw new IllegalArgumentException (" Invalid argument: " + arg)
23+ }
24+ }
25+ val options = mutable.Map (optionsList : _* )
26+
27+ options.foreach {
28+ case (opt, _) => throw new IllegalArgumentException (" Invalid option: " + opt)
29+ }
30+ println(" ======================================" )
31+ println(" | RDF Statistic example |" )
32+ println(" ======================================" )
33+
34+ val env = ExecutionEnvironment .getExecutionEnvironment
35+
36+ val rdfgraph = RDFGraphLoader .loadFromFile(input, env)
37+
38+ // compute criterias
39+ val rdf_statistics = RDFStatistics (rdfgraph, env)
40+ val stats = rdf_statistics.run()
41+ rdf_statistics.voidify(stats, rdf_stats_file, output)
42+
43+ }
44+
45+ }
0 commit comments