Skip to content

Commit dec504f

Browse files
committed
feat(storage): persist raw check execution results in postgres
1 parent 86af870 commit dec504f

File tree

8 files changed

+118
-26
lines changed

8 files changed

+118
-26
lines changed

cmd/pulse/main.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88

99
"github.com/joho/godotenv"
1010

11+
"github.com/pixel365/pulse/internal/db/postgres"
12+
1113
"github.com/pixel365/pulse/internal/logger"
1214

1315
checkrepo "github.com/pixel365/pulse/internal/repository/check"
@@ -26,9 +28,19 @@ func main() {
2628

2729
cfg := config.MustLoad()
2830
log := logger.NewSlog()
29-
repo := checkrepo.NewStateRepository()
30-
stateSvc := checksvc.NewStateService(repo)
31-
checkHandlerSvc := checksvc.NewHandlerService(stateSvc)
31+
32+
pgConfig := postgres.NewConfigFromEnv()
33+
pgPool, err := postgres.NewPool(ctx, pgConfig)
34+
if err != nil {
35+
log.Error(ctx, "postgres pool error", "error", err)
36+
return
37+
}
38+
defer pgPool.Close()
39+
40+
staterepo := checkrepo.NewStateRepository(pgPool)
41+
execRepo := checkrepo.NewExecutionRepository(pgPool)
42+
stateSvc := checksvc.NewStateService(staterepo)
43+
checkHandlerSvc := checksvc.NewHandlerService(stateSvc, execRepo)
3244

3345
runner := app.NewApp(cfg, log, checkHandlerSvc)
3446
if err := runner.Run(ctx); err != nil {

internal/db/postgres/config.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,11 @@ import (
44
"fmt"
55
"net/url"
66
"os"
7-
8-
"github.com/pixel365/pulse/internal/logger"
97
)
108

119
type ConfigOption func(*Config)
1210

1311
type Config struct {
14-
Log logger.Logger
1512
User string
1613
Password string
1714
Host string

internal/db/postgres/postgres.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,38 +8,24 @@ import (
88
)
99

1010
func NewPool(ctx context.Context, cfg Config) (*pgxpool.Pool, error) {
11-
log := cfg.Log
1211
config, err := pgxpool.ParseConfig(cfg.DSN())
1312
if err != nil {
14-
if log != nil {
15-
log.Error(ctx, "cannot parse config", "error", err)
16-
}
1713
return nil, err
1814
}
1915

2016
config.ConnConfig.Tracer = otelpgx.NewTracer()
2117

2218
pool, err := pgxpool.NewWithConfig(ctx, config)
2319
if err != nil {
24-
if log != nil {
25-
log.Error(ctx, "cannot create pool", "error", err)
26-
}
2720
return nil, err
2821
}
2922

3023
if err = otelpgx.RecordStats(pool); err != nil {
31-
if log != nil {
32-
log.Error(ctx, "cannot record stats", "error", err)
33-
}
3424
pool.Close()
3525
return nil, err
3626
}
3727

3828
if err = pool.Ping(ctx); err != nil {
39-
if log != nil {
40-
log.Error(ctx, "cannot ping pool", "error", err)
41-
}
42-
4329
pool.Close()
4430
return nil, err
4531
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package check
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
7+
e2 "github.com/pixel365/pulse/internal/e"
8+
"github.com/pixel365/pulse/internal/model"
9+
"github.com/pixel365/pulse/internal/repository"
10+
)
11+
12+
var _ CheckExecutionRepository = (*ExecutionCheck)(nil)
13+
14+
type ExecutionCheck struct {
15+
db repository.QueryExecutor
16+
}
17+
18+
func (e *ExecutionCheck) Add(ctx context.Context, result model.CheckExecutionResult) error {
19+
query := `
20+
INSERT INTO pulse.check_executions (
21+
execution_id,
22+
check_id,
23+
service_id,
24+
status,
25+
check_type,
26+
started_at,
27+
finished_at,
28+
duration,
29+
attempts_total,
30+
error_kind,
31+
error_message,
32+
details
33+
) VALUES (
34+
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12
35+
)
36+
`
37+
38+
var details []byte
39+
if result.Details != nil {
40+
data, err := json.Marshal(result.Details)
41+
if err != nil {
42+
return err
43+
}
44+
details = data
45+
}
46+
47+
var errKind string
48+
if result.ErrorKind != e2.ErrNone {
49+
errKind = string(result.ErrorKind)
50+
}
51+
52+
_, err := e.db.Exec(ctx, query,
53+
result.ExecutionID,
54+
result.CheckID,
55+
result.ServiceID,
56+
result.Status,
57+
result.CheckType,
58+
result.StartedAt,
59+
result.FinishedAt,
60+
result.Duration,
61+
result.AttemptsTotal,
62+
&errKind,
63+
result.ErrorMessage,
64+
details,
65+
)
66+
67+
return err
68+
}
69+
70+
func NewExecutionRepository(db repository.QueryExecutor) *ExecutionCheck {
71+
return &ExecutionCheck{db}
72+
}

internal/repository/check/interfaces.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,7 @@ type CheckStateRepository interface {
1010
GetCheckState(context.Context, string) (*model.CheckState, error)
1111
UpdateCheckState(context.Context, model.CheckState) error
1212
}
13+
14+
type CheckExecutionRepository interface {
15+
Add(context.Context, model.CheckExecutionResult) error
16+
}

internal/repository/check/state.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import (
44
"context"
55

66
"github.com/pixel365/pulse/internal/model"
7+
"github.com/pixel365/pulse/internal/repository"
78
)
89

910
var _ CheckStateRepository = (*StateCheck)(nil)
1011

1112
type StateCheck struct {
13+
db repository.QueryExecutor
1214
}
1315

1416
func (s *StateCheck) GetCheckState(ctx context.Context, service string) (*model.CheckState, error) {
@@ -19,6 +21,6 @@ func (s *StateCheck) UpdateCheckState(ctx context.Context, state model.CheckStat
1921
return nil
2022
}
2123

22-
func NewStateRepository() *StateCheck {
23-
return &StateCheck{}
24+
func NewStateRepository(db repository.QueryExecutor) *StateCheck {
25+
return &StateCheck{db}
2426
}

internal/repository/db.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package repository
2+
3+
import (
4+
"context"
5+
6+
"github.com/jackc/pgx/v5"
7+
"github.com/jackc/pgx/v5/pgconn"
8+
)
9+
10+
type QueryExecutor interface {
11+
Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
12+
Query(context.Context, string, ...any) (pgx.Rows, error)
13+
QueryRow(context.Context, string, ...any) pgx.Row
14+
BeginTx(context.Context, pgx.TxOptions) (pgx.Tx, error)
15+
Close()
16+
}

internal/services/check/handler.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,27 @@ import (
44
"context"
55

66
"github.com/pixel365/pulse/internal/model"
7+
"github.com/pixel365/pulse/internal/repository/check"
78
)
89

910
var _ CheckHandlerService = (*Handler)(nil)
1011

1112
type Handler struct {
12-
svc CheckStateService
13+
repo check.CheckExecutionRepository
14+
svc CheckStateService
1315
}
1416

1517
func (h *Handler) Handle(
1618
ctx context.Context,
1719
policy model.CheckPolicy,
1820
result model.CheckExecutionResult,
1921
) error {
20-
return nil
22+
return h.repo.Add(ctx, result)
2123
}
2224

23-
func NewHandlerService(svc CheckStateService) *Handler {
25+
func NewHandlerService(svc CheckStateService, repo check.CheckExecutionRepository) *Handler {
2426
return &Handler{
25-
svc: svc,
27+
svc: svc,
28+
repo: repo,
2629
}
2730
}

0 commit comments

Comments
 (0)