Skip to content

Commit dcbd9a2

Browse files
committed
feat(query): add execution history filters and time-bucket aggregation
1 parent 27fd0f9 commit dcbd9a2

File tree

3 files changed

+117
-3
lines changed

3 files changed

+117
-3
lines changed

internal/model/result.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
type CheckExecutionStatus string
1212
type CheckStateStatus string
1313
type ServiceStateStatus string
14+
type CheckExecutionBucket string
1415

1516
const (
1617
CheckExecutionSuccess CheckExecutionStatus = "success"
@@ -24,6 +25,11 @@ const (
2425
ServiceStateHealthy ServiceStateStatus = "healthy"
2526
ServiceStateUnhealthy ServiceStateStatus = "unhealthy"
2627
ServiceStateDegraded ServiceStateStatus = "degraded"
28+
29+
CheckExecutionBucketSecond CheckExecutionBucket = "second"
30+
CheckExecutionBucketMinute CheckExecutionBucket = "minute"
31+
CheckExecutionBucketHour CheckExecutionBucket = "hour"
32+
CheckExecutionBucketDay CheckExecutionBucket = "day"
2733
)
2834

2935
type CheckExecutionResult struct {
@@ -81,7 +87,20 @@ type CheckExecutionFilter struct {
8187
Limit int
8288
}
8389

84-
func (f *CheckExecutionFilter) Apply(query string, fieldFn func(string) string) (string, []any) {
90+
type CheckExecutionAggregateFilter struct {
91+
Bucket CheckExecutionBucket
92+
CheckExecutionFilter
93+
}
94+
95+
type CheckExecutionBucketRecord struct {
96+
BucketStart time.Time
97+
Total int
98+
SuccessCount int
99+
FailureCount int
100+
AvgDurationUs int64
101+
}
102+
103+
func (f *CheckExecutionFilter) ApplyConditions(fieldFn func(string) string) ([]string, []any) {
85104
var (
86105
conditions []string
87106
args []any
@@ -99,14 +118,26 @@ func (f *CheckExecutionFilter) Apply(query string, fieldFn func(string) string)
99118

100119
if f.From != nil {
101120
args = append(args, *f.From)
102-
conditions = append(conditions, fmt.Sprintf("%s >= $%d", fieldFn("finished_at"), len(args)))
121+
conditions = append(
122+
conditions,
123+
fmt.Sprintf("%s >= TIMESTAMPTZ $%d", fieldFn("finished_at"), len(args)),
124+
)
103125
}
104126

105127
if f.To != nil {
106128
args = append(args, *f.To)
107-
conditions = append(conditions, fmt.Sprintf("%s <= $%d", fieldFn("finished_at"), len(args)))
129+
conditions = append(
130+
conditions,
131+
fmt.Sprintf("%s <= TIMESTAMPTZ $%d", fieldFn("finished_at"), len(args)),
132+
)
108133
}
109134

135+
return conditions, args
136+
}
137+
138+
func (f *CheckExecutionFilter) Apply(query string, fieldFn func(string) string) (string, []any) {
139+
conditions, args := f.ApplyConditions(fieldFn)
140+
110141
if len(conditions) > 0 {
111142
query += "WHERE " + conditions[0]
112143
for i := 1; i < len(conditions); i++ {

internal/repository/check/execution.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package check
33
import (
44
"context"
55
"encoding/json"
6+
"fmt"
67
"time"
78

89
"github.com/pixel365/pulse/internal/repository"
@@ -156,6 +157,69 @@ FROM pulse.check_executions
156157
return result, nil
157158
}
158159

160+
func (e *ExecutionCheck) ListExecutionBuckets(
161+
ctx context.Context,
162+
filter model.CheckExecutionAggregateFilter,
163+
) ([]model.CheckExecutionBucketRecord, error) {
164+
bucketExpr, err := executionBucketExpr(filter.Bucket)
165+
if err != nil {
166+
return nil, err
167+
}
168+
169+
query := fmt.Sprintf(`
170+
SELECT
171+
%s AS bucket_start,
172+
COUNT(1) AS total,
173+
COUNT(1) FILTER (WHERE status = 'success') AS success_count,
174+
COUNT(1) FILTER (WHERE status = 'failure') AS failure_count,
175+
AVG(duration)::bigint AS avg_duration_us
176+
FROM pulse.check_executions
177+
`, bucketExpr)
178+
179+
conditions, args := filter.ApplyConditions(mustField)
180+
if len(conditions) > 0 {
181+
query += "WHERE " + conditions[0]
182+
for i := 1; i < len(conditions); i++ {
183+
query += " AND " + conditions[i]
184+
}
185+
query += "\n"
186+
}
187+
188+
query += "GROUP BY bucket_start\n"
189+
query += "ORDER BY bucket_start\n"
190+
191+
rows, err := e.db.Query(ctx, query, args...)
192+
if err != nil {
193+
return nil, err
194+
}
195+
defer rows.Close()
196+
197+
var result []model.CheckExecutionBucketRecord
198+
199+
for rows.Next() {
200+
var row model.CheckExecutionBucketRecord
201+
202+
err = rows.Scan(
203+
&row.BucketStart,
204+
&row.Total,
205+
&row.SuccessCount,
206+
&row.FailureCount,
207+
&row.AvgDurationUs,
208+
)
209+
if err != nil {
210+
return nil, err
211+
}
212+
213+
result = append(result, row)
214+
}
215+
216+
if err = rows.Err(); err != nil {
217+
return nil, err
218+
}
219+
220+
return result, nil
221+
}
222+
159223
func mustField(name string) string {
160224
switch name {
161225
case "service_id", "check_id", "finished_at":
@@ -165,6 +229,21 @@ func mustField(name string) string {
165229
panic("unknown field " + name + " in check execution filter")
166230
}
167231

232+
func executionBucketExpr(bucket model.CheckExecutionBucket) (string, error) {
233+
switch bucket {
234+
case model.CheckExecutionBucketSecond:
235+
return "date_trunc('second', finished_at)", nil
236+
case "", model.CheckExecutionBucketMinute:
237+
return "date_trunc('minute', finished_at)", nil
238+
case model.CheckExecutionBucketHour:
239+
return "date_trunc('hour', finished_at)", nil
240+
case model.CheckExecutionBucketDay:
241+
return "date_trunc('day', finished_at)", nil
242+
default:
243+
return "", fmt.Errorf("unsupported execution bucket %q", bucket)
244+
}
245+
}
246+
168247
func NewExecutionRepository(db repository.QueryExecutor) *ExecutionCheck {
169248
return &ExecutionCheck{db}
170249
}

internal/repository/check/interfaces.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,8 @@ type CheckExecutionRepository interface {
1818
context.Context,
1919
model.CheckExecutionFilter,
2020
) ([]model.CheckExecutionRecord, error)
21+
ListExecutionBuckets(
22+
context.Context,
23+
model.CheckExecutionAggregateFilter,
24+
) ([]model.CheckExecutionBucketRecord, error)
2125
}

0 commit comments

Comments
 (0)