Skip to content

Commit 34a6833

Browse files
committed
feat(state): persist check states and apply execution transitions
1 parent dec504f commit 34a6833

File tree

14 files changed

+359
-108
lines changed

14 files changed

+359
-108
lines changed

cmd/pulse/main.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212

1313
"github.com/pixel365/pulse/internal/logger"
1414

15-
checkrepo "github.com/pixel365/pulse/internal/repository/check"
1615
checksvc "github.com/pixel365/pulse/internal/services/check"
1716

1817
"github.com/pixel365/pulse/internal/app"
@@ -37,10 +36,7 @@ func main() {
3736
}
3837
defer pgPool.Close()
3938

40-
staterepo := checkrepo.NewStateRepository(pgPool)
41-
execRepo := checkrepo.NewExecutionRepository(pgPool)
42-
stateSvc := checksvc.NewStateService(staterepo)
43-
checkHandlerSvc := checksvc.NewHandlerService(stateSvc, execRepo)
39+
checkHandlerSvc := checksvc.NewHandlerService(pgPool)
4440

4541
runner := app.NewApp(cfg, log, checkHandlerSvc)
4642
if err := runner.Run(ctx); err != nil {

internal/db/postgres/tx.go

Lines changed: 0 additions & 48 deletions
This file was deleted.

internal/e/errors.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ import (
88

99
type ErrorKind string
1010

11+
func (e ErrorKind) String() string {
12+
return string(e)
13+
}
14+
1115
const (
1216
// ErrNone means the execution completed without an error.
1317
ErrNone ErrorKind = ""
@@ -31,6 +35,10 @@ const (
3135
ErrUnknown ErrorKind = "unknown"
3236
)
3337

38+
var (
39+
ErrNotFound = errors.New("not found")
40+
)
41+
3442
type KindError struct {
3543
Err error
3644
Kind ErrorKind

internal/repository/check/execution.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ import (
44
"context"
55
"encoding/json"
66

7+
"github.com/pixel365/pulse/internal/repository"
8+
79
e2 "github.com/pixel365/pulse/internal/e"
810
"github.com/pixel365/pulse/internal/model"
9-
"github.com/pixel365/pulse/internal/repository"
1011
)
1112

1213
var _ CheckExecutionRepository = (*ExecutionCheck)(nil)
@@ -57,7 +58,7 @@ INSERT INTO pulse.check_executions (
5758
result.CheckType,
5859
result.StartedAt,
5960
result.FinishedAt,
60-
result.Duration,
61+
result.Duration.Microseconds(),
6162
result.AttemptsTotal,
6263
&errKind,
6364
result.ErrorMessage,

internal/repository/check/interfaces.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import (
77
)
88

99
type CheckStateRepository interface {
10-
GetCheckState(context.Context, string) (*model.CheckState, error)
11-
UpdateCheckState(context.Context, model.CheckState) error
10+
GetCheckState(context.Context, string, string) (*model.CheckState, error)
11+
UpsertCheckState(context.Context, *model.CheckState) error
1212
}
1313

1414
type CheckExecutionRepository interface {

internal/repository/check/state.go

Lines changed: 169 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,16 @@ package check
22

33
import (
44
"context"
5+
"encoding/json"
6+
"errors"
7+
"time"
58

6-
"github.com/pixel365/pulse/internal/model"
9+
"github.com/jackc/pgx/v5"
10+
11+
"github.com/pixel365/pulse/internal/e"
712
"github.com/pixel365/pulse/internal/repository"
13+
14+
"github.com/pixel365/pulse/internal/model"
815
)
916

1017
var _ CheckStateRepository = (*StateCheck)(nil)
@@ -13,12 +20,169 @@ type StateCheck struct {
1320
db repository.QueryExecutor
1421
}
1522

16-
func (s *StateCheck) GetCheckState(ctx context.Context, service string) (*model.CheckState, error) {
17-
return nil, nil
23+
func (s *StateCheck) GetCheckState(
24+
ctx context.Context,
25+
checkID string,
26+
serviceID string,
27+
) (*model.CheckState, error) {
28+
query := `
29+
SELECT
30+
check_type,
31+
status,
32+
last_execution_id,
33+
last_status,
34+
last_error_kind,
35+
last_error_message,
36+
last_duration,
37+
last_details,
38+
last_success_at,
39+
last_failure_at,
40+
consecutive_successes,
41+
consecutive_failures,
42+
updated_at
43+
FROM pulse.check_states
44+
WHERE check_id = $1 AND service_id = $2
45+
`
46+
47+
state := model.CheckState{
48+
CheckID: checkID,
49+
ServiceID: serviceID,
50+
}
51+
52+
var (
53+
rawDetails []byte
54+
rawErrorKind *string
55+
rawErrorMessage *string
56+
rawDurationUs int64
57+
)
58+
59+
err := s.db.QueryRow(ctx, query, checkID, serviceID).Scan(
60+
&state.CheckType,
61+
&state.Status,
62+
&state.LastExecutionID,
63+
&state.LastStatus,
64+
&rawErrorKind,
65+
&rawErrorMessage,
66+
&rawDurationUs,
67+
&rawDetails,
68+
&state.LastSuccessAt,
69+
&state.LastFailureAt,
70+
&state.ConsecutiveSuccesses,
71+
&state.ConsecutiveFailures,
72+
&state.UpdatedAt,
73+
)
74+
if errors.Is(err, pgx.ErrNoRows) {
75+
return nil, e.ErrNotFound
76+
}
77+
78+
if err != nil {
79+
return nil, err
80+
}
81+
82+
if rawErrorKind != nil {
83+
state.LastErrorKind = e.ErrorKind(*rawErrorKind)
84+
}
85+
86+
if rawErrorMessage != nil {
87+
state.LastErrorMessage = *rawErrorMessage
88+
}
89+
state.LastDuration = time.Duration(rawDurationUs) * time.Microsecond
90+
91+
if rawDetails != nil {
92+
if err = json.Unmarshal(rawDetails, &state.LastDetails); err != nil {
93+
return nil, err
94+
}
95+
}
96+
97+
return &state, nil
1898
}
1999

20-
func (s *StateCheck) UpdateCheckState(ctx context.Context, state model.CheckState) error {
21-
return nil
100+
func (s *StateCheck) UpsertCheckState(
101+
ctx context.Context,
102+
state *model.CheckState,
103+
) error {
104+
if state == nil {
105+
return errors.New("check state is nil")
106+
}
107+
108+
query := `
109+
INSERT INTO pulse.check_states (
110+
check_id,
111+
service_id,
112+
check_type,
113+
status,
114+
last_execution_id,
115+
last_status,
116+
last_error_kind,
117+
last_error_message,
118+
last_duration,
119+
last_details,
120+
last_success_at,
121+
last_failure_at,
122+
consecutive_successes,
123+
consecutive_failures,
124+
updated_at
125+
) VALUES (
126+
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15
127+
)
128+
ON CONFLICT (check_id, service_id) DO UPDATE SET
129+
check_type = EXCLUDED.check_type,
130+
status = EXCLUDED.status,
131+
last_execution_id = EXCLUDED.last_execution_id,
132+
last_status = EXCLUDED.last_status,
133+
last_error_kind = EXCLUDED.last_error_kind,
134+
last_error_message = EXCLUDED.last_error_message,
135+
last_duration = EXCLUDED.last_duration,
136+
last_details = EXCLUDED.last_details,
137+
last_success_at = EXCLUDED.last_success_at,
138+
last_failure_at = EXCLUDED.last_failure_at,
139+
consecutive_successes = EXCLUDED.consecutive_successes,
140+
consecutive_failures = EXCLUDED.consecutive_failures,
141+
updated_at = EXCLUDED.updated_at
142+
`
143+
144+
var (
145+
details []byte
146+
errorKind *string
147+
errorMessage *string
148+
)
149+
150+
if state.LastDetails != nil {
151+
data, err := json.Marshal(state.LastDetails)
152+
if err != nil {
153+
return err
154+
}
155+
details = data
156+
}
157+
158+
if state.LastErrorKind != e.ErrNone {
159+
value := string(state.LastErrorKind)
160+
errorKind = &value
161+
}
162+
163+
if state.LastErrorMessage != "" {
164+
errorMessage = &state.LastErrorMessage
165+
}
166+
167+
_, err := s.db.Exec(ctx, query,
168+
state.CheckID,
169+
state.ServiceID,
170+
state.CheckType,
171+
state.Status,
172+
state.LastExecutionID,
173+
state.LastStatus,
174+
errorKind,
175+
errorMessage,
176+
state.LastDuration.Microseconds(),
177+
details,
178+
state.LastSuccessAt,
179+
state.LastFailureAt,
180+
state.ConsecutiveSuccesses,
181+
state.ConsecutiveFailures,
182+
state.UpdatedAt,
183+
)
184+
185+
return err
22186
}
23187

24188
func NewStateRepository(db repository.QueryExecutor) *StateCheck {

internal/repository/db.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package repository
22

33
import (
44
"context"
5+
"errors"
56

67
"github.com/jackc/pgx/v5"
78
"github.com/jackc/pgx/v5/pgconn"
@@ -11,6 +12,47 @@ type QueryExecutor interface {
1112
Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
1213
Query(context.Context, string, ...any) (pgx.Rows, error)
1314
QueryRow(context.Context, string, ...any) pgx.Row
15+
}
16+
17+
type TxManager interface {
1418
BeginTx(context.Context, pgx.TxOptions) (pgx.Tx, error)
15-
Close()
19+
}
20+
21+
func Tx(
22+
ctx context.Context,
23+
manager TxManager,
24+
level pgx.TxIsoLevel,
25+
fns ...func(tx pgx.Tx) error,
26+
) (err error) {
27+
tx, err := manager.BeginTx(ctx, pgx.TxOptions{
28+
IsoLevel: level,
29+
})
30+
if err != nil {
31+
return err
32+
}
33+
34+
defer func() {
35+
if p := recover(); p != nil {
36+
_ = tx.Rollback(ctx)
37+
panic(p)
38+
}
39+
if err != nil {
40+
_ = tx.Rollback(ctx)
41+
return
42+
}
43+
err = tx.Commit(ctx)
44+
}()
45+
46+
for _, fn := range fns {
47+
if fn == nil {
48+
err = errors.New("transaction function is nil")
49+
return
50+
}
51+
if e := fn(tx); e != nil {
52+
err = e
53+
return
54+
}
55+
}
56+
57+
return nil
1658
}

0 commit comments

Comments
 (0)