diff --git a/extension/src/heartbeat_agg.rs b/extension/src/heartbeat_agg.rs
index c547b443..bb49c187 100644
--- a/extension/src/heartbeat_agg.rs
+++ b/extension/src/heartbeat_agg.rs
@@ -395,6 +395,85 @@ pub fn uptime(agg: HeartbeatAgg<'static>) -> Interval {
     agg.sum_live_intervals().into()
 }
 
+/// Mean length of the dead ranges of `agg`, in the units of its timestamps (`0.0` if it has
+/// none). Divides by the number of dead ranges, which can differ from `num_intervals`.
+#[pg_extern]
+pub fn average_downtime(agg: HeartbeatAgg<'static>) -> f64 {
+    mean(&dead_range_lengths(&agg)).unwrap_or(0.0)
+}
+
+/// Mean length of the live ranges of `agg`, in the units of its timestamps (`0.0`, not NaN,
+/// if it has none).
+#[pg_extern]
+pub fn average_uptime(agg: HeartbeatAgg<'static>) -> f64 {
+    match agg.num_intervals {
+        0 => 0.0,
+        n => agg.sum_live_intervals() as f64 / n as f64,
+    }
+}
+
+/// Arithmetic mean of `data`, or `None` when `data` is empty.
+fn mean(data: &[i64]) -> Option<f64> {
+    (!data.is_empty()).then(|| data.iter().sum::<i64>() as f64 / data.len() as f64)
+}
+
+/// Population standard deviation of `data` (divides by `n`), or `None` when `data` is empty.
+fn std_deviation(data: &[i64]) -> Option<f64> {
+    let data_mean = mean(data)?;
+    let variance = data
+        .iter()
+        .map(|&value| (data_mean - value as f64).powi(2))
+        .sum::<f64>()
+        / data.len() as f64;
+    Some(variance.sqrt())
+}
+
+/// Lengths of the dead ranges of `agg`, i.e. of every span not covered by a live interval.
+fn dead_range_lengths(agg: &HeartbeatAgg<'static>) -> Vec<i64> {
+    // Without live intervals the whole aggregate is a single dead range
+    if agg.num_intervals == 0 {
+        return vec![agg.end_time - agg.start_time];
+    }
+
+    // Dead ranges are the opposite of the intervals stored in the aggregate
+    let mut starts = agg.interval_ends.clone().into_vec();
+    let mut ends = agg.interval_starts.clone().into_vec();
+
+    // Fix the first point depending on whether the aggregate starts in a live or dead range
+    if ends[0] == agg.start_time {
+        ends.remove(0);
+    } else {
+        starts.insert(0, agg.start_time);
+    }
+
+    // Fix the last point depending on whether the aggregate ends in a live or dead range
+    if starts.last() == Some(&agg.end_time) {
+        starts.pop();
+    } else {
+        ends.push(agg.end_time);
+    }
+
+    // `starts` and `ends` now pair up one-to-one, whatever the number of live intervals
+    starts.iter().zip(&ends).map(|(start, end)| end - start).collect()
+}
+
+/// Population standard deviation of the dead range lengths of `agg`; NULL if it has none.
+#[pg_extern]
+pub fn stddev_downtime(agg: HeartbeatAgg<'static>) -> Option<f64> {
+    std_deviation(&dead_range_lengths(&agg))
+}
+
+/// Population standard deviation of the live range lengths of `agg`; NULL if it has no live
+/// range.
+#[pg_extern]
+pub fn stddev_uptime(agg: HeartbeatAgg<'static>) -> Option<f64> {
+    let starts = agg.interval_starts.as_slice();
+    let ends = agg.interval_ends.as_slice();
+    // Pair each live interval's start with its end (assumes the slices have equal length)
+    let data: Vec<i64> = starts.iter().zip(ends).map(|(start, end)| end - start).collect();
+    std_deviation(&data)
+}
+
 #[pg_operator(immutable, parallel_safe)]
 #[opname(->)]
 pub fn arrow_heartbeat_agg_uptime(
@@ -1014,7 +1093,7 @@ mod tests { let (result1, result2, result3) = client.update( "WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness) - SELECT live_at(agg, '01-01-2020 00:01:00 UTC')::TEXT, + SELECT live_at(agg, '01-01-2020 00:01:00 UTC')::TEXT, live_at(agg, '01-01-2020 00:05:00 UTC')::TEXT, live_at(agg, '01-01-2020 00:30:00 UTC')::TEXT FROM agg", None, None) .unwrap().first() @@ -1035,7 +1114,7 @@ mod tests { let (result1, result2, result3) = client.update( "WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness) - SELECT (agg -> live_at('01-01-2020 00:01:00 UTC'))::TEXT, + SELECT (agg -> live_at('01-01-2020 00:01:00 UTC'))::TEXT, (agg -> live_at('01-01-2020 00:05:00 UTC'))::TEXT, (agg -> live_at('01-01-2020 00:30:00 UTC'))::TEXT FROM agg", None, None) .unwrap().first() @@ -1110,7 +1189,7 @@ mod tests { .update( "WITH aggs AS ( SELECT heartbeat_agg(time, batch, '1h', '1m') - FROM heartbeats + FROM heartbeats GROUP BY batch ) SELECT rollup(heartbeat_agg)::TEXT FROM aggs", None, @@ -1348,12 +1427,12 @@ mod tests { .update( "WITH s AS ( SELECT start, - heartbeat_agg(heartbeat, start, '30m', '10m') AS agg - FROM liveness + heartbeat_agg(heartbeat, start, '30m', '10m') AS agg + FROM liveness GROUP BY start), t AS ( SELECT start, - interpolate(agg, LAG (agg) OVER (ORDER BY start)) AS agg + interpolate(agg, LAG (agg) OVER (ORDER BY start)) AS agg FROM s) SELECT downtime(agg)::TEXT FROM t;", None, @@ -1395,12 +1474,12 @@ mod tests { .update( "WITH s AS ( SELECT start, - heartbeat_agg(heartbeat, 
start, '30m', '10m') AS agg - FROM liveness + heartbeat_agg(heartbeat, start, '30m', '10m') AS agg + FROM liveness GROUP BY start), t AS ( SELECT start, - interpolate(agg, LAG (agg) OVER (ORDER BY start)) AS agg + interpolate(agg, LAG (agg) OVER (ORDER BY start)) AS agg FROM s) SELECT live_ranges(agg)::TEXT FROM t;", None, @@ -1455,12 +1534,12 @@ mod tests { .update( "WITH s AS ( SELECT start, - heartbeat_agg(heartbeat, start, '30m', '10m') AS agg - FROM liveness + heartbeat_agg(heartbeat, start, '30m', '10m') AS agg + FROM liveness GROUP BY start), t AS ( SELECT start, - agg -> interpolate(LAG (agg) OVER (ORDER BY start)) AS agg + agg -> interpolate(LAG (agg) OVER (ORDER BY start)) AS agg FROM s) SELECT live_ranges(agg)::TEXT FROM t;", None, @@ -1515,8 +1594,8 @@ mod tests { .update( "WITH s AS ( SELECT start, - heartbeat_agg(heartbeat, start, '30m', '10m') AS agg - FROM liveness + heartbeat_agg(heartbeat, start, '30m', '10m') AS agg + FROM liveness GROUP BY start) SELECT interpolated_uptime(agg, LAG (agg) OVER (ORDER BY start))::TEXT FROM s", @@ -1559,8 +1638,8 @@ mod tests { .update( "WITH s AS ( SELECT start, - heartbeat_agg(heartbeat, start, '30m', '10m') AS agg - FROM liveness + heartbeat_agg(heartbeat, start, '30m', '10m') AS agg + FROM liveness GROUP BY start) SELECT (agg -> interpolated_uptime(LAG (agg) OVER (ORDER BY start)))::TEXT FROM s", @@ -1603,8 +1682,8 @@ mod tests { .update( "WITH s AS ( SELECT start, - heartbeat_agg(heartbeat, start, '30m', '10m') AS agg - FROM liveness + heartbeat_agg(heartbeat, start, '30m', '10m') AS agg + FROM liveness GROUP BY start) SELECT interpolated_downtime(agg, LAG (agg) OVER (ORDER BY start))::TEXT FROM s", @@ -1647,8 +1726,8 @@ mod tests { .update( "WITH s AS ( SELECT start, - heartbeat_agg(heartbeat, start, '30m', '10m') AS agg - FROM liveness + heartbeat_agg(heartbeat, start, '30m', '10m') AS agg + FROM liveness GROUP BY start) SELECT (agg -> interpolated_downtime(LAG (agg) OVER (ORDER BY start)))::TEXT FROM s", @@ 
-1836,12 +1915,12 @@ mod tests { let output = client .update( "WITH rollups AS ( - SELECT heartbeat_agg(ts, batch, '2h', '20m') - FROM poc - GROUP BY batch + SELECT heartbeat_agg(ts, batch, '2h', '20m') + FROM poc + GROUP BY batch ORDER BY batch ) - SELECT live_ranges(rollup(heartbeat_agg))::TEXT + SELECT live_ranges(rollup(heartbeat_agg))::TEXT FROM rollups", None, None, diff --git a/extension/src/stabilization_info.rs b/extension/src/stabilization_info.rs index 3b5e1fb3..58054e83 100644 --- a/extension/src/stabilization_info.rs +++ b/extension/src/stabilization_info.rs @@ -29,6 +29,10 @@ crate::functions_stabilized_at! { num_live_ranges(heartbeatagg), trim_to(heartbeatagg,timestamp with time zone,interval), trim_to(timestamp with time zone,interval), + average_downtime(heartbeatagg), + average_uptime(heartbeatagg), + stddev_downtime(heartbeatagg), + stddev_uptime(heartbeatagg), accessorpercentilearray_in(cstring), accessorpercentilearray_out(accessorpercentilearray), arrow_uddsketch_approx_percentile_array(uddsketch,accessorpercentilearray), @@ -274,6 +278,7 @@ crate::functions_stabilized_at! { live_ranges(heartbeatagg), rollup(heartbeatagg), uptime(heartbeatagg), + accessordeadranges_in(cstring), accessordeadranges_out(accessordeadranges), accessordowntime_in(cstring),