Skip to content

Commit 55cad4a

Browse files
authored
perf: improve performance of filter_period_intersect (#436)
* perf: improve performance of filter_period_intersect * forgot some files * fix lint errors
1 parent b6dac17 commit 55cad4a

File tree

3 files changed

+85
-24
lines changed

3 files changed

+85
-24
lines changed

aw-query/src/functions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,7 @@ mod qfunctions {
506506
let events: Vec<Event> = (&args[0]).try_into()?;
507507
let filter_events: Vec<Event> = (&args[1]).try_into()?;
508508

509-
let mut filtered_events = aw_transform::filter_period_intersect(&events, &filter_events);
509+
let mut filtered_events = aw_transform::filter_period_intersect(events, filter_events);
510510
let mut filtered_tagged_events = Vec::new();
511511
for event in filtered_events.drain(..) {
512512
filtered_tagged_events.push(DataType::Event(event));

aw-transform/benches/bench.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ fn bench_filter_period_intersect(c: &mut Criterion) {
4545
c.bench_function("1000 events", |b| {
4646
b.iter(|| {
4747
let events1 = create_events(1000);
48-
filter_period_intersect(&events1, &events2);
48+
filter_period_intersect(events1, events2.clone());
4949
})
5050
});
5151
}

aw-transform/src/filter_period.rs

Lines changed: 83 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use aw_models::Event;
2-
use chrono::{DateTime, Utc};
2+
use chrono::Duration;
3+
4+
use crate::sort_by_timestamp;
35

46
/// Removes events not intersecting with the provided filter_events
57
///
@@ -15,33 +17,55 @@ use chrono::{DateTime, Utc};
1517
/// filter_events: [ ] [ ]
1618
/// output: [a ] [a ][b ]
1719
/// ```
18-
pub fn filter_period_intersect(events: &[Event], filter_events: &[Event]) -> Vec<Event> {
20+
pub fn filter_period_intersect(events: Vec<Event>, filter_events: Vec<Event>) -> Vec<Event> {
21+
if events.len() == 0 || filter_events.len() == 0 {
22+
return Vec::new();
23+
}
24+
1925
let mut filtered_events = Vec::new();
26+
let events = sort_by_timestamp(events);
27+
let filter_events = sort_by_timestamp(filter_events);
2028

21-
// Start with pre-calculating endtimes of events
22-
let mut events_with_endtimes: Vec<(&Event, DateTime<Utc>)> = Vec::new();
23-
for event in events {
24-
events_with_endtimes.push((event, event.calculate_endtime()));
25-
}
29+
let mut events_iter = events.into_iter();
30+
let mut filter_events_iter = filter_events.into_iter();
31+
let mut cur_event = events_iter.next().unwrap();
32+
let mut cur_filter_event = filter_events_iter.next().unwrap();
2633

27-
// Do actual filtering
28-
for filter in filter_events {
29-
let filter_endtime = filter.calculate_endtime();
30-
for (event, event_endtime) in &events_with_endtimes {
31-
if event.timestamp > filter_endtime {
32-
continue;
34+
loop {
35+
let event_endtime = cur_event.calculate_endtime();
36+
let filter_endtime = cur_filter_event.calculate_endtime();
37+
if cur_event.duration == Duration::seconds(0) || event_endtime <= cur_filter_event.timestamp
38+
{
39+
match events_iter.next() {
40+
Some(e) => {
41+
cur_event = e;
42+
continue;
43+
}
44+
None => return filtered_events,
3345
}
34-
if *event_endtime < filter.timestamp {
35-
continue;
46+
}
47+
if cur_event.timestamp >= cur_filter_event.calculate_endtime() {
48+
match filter_events_iter.next() {
49+
Some(e) => {
50+
cur_filter_event = e;
51+
continue;
52+
}
53+
None => return filtered_events,
3654
}
37-
let mut e = (*event).clone();
38-
e.timestamp = std::cmp::max(e.timestamp, filter.timestamp);
39-
let endtime = std::cmp::min(*event_endtime, filter_endtime);
40-
e.duration = endtime - e.timestamp;
41-
filtered_events.push(e);
4255
}
56+
57+
let mut e = cur_event.clone();
58+
e.timestamp = std::cmp::max(e.timestamp, cur_filter_event.timestamp);
59+
let endtime = std::cmp::min(event_endtime, filter_endtime);
60+
e.duration = endtime - e.timestamp;
61+
62+
// trim current event
63+
let old_timestamp = cur_event.timestamp;
64+
cur_event.timestamp = e.timestamp + e.duration;
65+
cur_event.duration = old_timestamp + cur_event.duration - cur_event.timestamp;
66+
67+
filtered_events.push(e);
4368
}
44-
filtered_events
4569
}
4670

4771
#[cfg(test)]
@@ -81,7 +105,8 @@ mod tests {
81105
data: json_map! {"test": json!(1)},
82106
};
83107

84-
let filtered_events = filter_period_intersect(&vec![e1, e2, e3, e4, e5], &[filter_event]);
108+
let filtered_events =
109+
filter_period_intersect(vec![e1, e2, e3, e4, e5], vec![filter_event.clone()]);
85110
assert_eq!(filtered_events.len(), 3);
86111
assert_eq!(filtered_events[0].duration, Duration::milliseconds(500));
87112
assert_eq!(filtered_events[1].duration, Duration::milliseconds(1000));
@@ -93,5 +118,41 @@ mod tests {
93118
assert_eq!(filtered_events[1].timestamp, dt);
94119
let dt: DateTime<Utc> = DateTime::from_str("2000-01-01T00:00:04.000Z").unwrap();
95120
assert_eq!(filtered_events[2].timestamp, dt);
121+
122+
let timestamp_01s = DateTime::from_str("2000-01-01T00:00:01Z").unwrap();
123+
let e = Event {
124+
id: None,
125+
timestamp: timestamp_01s,
126+
duration: Duration::seconds(1),
127+
data: json_map! {"test": json!(1)},
128+
};
129+
let mut f2 = filter_event.clone();
130+
f2.timestamp = DateTime::from_str("2000-01-01T00:00:00Z").unwrap();
131+
f2.duration = Duration::milliseconds(1500);
132+
let res = filter_period_intersect(vec![e.clone()], vec![f2]);
133+
assert_eq!(res[0].timestamp, timestamp_01s);
134+
assert_eq!(res[0].duration, Duration::milliseconds(500));
135+
136+
let timestamp_01_5s = DateTime::from_str("2000-01-01T00:00:01.5Z").unwrap();
137+
let mut f3 = filter_event.clone();
138+
f3.timestamp = timestamp_01_5s;
139+
f3.duration = Duration::milliseconds(1000);
140+
let res = filter_period_intersect(vec![e.clone()], vec![f3]);
141+
assert_eq!(res[0].timestamp, timestamp_01_5s);
142+
assert_eq!(res[0].duration, Duration::milliseconds(500));
143+
144+
let mut f4 = filter_event.clone();
145+
f4.timestamp = DateTime::from_str("2000-01-01T00:00:01.5Z").unwrap();
146+
f4.duration = Duration::milliseconds(100);
147+
let res = filter_period_intersect(vec![e.clone()], vec![f4]);
148+
assert_eq!(res[0].timestamp, timestamp_01_5s);
149+
assert_eq!(res[0].duration, Duration::milliseconds(100));
150+
151+
let mut f5 = filter_event.clone();
152+
f5.timestamp = DateTime::from_str("2000-01-01T00:00:00Z").unwrap();
153+
f5.duration = Duration::seconds(10);
154+
let res = filter_period_intersect(vec![e.clone()], vec![f5]);
155+
assert_eq!(res[0].timestamp, timestamp_01s);
156+
assert_eq!(res[0].duration, Duration::milliseconds(1000));
96157
}
97158
}

0 commit comments

Comments
 (0)