Skip to content

Commit ec6accb

Browse files
committed
refactor: introduce check state handling pipeline
1 parent da7ccb2 commit ec6accb

File tree

10 files changed

+188
-40
lines changed

10 files changed

+188
-40
lines changed

cmd/pulse/main.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ import (
99

1010
"github.com/joho/godotenv"
1111

12+
checkrepo "github.com/pixel365/pulse/internal/repository/check"
13+
checksvc "github.com/pixel365/pulse/internal/services/check"
14+
1215
"github.com/pixel365/pulse/internal/app"
1316

1417
"github.com/pixel365/pulse/internal/config"
@@ -24,7 +27,11 @@ func main() {
2427

2528
cfg := config.MustLoad()
2629

27-
runner := app.NewApp(cfg)
30+
repo := checkrepo.NewStateRepository()
31+
stateSvc := checksvc.NewStateService(repo)
32+
checkHandlerSvc := checksvc.NewHandlerService(stateSvc)
33+
34+
runner := app.NewApp(cfg, checkHandlerSvc)
2835
if err := runner.Run(ctx); err != nil {
2936
stop()
3037
log.Fatalf("app run error: %v", err)

internal/app/app.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55

66
"golang.org/x/sync/errgroup"
77

8+
checksvc "github.com/pixel365/pulse/internal/services/check"
9+
810
"github.com/pixel365/pulse/internal"
911

1012
"github.com/pixel365/pulse/internal/checker/dns"
@@ -18,52 +20,54 @@ import (
1820
var _ internal.Runner = (*App)(nil)
1921

2022
type App struct {
21-
cfg *config.Config
23+
cfg *config.Config
24+
checkHandlerSvc checksvc.CheckHandlerService
2225
}
2326

24-
func NewApp(cfg *config.Config) *App {
25-
return &App{cfg}
27+
func NewApp(cfg *config.Config, checkSvc checksvc.CheckHandlerService) *App {
28+
return &App{
29+
cfg: cfg,
30+
checkHandlerSvc: checkSvc,
31+
}
2632
}
2733

2834
func (a *App) Run(ctx context.Context) error {
2935
g, ctx := errgroup.WithContext(ctx)
3036

31-
w := internal.FakeWriter{}
32-
3337
for i := range a.cfg.HttpChecks {
34-
executor := internal.NewCheckExecutor(w, a.cfg.HttpChecks[i].CheckFields)
38+
executor := internal.NewCheckExecutor(a.checkHandlerSvc, a.cfg.HttpChecks[i].CheckFields)
3539
checker := http.NewChecker(a.cfg.HttpChecks[i], executor)
3640
g.Go(func() error {
3741
return checker.Check(ctx)
3842
})
3943
}
4044

4145
for i := range a.cfg.TCPChecks {
42-
executor := internal.NewCheckExecutor(w, a.cfg.TCPChecks[i].CheckFields)
46+
executor := internal.NewCheckExecutor(a.checkHandlerSvc, a.cfg.TCPChecks[i].CheckFields)
4347
checker := tcp.NewChecker(a.cfg.TCPChecks[i], executor)
4448
g.Go(func() error {
4549
return checker.Check(ctx)
4650
})
4751
}
4852

4953
for i := range a.cfg.GRPCChecks {
50-
executor := internal.NewCheckExecutor(w, a.cfg.GRPCChecks[i].CheckFields)
54+
executor := internal.NewCheckExecutor(a.checkHandlerSvc, a.cfg.GRPCChecks[i].CheckFields)
5155
checker := grpc.NewChecker(a.cfg.GRPCChecks[i], executor)
5256
g.Go(func() error {
5357
return checker.Check(ctx)
5458
})
5559
}
5660

5761
for i := range a.cfg.DNSChecks {
58-
executor := internal.NewCheckExecutor(w, a.cfg.DNSChecks[i].CheckFields)
62+
executor := internal.NewCheckExecutor(a.checkHandlerSvc, a.cfg.DNSChecks[i].CheckFields)
5963
checker := dns.NewChecker(a.cfg.DNSChecks[i], executor)
6064
g.Go(func() error {
6165
return checker.Check(ctx)
6266
})
6367
}
6468

6569
for i := range a.cfg.TLSChecks {
66-
executor := internal.NewCheckExecutor(w, a.cfg.TLSChecks[i].CheckFields)
70+
executor := internal.NewCheckExecutor(a.checkHandlerSvc, a.cfg.TLSChecks[i].CheckFields)
6771
checker := tls.NewChecker(a.cfg.TLSChecks[i], executor)
6872
g.Go(func() error {
6973
return checker.Check(ctx)

internal/executor.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/pixel365/pulse/internal/config"
99
"github.com/pixel365/pulse/internal/e"
1010
"github.com/pixel365/pulse/internal/model"
11+
"github.com/pixel365/pulse/internal/services/check"
1112
)
1213

1314
type CheckExecutor interface {
@@ -17,8 +18,8 @@ type CheckExecutor interface {
1718
var _ CheckExecutor = (*CheckExec)(nil)
1819

1920
type CheckExec struct {
20-
writer ResultWriter
21-
cfg config.CheckFields
21+
handler check.CheckHandlerService
22+
cfg config.CheckFields
2223
}
2324

2425
func (c *CheckExec) Execute(
@@ -37,8 +38,17 @@ func (c *CheckExec) Execute(
3738
case <-ticker.C:
3839
Sleep(ctx, c.cfg.Jitter)
3940
result := c.execute(ctx, request)
41+
42+
policy := model.CheckPolicy{
43+
CheckID: result.CheckID,
44+
ServiceID: result.ServiceID,
45+
CheckType: result.CheckType,
46+
FailureThreshold: c.cfg.FailureThreshold,
47+
SuccessThreshold: c.cfg.SuccessThreshold,
48+
}
49+
4050
//nolint:staticcheck
41-
if err := c.writer.Write(ctx, result); err != nil {
51+
if err := c.handler.Handle(ctx, policy, result); err != nil {
4252
//TODO: log
4353
}
4454
}
@@ -56,7 +66,7 @@ func (c *CheckExec) execute(
5666
CheckID: c.cfg.ID,
5767
ServiceID: c.cfg.Service,
5868
CheckType: c.cfg.Type,
59-
Status: model.Success,
69+
Status: model.CheckExecutionSuccess,
6070
StartedAt: time.Now().UTC(),
6171
ErrorKind: e.ErrNone,
6272
ErrorMessage: "",
@@ -84,19 +94,19 @@ func (c *CheckExec) execute(
8494
result.Duration = result.FinishedAt.Sub(result.StartedAt)
8595

8696
if err != nil {
87-
result.Status = model.Failure
97+
result.Status = model.CheckExecutionFailure
8898
result.ErrorKind, result.ErrorMessage = e.ResolveError(err)
8999
}
90100

91101
return result
92102
}
93103

94104
func NewCheckExecutor(
95-
w ResultWriter,
105+
handler check.CheckHandlerService,
96106
cfg config.CheckFields,
97107
) *CheckExec {
98108
return &CheckExec{
99-
writer: w,
100-
cfg: cfg,
109+
handler: handler,
110+
cfg: cfg,
101111
}
102112
}

internal/model/result.go

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,21 @@ import (
88
)
99

1010
type CheckExecutionStatus string
11+
type CheckStateStatus string
12+
type ServiceStateStatus string
1113

1214
const (
13-
Success CheckExecutionStatus = "success"
14-
Failure CheckExecutionStatus = "failure"
15+
CheckExecutionSuccess CheckExecutionStatus = "success"
16+
CheckExecutionFailure CheckExecutionStatus = "failure"
17+
18+
CheckStateUnknown CheckStateStatus = "unknown"
19+
CheckStateHealthy CheckStateStatus = "healthy"
20+
CheckStateUnhealthy CheckStateStatus = "unhealthy"
21+
22+
ServiceStateUnknown ServiceStateStatus = "unknown"
23+
ServiceStateHealthy ServiceStateStatus = "healthy"
24+
ServiceStateUnhealthy ServiceStateStatus = "unhealthy"
25+
ServiceStateDegraded ServiceStateStatus = "degraded"
1526
)
1627

1728
type CheckExecutionResult struct {
@@ -28,3 +39,29 @@ type CheckExecutionResult struct {
2839
Duration time.Duration
2940
AttemptsTotal int
3041
}
42+
43+
type CheckState struct {
44+
UpdatedAt time.Time
45+
LastFailureAt *time.Time
46+
LastSuccessAt *time.Time
47+
LastDetails map[string]any
48+
LastStatus CheckExecutionStatus
49+
LastExecutionID string
50+
CheckID string
51+
LastErrorKind e.ErrorKind
52+
LastErrorMessage string
53+
Status CheckStateStatus
54+
CheckType config.CheckType
55+
ServiceID string
56+
LastDuration time.Duration
57+
ConsecutiveSuccesses int
58+
ConsecutiveFailures int
59+
}
60+
61+
type CheckPolicy struct {
62+
CheckID string
63+
ServiceID string
64+
CheckType config.CheckType
65+
FailureThreshold int
66+
SuccessThreshold int
67+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package check
2+
3+
import (
4+
"context"
5+
6+
"github.com/pixel365/pulse/internal/model"
7+
)
8+
9+
type CheckStateRepository interface {
10+
GetCheckState(context.Context, string) (*model.CheckState, error)
11+
UpdateCheckState(context.Context, model.CheckState) error
12+
}

internal/repository/check/state.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package check
2+
3+
import (
4+
"context"
5+
6+
"github.com/pixel365/pulse/internal/model"
7+
)
8+
9+
var _ CheckStateRepository = (*StateCheck)(nil)
10+
11+
type StateCheck struct {
12+
}
13+
14+
func (s *StateCheck) GetCheckState(ctx context.Context, service string) (*model.CheckState, error) {
15+
return nil, nil
16+
}
17+
18+
func (s *StateCheck) UpdateCheckState(ctx context.Context, state model.CheckState) error {
19+
return nil
20+
}
21+
22+
func NewStateRepository() *StateCheck {
23+
return &StateCheck{}
24+
}

internal/services/check/handler.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package check
2+
3+
import (
4+
"context"
5+
6+
"github.com/pixel365/pulse/internal/model"
7+
)
8+
9+
var _ CheckHandlerService = (*Handler)(nil)
10+
11+
type Handler struct {
12+
svc CheckStateService
13+
}
14+
15+
func (h *Handler) Handle(
16+
ctx context.Context,
17+
policy model.CheckPolicy,
18+
result model.CheckExecutionResult,
19+
) error {
20+
return nil
21+
}
22+
23+
func NewHandlerService(svc CheckStateService) *Handler {
24+
return &Handler{
25+
svc: svc,
26+
}
27+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package check
2+
3+
import (
4+
"context"
5+
6+
"github.com/pixel365/pulse/internal/model"
7+
)
8+
9+
type CheckHandlerService interface {
10+
Handle(context.Context, model.CheckPolicy, model.CheckExecutionResult) error
11+
}
12+
13+
type CheckStateService interface {
14+
GetState(context.Context, string) (*model.CheckState, error)
15+
UpsertState(context.Context, model.CheckState) error
16+
}

internal/services/check/state.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package check
2+
3+
import (
4+
"context"
5+
6+
"github.com/pixel365/pulse/internal/model"
7+
"github.com/pixel365/pulse/internal/repository/check"
8+
)
9+
10+
var _ CheckStateService = (*State)(nil)
11+
12+
type State struct {
13+
repo check.CheckStateRepository
14+
}
15+
16+
func (s *State) GetState(ctx context.Context, service string) (*model.CheckState, error) {
17+
result, err := s.repo.GetCheckState(ctx, service)
18+
19+
return result, err
20+
}
21+
22+
func (s *State) UpsertState(ctx context.Context, state model.CheckState) error {
23+
err := s.repo.UpdateCheckState(ctx, state)
24+
25+
return err
26+
}
27+
28+
func NewStateService(repo check.CheckStateRepository) *State {
29+
return &State{repo: repo}
30+
}

internal/writer.go

Lines changed: 0 additions & 19 deletions
This file was deleted.

0 commit comments

Comments
 (0)