Skip to content

Commit 82bf3bb

Browse files
committed
feat(timeline): add derived check state timeline query
1 parent dcbd9a2 commit 82bf3bb

File tree

3 files changed

+189
-8
lines changed

3 files changed

+189
-8
lines changed

internal/model/result.go

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,23 @@ type CheckExecutionBucketRecord struct {
100100
AvgDurationUs int64
101101
}
102102

103+
type CheckExecutionTimelineFilter struct {
104+
From time.Time
105+
To time.Time
106+
ServiceID string
107+
CheckID string
108+
Bucket CheckExecutionBucket
109+
Interval time.Duration
110+
}
111+
112+
type CheckExecutionTimelineRecord struct {
113+
BucketStart time.Time
114+
BucketEnd time.Time
115+
LastObservedAt *time.Time
116+
LastExecutionStatus *CheckExecutionStatus
117+
State CheckStateStatus
118+
}
119+
103120
func (f *CheckExecutionFilter) ApplyConditions(fieldFn func(string) string) ([]string, []any) {
104121
var (
105122
conditions []string
@@ -118,18 +135,12 @@ func (f *CheckExecutionFilter) ApplyConditions(fieldFn func(string) string) ([]s
118135

119136
if f.From != nil {
120137
args = append(args, *f.From)
121-
conditions = append(
122-
conditions,
123-
fmt.Sprintf("%s >= TIMESTAMPTZ $%d", fieldFn("finished_at"), len(args)),
124-
)
138+
conditions = append(conditions, fmt.Sprintf("%s >= $%d", fieldFn("finished_at"), len(args)))
125139
}
126140

127141
if f.To != nil {
128142
args = append(args, *f.To)
129-
conditions = append(
130-
conditions,
131-
fmt.Sprintf("%s <= TIMESTAMPTZ $%d", fieldFn("finished_at"), len(args)),
132-
)
143+
conditions = append(conditions, fmt.Sprintf("%s <= $%d", fieldFn("finished_at"), len(args)))
133144
}
134145

135146
return conditions, args
@@ -155,3 +166,31 @@ func (f *CheckExecutionFilter) Apply(query string, fieldFn func(string) string)
155166

156167
return query, args
157168
}
169+
170+
func (f *CheckExecutionTimelineFilter) Validate() error {
171+
if f.ServiceID == "" {
172+
return fmt.Errorf("service_id is required")
173+
}
174+
175+
if f.CheckID == "" {
176+
return fmt.Errorf("check_id is required")
177+
}
178+
179+
if f.From.IsZero() {
180+
return fmt.Errorf("from is required")
181+
}
182+
183+
if f.To.IsZero() {
184+
return fmt.Errorf("to is required")
185+
}
186+
187+
if !f.To.After(f.From) {
188+
return fmt.Errorf("to must be after from")
189+
}
190+
191+
if f.Interval <= 0 {
192+
return fmt.Errorf("interval must be greater than zero")
193+
}
194+
195+
return nil
196+
}

internal/repository/check/execution.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,129 @@ FROM pulse.check_executions
220220
return result, nil
221221
}
222222

223+
func (e *ExecutionCheck) ListExecutionTimeline(
224+
ctx context.Context,
225+
filter model.CheckExecutionTimelineFilter,
226+
) ([]model.CheckExecutionTimelineRecord, error) {
227+
if err := filter.Validate(); err != nil {
228+
return nil, err
229+
}
230+
231+
bucketStepUs, err := executionBucketStepMicros(filter.Bucket)
232+
if err != nil {
233+
return nil, err
234+
}
235+
staleAfterUs := 2 * filter.Interval.Microseconds()
236+
237+
query := `
238+
WITH params AS (
239+
SELECT
240+
$1::text AS service_id,
241+
$2::text AS check_id,
242+
$3::timestamptz AS from_ts,
243+
$4::timestamptz AS to_ts,
244+
$5::bigint AS bucket_step_us,
245+
$6::bigint AS stale_after_us
246+
),
247+
buckets AS (
248+
SELECT
249+
gs AS bucket_start,
250+
gs + (p.bucket_step_us * interval '1 microsecond') AS bucket_end
251+
FROM params p,
252+
LATERAL generate_series(
253+
p.from_ts,
254+
p.to_ts,
255+
p.bucket_step_us * interval '1 microsecond'
256+
) AS gs
257+
),
258+
last_execution_per_bucket AS (
259+
SELECT
260+
b.bucket_start,
261+
b.bucket_end,
262+
e.finished_at AS last_observed_at,
263+
e.status AS last_execution_status
264+
FROM buckets b
265+
CROSS JOIN params p
266+
LEFT JOIN LATERAL (
267+
SELECT
268+
ce.finished_at,
269+
ce.status
270+
FROM pulse.check_executions ce
271+
WHERE ce.service_id = p.service_id
272+
AND ce.check_id = p.check_id
273+
AND ce.finished_at <= b.bucket_end
274+
ORDER BY ce.finished_at DESC
275+
LIMIT 1
276+
) e ON TRUE
277+
)
278+
SELECT
279+
bucket_start,
280+
bucket_end,
281+
last_observed_at,
282+
last_execution_status,
283+
CASE
284+
WHEN last_observed_at IS NULL THEN 'unknown'::pulse.check_state_status
285+
WHEN bucket_end - last_observed_at > (
286+
(SELECT stale_after_us FROM params) * interval '1 microsecond'
287+
) THEN 'unknown'::pulse.check_state_status
288+
WHEN last_execution_status = 'success' THEN 'healthy'::pulse.check_state_status
289+
WHEN last_execution_status = 'failure' THEN 'unhealthy'::pulse.check_state_status
290+
ELSE 'unknown'::pulse.check_state_status
291+
END AS timeline_state
292+
FROM last_execution_per_bucket
293+
ORDER BY bucket_start
294+
`
295+
296+
rows, err := e.db.Query(
297+
ctx,
298+
query,
299+
filter.ServiceID,
300+
filter.CheckID,
301+
filter.From,
302+
filter.To,
303+
bucketStepUs,
304+
staleAfterUs,
305+
)
306+
if err != nil {
307+
return nil, err
308+
}
309+
defer rows.Close()
310+
311+
var result []model.CheckExecutionTimelineRecord
312+
313+
for rows.Next() {
314+
var (
315+
row model.CheckExecutionTimelineRecord
316+
rawObservedAt *time.Time
317+
rawExecStatus *string
318+
)
319+
320+
err = rows.Scan(
321+
&row.BucketStart,
322+
&row.BucketEnd,
323+
&rawObservedAt,
324+
&rawExecStatus,
325+
&row.State,
326+
)
327+
if err != nil {
328+
return nil, err
329+
}
330+
331+
row.LastObservedAt = rawObservedAt
332+
if rawExecStatus != nil {
333+
row.LastExecutionStatus = new(model.CheckExecutionStatus(*rawExecStatus))
334+
}
335+
336+
result = append(result, row)
337+
}
338+
339+
if err = rows.Err(); err != nil {
340+
return nil, err
341+
}
342+
343+
return result, nil
344+
}
345+
223346
func mustField(name string) string {
224347
switch name {
225348
case "service_id", "check_id", "finished_at":
@@ -244,6 +367,21 @@ func executionBucketExpr(bucket model.CheckExecutionBucket) (string, error) {
244367
}
245368
}
246369

370+
func executionBucketStepMicros(bucket model.CheckExecutionBucket) (int64, error) {
371+
switch bucket {
372+
case model.CheckExecutionBucketSecond:
373+
return time.Second.Microseconds(), nil
374+
case "", model.CheckExecutionBucketMinute:
375+
return time.Minute.Microseconds(), nil
376+
case model.CheckExecutionBucketHour:
377+
return time.Hour.Microseconds(), nil
378+
case model.CheckExecutionBucketDay:
379+
return (24 * time.Hour).Microseconds(), nil
380+
default:
381+
return 0, fmt.Errorf("unsupported execution bucket %q", bucket)
382+
}
383+
}
384+
247385
func NewExecutionRepository(db repository.QueryExecutor) *ExecutionCheck {
248386
return &ExecutionCheck{db}
249387
}

internal/repository/check/interfaces.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,8 @@ type CheckExecutionRepository interface {
2222
context.Context,
2323
model.CheckExecutionAggregateFilter,
2424
) ([]model.CheckExecutionBucketRecord, error)
25+
ListExecutionTimeline(
26+
context.Context,
27+
model.CheckExecutionTimelineFilter,
28+
) ([]model.CheckExecutionTimelineRecord, error)
2529
}

0 commit comments

Comments
 (0)