Skip to content

Commit b80f4aa

Browse files
yuluo-yxlynx009
andauthored
feat: add base metrics endpoint (#37)
Co-authored-by: lynx009 <2030509072@qq.com>
1 parent 934b42a commit b80f4aa

File tree

5 files changed

+199
-60
lines changed

5 files changed

+199
-60
lines changed

etc/hertzbeat-collector.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,6 @@ collector:
3333
# Collector identity and mode
3434
identity: hertzbeat-collector-go
3535
mode: public
36+
37+
metrics:
38+
port: 9090

internal/cmd/server.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
cfgloader "hertzbeat.apache.org/hertzbeat-collector-go/internal/config"
3232
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect"
3333
jobserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/server"
34+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/metrics"
3435
clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
3536
transportserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
3637
collectortypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector"
@@ -109,13 +110,15 @@ func server(ctx context.Context, logOut io.Writer) error {
109110
}
110111

111112
func startRunners(ctx context.Context, cfg *clrserver.Server) error {
113+
112114
// Create transport server first
113-
transportRunner := transportserver.NewFromConfig(cfg.Config)
115+
transportRunner := transportserver.New(cfg)
114116

115117
// Create lazy message router that can get transport client when needed
116118
messageRouter := collect.NewLazyMessageRouter(transportRunner, cfg.Logger, cfg.Config.Collector.Identity)
117119

118120
// Create job server with message router
121+
// todo optimize not depend server start!
119122
jobRunner := jobserver.New(&jobserver.Config{
120123
Server: *cfg,
121124
MessageRouter: messageRouter,
@@ -124,12 +127,15 @@ func startRunners(ctx context.Context, cfg *clrserver.Server) error {
124127
// Connect transport to job scheduler
125128
transportRunner.SetJobScheduler(jobRunner)
126129

130+
// Create metrics runner
131+
metricsRunner := metrics.New(cfg)
132+
127133
runners := []struct {
128134
runner Runner[collectortypes.Info]
129135
}{
136+
{metricsRunner},
130137
{jobRunner},
131138
{transportRunner},
132-
// todo; add metrics
133139
}
134140

135141
errCh := make(chan error, len(runners))

internal/metrics/metrics.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,137 @@
1818
*/
1919

2020
package metrics
21+
22+
import (
23+
"context"
24+
"fmt"
25+
"net"
26+
"net/http"
27+
"time"
28+
29+
"github.com/prometheus/client_golang/prometheus"
30+
"github.com/prometheus/client_golang/prometheus/promhttp"
31+
clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
32+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector"
33+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
34+
)
35+
36+
const (
37+
Namespace = "hertzbeat"
38+
Subsystem = "collector"
39+
)
40+
41+
var (
42+
// JobExecutionTotal counts the total number of job executions
43+
JobExecutionTotal = prometheus.NewCounterVec(
44+
prometheus.CounterOpts{
45+
Namespace: Namespace,
46+
Subsystem: Subsystem,
47+
Name: "job_execution_total",
48+
Help: "Total number of job executions",
49+
},
50+
[]string{"status", "type"},
51+
)
52+
53+
// JobExecutionDuration tracks the duration of job executions
54+
JobExecutionDuration = prometheus.NewHistogramVec(
55+
prometheus.HistogramOpts{
56+
Namespace: Namespace,
57+
Subsystem: Subsystem,
58+
Name: "job_execution_duration_seconds",
59+
Help: "Duration of job executions in seconds",
60+
Buckets: prometheus.DefBuckets,
61+
},
62+
[]string{"type"},
63+
)
64+
65+
// CollectorUp indicates if the collector is up
66+
CollectorUp = prometheus.NewGauge(
67+
prometheus.GaugeOpts{
68+
Namespace: Namespace,
69+
Subsystem: Subsystem,
70+
Name: "up",
71+
Help: "1 if the collector is up, 0 otherwise",
72+
},
73+
)
74+
)
75+
76+
func init() {
77+
// Register metrics with the global prometheus registry
78+
prometheus.MustRegister(JobExecutionTotal)
79+
prometheus.MustRegister(JobExecutionDuration)
80+
prometheus.MustRegister(CollectorUp)
81+
CollectorUp.Set(1)
82+
}
83+
84+
// Runner implements the metrics server runner
85+
type Runner struct {
86+
cfg *clrserver.Server
87+
server *http.Server
88+
}
89+
90+
// New creates a new metrics runner
91+
func New(cfg *clrserver.Server) *Runner {
92+
93+
return &Runner{cfg: cfg}
94+
}
95+
96+
// Start starts the metrics server
97+
func (r *Runner) Start(ctx context.Context) error {
98+
99+
// init logger
100+
mlog := r.initLogs()
101+
102+
mux := http.NewServeMux()
103+
mux.Handle("/metrics", promhttp.Handler())
104+
105+
addr := fmt.Sprintf(":%d", r.cfg.Config.Collector.MetricsConfig.Port)
106+
r.server = &http.Server{
107+
Addr: addr,
108+
Handler: mux,
109+
ReadTimeout: 5 * time.Second,
110+
ReadHeaderTimeout: 5 * time.Second,
111+
WriteTimeout: 10 * time.Second,
112+
IdleTimeout: 15 * time.Second,
113+
MaxHeaderBytes: 1 << 20, // 1 MB
114+
BaseContext: func(_ net.Listener) context.Context {
115+
return ctx
116+
},
117+
}
118+
119+
mlog.Info("Starting metrics server", "addr", addr)
120+
121+
go func() {
122+
if err := r.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
123+
mlog.Error(err, "Metrics server failed")
124+
}
125+
}()
126+
127+
<-ctx.Done()
128+
return r.Close()
129+
}
130+
131+
// Info returns the runner info
132+
func (r *Runner) Info() collector.Info {
133+
134+
return collector.Info{
135+
Name: "metrics-server",
136+
}
137+
}
138+
139+
// Close closes the metrics server
140+
func (r *Runner) Close() error {
141+
142+
if r.server != nil {
143+
r.initLogs().Info("Shutting down metrics server")
144+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
145+
defer cancel()
146+
return r.server.Shutdown(ctx)
147+
}
148+
return nil
149+
}
150+
151+
func (r *Runner) initLogs() logger.Logger {
152+
153+
return r.cfg.Logger.WithName("metrics")
154+
}

0 commit comments

Comments
 (0)