@@ -1,14 +1,12 @@
 package net.sansa_stack.examples.spark.ml.outliers.anomalydetection
 
 import scala.collection.mutable
-import org.apache.spark.sql.SparkSession
-import java.net.{ URI => JavaURI }
-import net.sansa_stack.ml.spark.outliers.anomalydetection.{ AnomalyDetection => AlgAnomalyDetection }
 import org.apache.jena.riot.Lang
 import net.sansa_stack.rdf.spark.io._
-import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{ SparkSession, Dataset, Row, SaveMode }
+import org.apache.spark.storage.StorageLevel
+import net.sansa_stack.ml.spark.outliers.anomalydetection._
+import org.apache.spark.rdd.RDD
 
 object AnomalyDetection {
   def main(args: Array[String]) {
@@ -20,9 +18,15 @@ object AnomalyDetection {
     }
   }
 
-  def run(input: String, threshold: Double,
-    anomalyListLimit: Int,
-    numofpartition: Int, output: String): Unit = {
+  def run(
+    input: String,
+    JSimThreshold: Double,
+    anomalyListLimit: Int,
+    numofpartition: Int,
+    output: String): Unit = {
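+    // JSimThreshold: minimum Jaccard similarity for two subjects to share a
+    // cluster; anomalyListLimit: smallest cluster size the IQR step accepts
+    // (descriptions inferred from how the parameters are used below)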
 
     println("==================================================")
     println("| Distributed Anomaly Detection |")
@@ -37,15 +41,20 @@ object AnomalyDetection {
     // N-Triples Reader
     val lang = Lang.NTRIPLES
     val triplesRDD = spark.rdf(lang)(input).repartition(numofpartition).persist()
-    // constant parameters defined
-    val JSimThreshold = 0.6
 
+    // predicates that are not interesting for the evaluation
+    val wikiList = List("wikiPageRevisionID,wikiPageID")
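+    // (wikiPageID and wikiPageRevisionID carry MediaWiki bookkeeping numbers
+    // that would otherwise surface as spurious numeric outliers)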
+
+    // keep only numeric literals typed as xsd:double, xsd:integer, xsd:nonNegativeInteger or squareKilometre
     val objList = List(
-      "http://www.w3.org/1999/02/22-rdf-syntax-ns#langString",
-      "http://www.w3.org/2001/XMLSchema#date")
+      "http://www.w3.org/2001/XMLSchema#double",
+      "http://www.w3.org/2001/XMLSchema#integer",
+      "http://www.w3.org/2001/XMLSchema#nonNegativeInteger",
+      "http://dbpedia.org/datatype/squareKilometre")
 
-    // clustering of subjects are on the basis of rdf:type specially object with wikidata and dbpedia.org
-    // val triplesType = List("http://www.wikidata.org", "http://dbpedia.org/ontology")
+    // helpful for considering only DBpedia types, since YAGO and Wikidata types are also present
     val triplesType = List("http://dbpedia.org/ontology")
 
     // some of the supertypes which are present for most of the subjects
@@ -57,57 +66,75 @@ object AnomalyDetection {
       "http://dbpedia.org/ontology/PopulatedPlace", "http://dbpedia.org/ontology/Region",
       "http://dbpedia.org/ontology/Species", "http://dbpedia.org/ontology/Eukaryote",
       "http://dbpedia.org/ontology/Location")
+
     // hypernym URI
     val hypernym = "http://purl.org/linguistics/gold/hypernym"
 
-    val outDetection = new AlgAnomalyDetection(triplesRDD, objList, triplesType, JSimThreshold, listSuperType, spark, hypernym, numofpartition)
+    var clusterOfSubject: RDD[Set[(String, String, Object)]] = null
+    println("AnomalyDetection - using the approxSimilarityJoin function with the help of HashingTF")
 
-    val clusterOfSubject = outDetection.run()
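+    // AnomalyWithHashingTF lives in sansa-ml; per the message above it
+    // presumably hashes each subject's feature set into a sparse vector with
+    // HashingTF and clusters subjects via an LSH approxSimilarityJoin on the
+    // Jaccard threshold (its internals are not part of this diff)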
+    val outDetection = new AnomalyWithHashingTF(triplesRDD, objList, triplesType, JSimThreshold, listSuperType, spark, hypernym, numofpartition)
+    clusterOfSubject = outDetection.run()
 
-    clusterOfSubject.take(10).foreach(println)
+    val setData = clusterOfSubject.repartition(1000).persist(StorageLevel.MEMORY_AND_DISK)
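+    // MEMORY_AND_DISK caches the repartitioned clusters, spilling partitions
+    // to disk instead of recomputing the expensive similarity join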
+    val setDataStore = setData.map(f => f.toSeq)
 
-    val setData = clusterOfSubject.repartition(numofpartition).persist.map(f => f._2.toSeq)
+    val setDataSize = setDataStore.filter(f => f.size > anomalyListLimit)
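+    // clusters with at most anomalyListLimit elements are dropped, presumably
+    // because an IQR computed on such a small sample is not meaningful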
 
-    // calculating IQR and saving output to the file
-    val listofDataArray = setData.collect()
-    var a: Dataset[Row] = null
+    val test = setDataSize.map(f => outDetection.iqr2(f, anomalyListLimit))
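+    // iqr2 (sansa-ml) presumably applies the standard interquartile-range
+    // fence: with IQR = Q3 - Q1, values outside [Q1 - 1.5 * IQR, Q3 + 1.5 * IQR]
+    // are reported as outliers for the cluster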
 
-    for (listofDatavalue <- listofDataArray) {
-      a = outDetection.iqr1(listofDatavalue, anomalyListLimit)
-      if (a != null)
-        a.select("dt").coalesce(1).write.format("text").mode(SaveMode.Append).save(output)
-    }
+    val testfilter = test.filter(f => f.size > 0) // .distinct()
+    val testfilterDistinct = testfilter.flatMap(f => f)
+    testfilterDistinct.saveAsTextFile(output)
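+    // the per-cluster outlier lists are flattened and written as plain text,
+    // one part file per RDD partition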
     setData.unpersist()
+
     spark.stop()
   }
 
   case class Config(
     in: String = "",
     threshold: Double = 0.0,
     anomalyListLimit: Int = 0,
-    numofpartition: Int = 4,
+    numofpartition: Int = 0,
     out: String = "")
 
-  val parser = new scopt.OptionParser[Config]("Anomaly Detection example") {
+  val parser = new scopt.OptionParser[Config]("SANSA - Outlier Detection") {
 
-    head("Anomaly Detection example")
+    head("Detecting numerical outliers in a dataset")
 
     opt[String]('i', "input").required().valueName("<path>").
       action((x, c) => c.copy(in = x)).
-      text("path to file that contains the data")
+      text("path to the file that contains the RDF data (in N-Triples format)")
 
+    // Jaccard similarity threshold value
     opt[Double]('t', "threshold").required().
       action((x, c) => c.copy(threshold = x)).
       text("the Jaccard Similarity value")
 
+    // number of partitions
     opt[Int]('a', "numofpartition").required().
       action((x, c) => c.copy(numofpartition = x)).
       text("the number of partitions")
 
+    // list limit for calculating the IQR
     opt[Int]('c', "anomalyListLimit").required().
       action((x, c) => c.copy(anomalyListLimit = x)).
       text("the outlier list limit")
 
+    // output file path
     opt[String]('o', "output").required().valueName("<directory>").
       action((x, c) => c.copy(out = x)).
       text("the output directory")