Skip to content

Commit 95d95d0

Browse files
chore: improve report performance
added `time_from_last_event` and `time_to_next_event` fields to events that are precomputed when the event is created. Signed-off-by: Henry Gressmann <[email protected]>
1 parent 8380bf6 commit 95d95d0

File tree

12 files changed

+215
-124
lines changed

12 files changed

+215
-124
lines changed

src/app/core/events.rs

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ impl LiwanEvents {
2727
}
2828

2929
/// Get the daily salt, generating a new one if the current one is older than 24 hours
30-
pub async fn get_salt(&self) -> Result<String> {
30+
pub fn get_salt(&self) -> Result<String> {
3131
let (salt, updated_at) = {
3232
let salt = self.daily_salt.read().map_err(|_| eyre::eyre!("Failed to acquire read lock"))?;
3333
salt.clone()
@@ -55,10 +55,15 @@ impl LiwanEvents {
5555
pub fn append(&self, events: impl Iterator<Item = Event>) -> Result<()> {
5656
let conn = self.duckdb.get()?;
5757
let mut appender = conn.appender("events")?;
58+
let mut first_event_time = OffsetDateTime::now_utc();
5859
for event in events {
5960
appender.append_row(event_params![event])?;
61+
if first_event_time > event.created_at {
62+
first_event_time = event.created_at;
63+
}
6064
}
6165
appender.flush()?;
66+
update_event_times(&conn, first_event_time)?;
6267
Ok(())
6368
}
6469

@@ -69,15 +74,26 @@ impl LiwanEvents {
6974
Ok(event) => {
7075
let conn = self.duckdb.get()?;
7176
let mut appender = conn.appender("events")?;
77+
let mut first_event_time = event.created_at;
7278
appender.append_row(event_params![event])?;
7379

7480
// Non-blockingly drain the remaining events in the queue if there are any
7581
let mut count = 1;
7682
for event in events.try_iter() {
7783
appender.append_row(event_params![event])?;
7884
count += 1;
85+
86+
if first_event_time > event.created_at {
87+
first_event_time = event.created_at;
88+
}
89+
90+
// always flush after 5000 events
91+
if count >= 5000 {
92+
break;
93+
}
7994
}
8095
appender.flush()?;
96+
update_event_times(&conn, first_event_time)?;
8197
tracing::debug!("Processed {} events", count);
8298

8399
// Sleep to allow more events to be received before the next batch
@@ -88,3 +104,38 @@ impl LiwanEvents {
88104
}
89105
}
90106
}
107+
108+
use duckdb::{params, Connection, Result as DuckResult};
109+
110+
/// Recompute the `time_from_last_event` and `time_to_next_event` columns of
/// the `events` table.
///
/// `from_time` is bound to both `?` placeholders of the statement. The rows
/// that get recomputed are:
///
/// * every event with `created_at >= from_time`, and
/// * every event of any visitor that has an event from the last 24 hours,
///   older than `from_time`, whose `time_to_next_event` is still null.
///
/// For those rows the gaps to the previous and next event of the same visitor
/// (ordered by `created_at`) are computed with `lag` / `lead` and written
/// back, matching rows on `(visitor_id, created_at)`.
///
/// # Errors
///
/// Returns the DuckDB error if executing the statement fails.
pub fn update_event_times(conn: &Connection, from_time: OffsetDateTime) -> DuckResult<()> {
    // this can probably be simplified, sadly the where clause can't contain window functions
    // NOTE(review): `now()::timestamp` relies on how DuckDB casts the current
    // timestamp to a plain TIMESTAMP — confirm this lines up with the time zone
    // used for the stored `created_at` values.
    let recompute_sql = "--sql
        with
            filtered_events as (
                select *
                from events
                where created_at >= ?::timestamp or visitor_id in (
                    select visitor_id
                    from events
                    where created_at >= now()::timestamp - interval '24 hours' and created_at < ?::timestamp and time_to_next_event is null
                )
            ),
            cte as (
                select
                    visitor_id,
                    created_at,
                    created_at - lag(created_at) over (partition by visitor_id order by created_at) as time_from_last_event,
                    lead(created_at) over (partition by visitor_id order by created_at) - created_at as time_to_next_event
                from filtered_events
            )
        update events
        set
            time_from_last_event = cte.time_from_last_event,
            time_to_next_event = cte.time_to_next_event
        from cte
        where events.visitor_id = cte.visitor_id and events.created_at = cte.created_at;
    ";

    // The affected-row count is not needed by callers; discard it.
    conn.execute(recompute_sql, params![&from_time, &from_time]).map(|_| ())
}

src/app/core/reports.rs

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -192,18 +192,18 @@ fn metric_sql(metric: Metric) -> String {
192192
}
193193
Metric::BounceRate => {
194194
// total sessions: no time_to_next_event / time_to_next_event is null
195-
// bounce sessions: time to next / time to prev are both null or both > 1800
195+
// bounce sessions: time to next / time to prev are both null or both > interval '30 minutes'
196196
"--sql
197197
count(distinct sd.visitor_id)
198-
filter (where (sd.time_until_next_event is null or sd.time_until_next_event > 1800) and
199-
(sd.time_since_previous_event is null or sd.time_since_previous_event > 1800)) /
200-
count(distinct sd.visitor_id) filter (where sd.time_until_next_event is null or sd.time_until_next_event > 1800)
198+
filter (where (sd.time_to_next_event is null or sd.time_to_next_event > interval '30 minutes') and
199+
(sd.time_from_last_event is null or sd.time_from_last_event > interval '30 minutes')) /
200+
count(distinct sd.visitor_id) filter (where sd.time_to_next_event is null or sd.time_to_next_event > interval '30 minutes')
201201
"
202202
}
203203
Metric::AvgTimeOnSite => {
204-
// avg time_until_next_event where time_until_next_event <= 1800 and time_until_next_event is not null
204+
// avg time_to_next_event (in seconds) where time_to_next_event <= interval '30 minutes' and time_to_next_event is not null
205205
"--sql
206-
coalesce(avg(sd.time_until_next_event) filter (where sd.time_until_next_event is not null and sd.time_until_next_event <= 1800), 0)"
206+
coalesce(avg(extract(epoch from sd.time_to_next_event)) filter (where sd.time_to_next_event is not null and sd.time_to_next_event <= interval '30 minutes'), 0)"
207207
}
208208
}
209209
.to_owned()
@@ -278,7 +278,8 @@ pub fn overall_report(
278278
params.extend_from_params(filters_params);
279279
params.push(range.end);
280280

281-
let query = format!("--sql
281+
let query = format!(
282+
"--sql
282283
with
283284
params as (
284285
select
@@ -296,10 +297,8 @@ pub fn overall_report(
296297
select
297298
visitor_id,
298299
created_at,
299-
-- the time to the next event for the same visitor
300-
extract(epoch from (lead(created_at) over (partition by visitor_id order by created_at) - created_at)) as time_until_next_event,
301-
-- the time to the previous event for the same visitor
302-
extract(epoch from (created_at - lag(created_at) over (partition by visitor_id order by created_at))) as time_since_previous_event
300+
time_from_last_event,
301+
time_to_next_event,
303302
from events, params
304303
where
305304
event = ?::text and
@@ -326,7 +325,8 @@ pub fn overall_report(
326325
left join event_bins eb on tb.bin_start = eb.bin_start
327326
order by
328327
tb.bin_start;
329-
");
328+
"
329+
);
330330

331331
let mut stmt = conn.prepare_cached(&query)?;
332332

@@ -371,21 +371,20 @@ pub fn overall_stats(
371371
params.extend(entities);
372372
params.extend_from_params(filters_params);
373373

374-
let query = format!("--sql
374+
let query = format!(
375+
"--sql
375376
with
376377
params as (
377378
select
378379
?::timestamp as start_time,
379-
?::timestamp as end_time
380+
?::timestamp as end_time,
380381
),
381382
session_data as (
382383
select
383384
visitor_id,
384385
created_at,
385-
-- the time to the next event for the same visitor
386-
extract(epoch from (lead(created_at) over (partition by visitor_id order by created_at) - created_at)) as time_until_next_event,
387-
-- the time to the previous event for the same visitor
388-
extract(epoch from (created_at - lag(created_at) over (partition by visitor_id order by created_at))) as time_since_previous_event
386+
time_from_last_event,
387+
time_to_next_event,
389388
from events, params
390389
where
391390
event = ?::text and
@@ -400,7 +399,8 @@ pub fn overall_stats(
400399
{metric_avg_time_on_site} as avg_time_on_site
401400
from
402401
session_data sd;
403-
");
402+
"
403+
);
404404

405405
let mut stmt = conn.prepare_cached(&query)?;
406406
let result = stmt.query_row(duckdb::params_from_iter(params), |row| {
@@ -456,30 +456,29 @@ pub fn dimension_report(
456456
params.extend(entities);
457457
params.extend_from_params(filters_params);
458458

459-
let query = format!("--sql
459+
let query = format!(
460+
"--sql
460461
with
461462
params as (
462463
select
463464
?::timestamp as start_time,
464-
?::timestamp as end_time
465+
?::timestamp as end_time,
465466
),
466467
session_data as (
467468
select
468469
coalesce({dimension_column}, 'Unknown') as dimension_value,
469470
visitor_id,
470471
created_at,
471-
-- the time to the next event for the same visitor
472-
extract(epoch from (lead(created_at) over (partition by visitor_id order by created_at) - created_at)) as time_until_next_event,
473-
-- the time to the previous event for the same visitor
474-
extract(epoch from (created_at - lag(created_at) over (partition by visitor_id order by created_at))) as time_since_previous_event
472+
time_from_last_event,
473+
time_to_next_event,
475474
from events sd, params
476475
where
477476
event = ?::text and
478477
created_at between params.start_time and params.end_time and
479478
entity_id in ({entity_vars})
480479
{filters_sql}
481480
group by
482-
{group_by_columns}, visitor_id, created_at
481+
{group_by_columns}, visitor_id, created_at, time_from_last_event, time_to_next_event
483482
)
484483
select
485484
dimension_value,

src/app/models.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@ macro_rules! event_params {
9292
$event.utm_medium,
9393
$event.utm_campaign,
9494
$event.utm_content,
95-
$event.utm_term
95+
$event.utm_term,
96+
None::<std::time::Duration>,
97+
None::<std::time::Duration>,
9698
]
9799
};
98100
}

src/cli.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ pub fn handle_command(mut config: Config, cmd: Command) -> Result<()> {
133133
#[cfg(any(debug_assertions, test, feature = "_enable_seeding"))]
134134
Command::SeedDatabase(_) => {
135135
let app = Liwan::try_new(config)?;
136-
app.seed_database(100000)?;
136+
app.seed_database(1000000)?;
137137
println!("Database seeded with test data");
138138
}
139139
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
-- Add the two precomputed per-visitor gap columns to `events`.
alter table events add column time_from_last_event interval;
alter table events add column time_to_next_event interval;

-- Backfill both columns for all existing rows: for each visitor, order the
-- events by `created_at` and store the gap to the previous and the next event.
-- The first event of a visitor gets a null `time_from_last_event`, the last
-- one a null `time_to_next_event`.
with event_gaps as (
    select
        visitor_id,
        created_at,
        created_at - lag(created_at) over per_visitor as time_from_last_event,
        lead(created_at) over per_visitor - created_at as time_to_next_event
    from events
    window per_visitor as (partition by visitor_id order by created_at)
)
update events
set
    time_from_last_event = event_gaps.time_from_last_event,
    time_to_next_event = event_gaps.time_to_next_event
from event_gaps
where events.visitor_id = event_gaps.visitor_id and events.created_at = event_gaps.created_at;

src/utils/seed.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ pub fn random_events(
4242
}
4343
generated += 1;
4444

45-
let created_at = random_date(time_range.0, time_range.1, 0.5);
45+
// let created_at = random_date(time_range.0, time_range.1, 0.5);
4646

47-
// let time_slice = time_range.1 - time_range.0;
48-
// let skew_factor = 2.0;
49-
// let normalized = 1.0 - (1.0 - (generated as f64 / count as f64)).powf(skew_factor);
50-
// let created_at = time_range.0 + time_slice * normalized;
47+
let time_slice = time_range.1 - time_range.0;
48+
let skew_factor = 2.0;
49+
let normalized = 1.0 - (1.0 - (generated as f64 / count as f64)).powf(skew_factor);
50+
let created_at = time_range.0 + time_slice * normalized;
5151

5252
let path = random_el(PATHS, 0.5);
5353
let referrer = random_el(REFERRERS, 0.5);
@@ -78,16 +78,16 @@ pub fn random_events(
7878
})
7979
}
8080

81-
fn random_date(min: OffsetDateTime, max: OffsetDateTime, scale: f64) -> OffsetDateTime {
82-
let mut rng = rand::thread_rng();
83-
let uniform_random: f64 = rng.gen();
84-
let weighted_random = (uniform_random.powf(1.0 - scale)).min(1.0);
85-
let duration = max - min;
86-
let duration_seconds = duration.as_seconds_f64();
87-
let weighted_duration_seconds = duration_seconds * weighted_random;
88-
let weighted_duration = time::Duration::seconds(weighted_duration_seconds as i64);
89-
min + weighted_duration
90-
}
81+
// fn random_date(min: OffsetDateTime, max: OffsetDateTime, scale: f64) -> OffsetDateTime {
82+
// let mut rng = rand::thread_rng();
83+
// let uniform_random: f64 = rng.gen();
84+
// let weighted_random = (uniform_random.powf(1.0 - scale)).min(1.0);
85+
// let duration = max - min;
86+
// let duration_seconds = duration.as_seconds_f64();
87+
// let weighted_duration_seconds = duration_seconds * weighted_random;
88+
// let weighted_duration = time::Duration::seconds(weighted_duration_seconds as i64);
89+
// min + weighted_duration
90+
// }
9191

9292
fn random_el<T>(slice: &[T], scale: f64) -> &T {
9393
let mut rng = rand::thread_rng();

0 commit comments

Comments
 (0)