Skip to content

Commit 0d61b94

Browse files
authored
Merge pull request #12 from mavroudo/last_checked
Last checked
2 parents d861ad0 + 76f6c91 commit 0d61b94

File tree

14 files changed

+536
-143
lines changed

14 files changed

+536
-143
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ Otherwise, they are located in the **docs/** folder and you can access it by ope
157157
in a browser.
158158

159159
# Change Log
160+
### [2.1.1] - 2023-07-29
161+
- Hotfix in indexing process for Cassandra and S3
162+
- Introduce partitions for LastChecked to handle incremental indexing
163+
- Simpler way to extract pairs and not n-tuples
160164

161165
### [2.1.0] - 2023-06-18
162166
- Added FastAPI to submit preprocessing jobs using api calls

src/main/scala/auth/datalab/siesta/BusinessLogic/DBConnector/DBConnector.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,14 +137,24 @@ trait DBConnector {
137137
combined
138138
}
139139

140+
/**
141+
* Returns data from the LastChecked Table, restricted to the partitions given in `partitions`.
142+
* Loads data from the LastChecked Table, which contains the information of the last timestamp per event type pair
143+
* per trace.
144+
* @param metaData Object containing the metadata
145+
* @return An RDD with the last timestamps per event type pair per trace
146+
*/
147+
def read_last_checked_partitioned_table(metaData: MetaData,partitions:List[Long]):RDD[LastChecked]
148+
140149
/**
141150
* Returns data from LastChecked Table
142151
* Loads data from the LastChecked Table, which contains the information of the last timestamp per event type pair
143152
* per trace.
153+
*
144154
* @param metaData Object containing the metadata
145155
* @return An RDD with the last timestamps per event type pair per trace
146156
*/
147-
def read_last_checked_table(metaData: MetaData):RDD[LastChecked]
157+
def read_last_checked_table(metaData: MetaData): RDD[LastChecked]
148158

149159
/**
150160
* Stores new records for last checked back in the database

src/main/scala/auth/datalab/siesta/BusinessLogic/ExtractPairs/ExtractPairs.scala

Lines changed: 71 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package auth.datalab.siesta.BusinessLogic.ExtractPairs
22

33
import auth.datalab.siesta.BusinessLogic.Model.Structs
44
import auth.datalab.siesta.BusinessLogic.Model.Structs.LastChecked
5+
import org.apache.log4j.{Level, Logger}
56
import org.apache.spark.broadcast.Broadcast
67
import org.apache.spark.rdd.RDD
78
import org.apache.spark.sql.SparkSession
@@ -27,50 +28,59 @@ object ExtractPairs {
2728
* For example: the trace t,,1,, = (a,b,c,a,b) contains 2 occurrences of the event type (a,b). However, the trace
2829
* t,,2,, = (a,a,c,b,b) only contains one occurrence of the previous event type pair.
2930
*
30-
* @param singleRDD The RDD that contains the complete single inverted index
31+
* @param singleRDD The RDD that contains the complete single inverted index
3132
* @param last_checked The loaded values from the LastChecked table (it should be null if it is the first time that
3233
* events are indexed in this log database).
33-
* @param intervals The list of the intervals
34-
* @param lookback The parameter that describes the maximum time difference that two events can have in order to
35-
* create an event type pair.
34+
* @param intervals The list of the intervals
35+
* @param lookback The parameter that describes the maximum time difference that two events can have in order to
36+
* create an event type pair.
3637
* @return Two RDDs: (1) the first one contains all the extracted event type pairs and (2) the second one contains
3738
* the last timestamp of each event type pair for each trace.
3839
*/
3940
def extract(singleRDD: RDD[Structs.InvertedSingleFull], last_checked: RDD[Structs.LastChecked],
40-
intervals: List[Structs.Interval], lookback: Int):(RDD[Structs.PairFull],RDD[Structs.LastChecked]) = {
41+
intervals: List[Structs.Interval], lookback: Int): (RDD[Structs.PairFull], RDD[Structs.LastChecked]) = {
4142

4243
val spark = SparkSession.builder().getOrCreate()
4344
//broadcasts the intervals so they can be available to all workers during event pair extraction
4445
val bintervals = spark.sparkContext.broadcast(intervals)
45-
if (last_checked == null) {
46-
val full = singleRDD.groupBy(_.id)
46+
47+
Logger.getLogger("Pair Extraction")
48+
.log(Level.INFO, s"Number of input traces ${singleRDD.map(_.id).count()} ")
49+
Logger.getLogger("Pair Extraction")
50+
.log(Level.INFO, s"Number of unique input traces ${singleRDD.map(_.id).distinct().count()}")
51+
52+
val full = if (last_checked == null) {
53+
singleRDD.groupBy(_.id)
4754
.map(x => {
4855
this.calculate_pairs_stnm(x._2, null, lookback, bintervals)
4956
})
50-
(full.flatMap(_._1),full.flatMap(_._2))
5157
} else {
52-
val full =singleRDD.groupBy(_.id).leftOuterJoin(last_checked.groupBy(_.id))
58+
singleRDD.groupBy(_.id).leftOuterJoin(last_checked.groupBy(_.id))
5359
.map(x => {
5460
val last = x._2._2.orNull
5561
this.calculate_pairs_stnm(x._2._1, last, lookback, bintervals)
5662
})
57-
(full.flatMap(_._1),full.flatMap(_._2))
5863
}
5964

60-
65+
val pairs = full.flatMap(_._1)
66+
val last_checked_pairs = full.flatMap(_._2)
67+
Logger.getLogger("Pair Extraction").log(Level.INFO, s"Extracted ${pairs.count()} event pairs")
68+
Logger.getLogger("Pair Extraction").log(Level.INFO, s"Extracted ${last_checked_pairs.count()} last checked")
69+
(pairs, last_checked_pairs)
6170
}
6271

6372
/**
6473
* Extract the event type pairs from the single inverted index
65-
* @param single The complete single inverted index
66-
* @param last The list with all the last timestamps that correspond to this event
67-
* @param lookback The parameter that describes the maximum time difference between two events in a pair
74+
*
75+
* @param single The complete single inverted index
76+
* @param last The list with all the last timestamps that correspond to this event
77+
* @param lookback The parameter that describes the maximum time difference between two events in a pair
6878
* @param intervals The list with the intervals
6979
* @return A tuple, where the first element is the extracted event type pairs and the second element is the last
7080
* timestamps for each event type.
7181
*/
7282
private def calculate_pairs_stnm(single: Iterable[Structs.InvertedSingleFull], last: Iterable[Structs.LastChecked],
73-
lookback: Int, intervals: Broadcast[List[Structs.Interval]]):(List[Structs.PairFull],List[Structs.LastChecked]) = {
83+
lookback: Int, intervals: Broadcast[List[Structs.Interval]]): (List[Structs.PairFull], List[Structs.LastChecked]) = {
7484
val singleMap: Map[String, Iterable[Structs.InvertedSingleFull]] = single.groupBy(_.event_name)
7585
val newLastChecked = new ListBuffer[Structs.PairFull]()
7686
val lastMap = if (last != null) last.groupBy(x => (x.eventA, x.eventB)) else null
@@ -87,15 +97,15 @@ object ExtractPairs {
8797
case _: Exception => null
8898
}
8999
//detects all the occurrences of this event type pair using the ts1 and ts2
90-
val nres = this.createTuples(List(key._1, key._2), List(ts1, ts2), intervals, lookback, last_checked,single.head.id)
100+
val nres = this.createTuples(List(key._1, key._2), List(ts1, ts2), intervals, lookback, last_checked, single.head.id)
91101
//if there are any append them and also keep the last timestamp that they occurred
92102
if (nres.nonEmpty) {
93103
newLastChecked += nres.last
94104
results ++= nres
95105
}
96106
})
97-
val l = newLastChecked.map(x=>{
98-
Structs.LastChecked(x.eventA,x.eventB,x.id,timestamp = x.timeB.toString)
107+
val l = newLastChecked.map(x => {
108+
Structs.LastChecked(x.eventA, x.eventB, x.id, timestamp = x.timeB.toString)
99109
})
100110
//returns the two lists
101111
(results.toList, l.toList)
@@ -105,56 +115,58 @@ object ExtractPairs {
105115
/**
106116
* Extract all the occurrences of a particular event type pair by combining the two lists of the occurrences for each
107117
* event type.
108-
* @param events A list with two elements, which are the event types
109-
* @param timestamps A list with two elements, which are the two lists of occurrences for each event type
110-
* @param bintervals The broadcasted intervals
111-
* @param lookback The parameter that defines the maximum time difference that two events can have in order to form
112-
* an event type pair
118+
*
119+
* @param events A list with two elements, which are the event types
120+
* @param timestamps A list with two elements, which are the two lists of occurrences for each event type
121+
* @param bintervals The broadcasted intervals
122+
* @param lookback The parameter that defines the maximum time difference that two events can have in order to form
123+
* an event type pair
113124
* @param last_checked The last timestamp that this event type pair occurred in this trace (null if it has not occurred
114125
* yet)
115-
* @param id The trace id, used to create the required objects
126+
* @param id The trace id, used to create the required objects
116127
* @return The list with all the extracted event type pairs
117128
*/
118129
private def createTuples(events: List[String], timestamps: List[List[(String, Int)]], bintervals: Broadcast[List[Structs.Interval]],
119-
lookback: Int, last_checked: Structs.LastChecked, id:Long): List[Structs.PairFull] = {
130+
lookback: Int, last_checked: Structs.LastChecked, id: Long): List[Structs.PairFull] = {
120131
var e = Structs.EventWithPosition("", null, -1)
121132
val oc: ListBuffer[Structs.EventWithPosition] = new ListBuffer[Structs.EventWithPosition]
122133
var list_id = 0 //begin at first
123134
//remove all events that have timestamp less than the one in last checked (consider null for first time)
124135
val nextEvents = this.startAfterLastChecked(timestamps.head, timestamps(1), last_checked)
125136
while (e != null) {
126137
val l = timestamps(list_id)
127-
val x = this.getNextEvent(events(list_id), l, nextEvents(list_id), if(oc.nonEmpty) oc.last.timestamp else null,oc.size)
138+
val x = this.getNextEvent(events(list_id), l, nextEvents(list_id), if (oc.nonEmpty) oc.last.timestamp else null, oc.size)
128139
nextEvents(list_id) = x._1
129140
e = x._2
130141
if (e != null) {
131142
oc += e
132143
}
133144
list_id = (list_id + 1) % 2
134145
}
135-
val testSliding = oc.toList.sliding(2,2).toList
136-
val pairsInit:List[Structs.PairFull] = testSliding.filter(_.size==2).map(x=>{
146+
val testSliding = oc.toList.sliding(2, 2).toList
147+
val pairsInit: List[Structs.PairFull] = testSliding.filter(_.size == 2).map(x => {
137148
val e1 = x.head
138-
val e2=x(1)
139-
val interval:Structs.Interval = this.chooseInterval(bintervals,e2.timestamp)
140-
Structs.PairFull(events.head,events(1),id,e1.timestamp,e2.timestamp,e1.position,e2.position,interval)
141-
})
142-
.filter(p=>{
143-
ChronoUnit.DAYS.between(p.timeA.toInstant,p.timeB.toInstant)<=lookback
149+
val e2 = x(1)
150+
val interval: Structs.Interval = this.chooseInterval(bintervals, e2.timestamp)
151+
Structs.PairFull(events.head, events(1), id, e1.timestamp, e2.timestamp, e1.position, e2.position, interval)
144152
})
153+
.filter(p => {
154+
ChronoUnit.DAYS.between(p.timeA.toInstant, p.timeB.toInstant) <= lookback
155+
})
145156
pairsInit
146157
}
147158

148159

149160
/**
150161
* Chooses the appropriate time interval based on the timestamp of the second event in the event type pair
162+
*
151163
* @param bintervals The broadcasted time intervals
152-
* @param ts The timestamp of the second event in the event type pair
164+
* @param ts The timestamp of the second event in the event type pair
153165
* @return The appropriate interval
154166
*/
155-
private def chooseInterval(bintervals: Broadcast[List[Structs.Interval]],ts:Timestamp):Structs.Interval={
156-
for(i<-bintervals.value.indices){
157-
if(ts.before(bintervals.value(i).end) ){
167+
private def chooseInterval(bintervals: Broadcast[List[Structs.Interval]], ts: Timestamp): Structs.Interval = {
168+
for (i <- bintervals.value.indices) {
169+
if (ts.before(bintervals.value(i).end)) {
158170
return bintervals.value(i)
159171
}
160172
}
@@ -163,30 +175,31 @@ object ExtractPairs {
163175

164176
/**
165177
* Used in the createTuples function to get the next viable event from a timestamp list.
166-
* @param event The event type
167-
* @param ts The list of the timestamps that this event type occurred in the trace
168-
* @param start The starting location (in the ts)
178+
*
179+
* @param event The event type
180+
* @param ts The list of the timestamps that this event type occurred in the trace
181+
* @param start The starting location (in the ts)
169182
* @param timestamp The timestamp of the last event that was added in the occurrences
170-
* @param ocs_size The occurrences length. If it is even then the previous event has been completed, whereas if
171-
* it is odd it searches for the second event type to complete the pair.
183+
* @param ocs_size The occurrences length. If it is even then the previous event has been completed, whereas if
184+
* it is odd it searches for the second event type to complete the pair.
172185
* @return A tuple, where the first value is the next position to continue the search from and the second value is the event (if one is found)
173186
*/
174187
private def getNextEvent(event: String, ts: List[(String, Int)], start: Int, timestamp: Timestamp, ocs_size: Int): (Int, Structs.EventWithPosition) = {
175188
var i = start
176-
if(i>=ts.size){ //there is no such event => terminate
177-
return (i,null)
189+
if (i >= ts.size) { //there is no such event => terminate
190+
return (i, null)
178191
}
179-
if(timestamp==null){ //there is no previous event => get the next available
180-
return (start+1,Structs.EventWithPosition(event, Timestamp.valueOf(ts(i)._1), ts(i)._2))
192+
if (timestamp == null) { //there is no previous event => get the next available
193+
return (start + 1, Structs.EventWithPosition(event, Timestamp.valueOf(ts(i)._1), ts(i)._2))
181194
}
182195
// ocs_size%2==0 means that the last event type pair has been completed and we are searching for the next first event
183196
// We differentiate the search for the first and the second event because, when searching for the first event, we allow the same
184197
// event to appear in two consecutive event type pairs if the pair consists of the same event type.
185198
// For example the trace t=(a,a,a) has two occurrences of (a,a).
186-
while (i < ts.size ) {
187-
if (ocs_size%2==1 &&Timestamp.valueOf(ts(i)._1).after(timestamp)) {
199+
while (i < ts.size) {
200+
if (ocs_size % 2 == 1 && Timestamp.valueOf(ts(i)._1).after(timestamp)) {
188201
return (i + 1, Structs.EventWithPosition(event, Timestamp.valueOf(ts(i)._1), ts(i)._2))
189-
} else if(ocs_size%2==0 && !Timestamp.valueOf(ts(i)._1).before(timestamp)){
202+
} else if (ocs_size % 2 == 0 && !Timestamp.valueOf(ts(i)._1).before(timestamp)) {
190203
return (i + 1, Structs.EventWithPosition(event, Timestamp.valueOf(ts(i)._1), ts(i)._2))
191204
}
192205
else {
@@ -202,17 +215,17 @@ object ExtractPairs {
202215
* that correspond to events whose timestamps are after the last checked timestamp. If the last checked is null,
203216
* meaning that this is the first time that this event type appears in this trace, it will return (0,0).
204217
*
205-
* @param ts1 The timestamp and positions of the first event in the trace
206-
* @param ts2 The timestamp and positions of the second event in the trace
218+
* @param ts1 The timestamp and positions of the first event in the trace
219+
* @param ts2 The timestamp and positions of the second event in the trace
207220
* @param lastChecked The last timestamp that this event occurred in this trace
208221
* @return A list with two integers that represent the first positions in the ts1 and ts2, respectively, that correspond
209222
* to events that occur after the lastChecked timestamp.
210223
*/
211224
private def startAfterLastChecked(ts1: List[(String, Int)], ts2: List[(String, Int)], lastChecked: LastChecked): ListBuffer[Int] = {
212225
val k = new ListBuffer[Int]
213226
if (lastChecked == null) {
214-
k+=0
215-
k+=0
227+
k += 0
228+
k += 0
216229
k
217230
}
218231
else {
@@ -221,17 +234,18 @@ object ExtractPairs {
221234
p1 += 1
222235
}
223236
var p2 = 0
224-
while (p2<ts2.size && Timestamp.valueOf(ts2(p2)._1).before(Timestamp.valueOf(lastChecked.timestamp))) {
237+
while (p2 < ts2.size && Timestamp.valueOf(ts2(p2)._1).before(Timestamp.valueOf(lastChecked.timestamp))) {
225238
p2 += 1
226239
}
227-
k+=p1
228-
k+=p2
240+
k += p1
241+
k += p2
229242
k
230243
}
231244
}
232245

233246
/**
234247
* Extracts all the possible event type pairs that can occur in a trace based on the unique event types
248+
*
235249
* @param event_types The unique event types in this trace
236250
* @return The possible event type pairs that can occur in this trace
237251
*/

0 commit comments

Comments
 (0)