Skip to content

Commit 7f640cd

Browse files
committed
otel metrics wip
Signed-off-by: sami <sami@appscode.com>
1 parent 709ba34 commit 7f640cd

File tree

409 files changed

+53461
-3744
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

409 files changed

+53461
-3744
lines changed

apis/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,14 @@ type Config struct {
2424
Database *DatabaseCfg `valid:"required" json:"database" mapstructure:"database"`
2525
Publisher *PublisherCfg `valid:"required" json:"publisher" mapstructure:"publisher"`
2626
Logger *Logger `valid:"required" json:"logger" mapstructure:"logger"`
27+
Telemetry *TelemetryCfg `json:"telemetry" mapstructure:"telemetry"`
28+
}
29+
30+
// TelemetryCfg holds telemetry and metrics configuration.
31+
type TelemetryCfg struct {
32+
// PrometheusEnabled enables the Prometheus metrics endpoint at /metrics.
33+
// Defaults to true if not specified.
34+
PrometheusEnabled bool `json:"prometheusEnabled" mapstructure:"prometheusEnabled"`
2735
}
2836

2937
// ListenerCfg path of the listener config.

apis/metrics.go

Lines changed: 189 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,209 @@
1+
/*
2+
Copyright AppsCode Inc. and Contributors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
117
package apis
218

319
import (
4-
"github.com/prometheus/client_golang/prometheus"
5-
"github.com/prometheus/client_golang/prometheus/promauto"
20+
"context"
21+
22+
"go.opentelemetry.io/otel"
23+
"go.opentelemetry.io/otel/attribute"
24+
"go.opentelemetry.io/otel/metric"
25+
)
26+
27+
const (
28+
meterName = "kubeops.dev/pgoutbox"
29+
30+
attrKeyTable = "table"
31+
attrKeySubject = "subject"
32+
attrKeyKind = "kind"
633
)
734

8-
// Metrics Prometheus metrics.
935
type Metrics struct {
10-
filterSkippedEvents, publishedEvents, problematicEvents *prometheus.CounterVec
36+
// Counters
37+
eventsPublished metric.Int64Counter
38+
eventsFiltered metric.Int64Counter
39+
eventsFailed metric.Int64Counter
40+
41+
// Histograms
42+
processingDuration metric.Float64Histogram
43+
publishDuration metric.Float64Histogram
44+
45+
// Gauges (observable)
46+
replicationLagBytes metric.Int64ObservableGauge
47+
currentLSN metric.Int64ObservableGauge
1148
}
1249

13-
const (
14-
labelApp = "app"
15-
labelTable = "table"
16-
labelSubject = "subject"
17-
labelKind = "kind"
18-
)
50+
func NewMetrics() (*Metrics, error) {
51+
meter := otel.Meter(meterName)
52+
53+
eventsPublished, err := meter.Int64Counter(
54+
"pgoutbox.events.published",
55+
metric.WithDescription("Total number of successfully published events"),
56+
metric.WithUnit("{event}"),
57+
)
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
eventsFiltered, err := meter.Int64Counter(
63+
"pgoutbox.events.filtered",
64+
metric.WithDescription("Total number of events skipped by filter"),
65+
metric.WithUnit("{event}"),
66+
)
67+
if err != nil {
68+
return nil, err
69+
}
70+
71+
eventsFailed, err := meter.Int64Counter(
72+
"pgoutbox.events.failed",
73+
metric.WithDescription("Total number of events that failed processing"),
74+
metric.WithUnit("{event}"),
75+
)
76+
if err != nil {
77+
return nil, err
78+
}
79+
80+
processingDuration, err := meter.Float64Histogram(
81+
"pgoutbox.processing.duration",
82+
metric.WithDescription("Time to process a WAL message end-to-end"),
83+
metric.WithUnit("s"),
84+
metric.WithExplicitBucketBoundaries(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10),
85+
)
86+
if err != nil {
87+
return nil, err
88+
}
89+
90+
publishDuration, err := meter.Float64Histogram(
91+
"pgoutbox.publish.duration",
92+
metric.WithDescription("Time to publish an event to the message broker"),
93+
metric.WithUnit("s"),
94+
metric.WithExplicitBucketBoundaries(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5),
95+
)
96+
if err != nil {
97+
return nil, err
98+
}
99+
100+
replicationLagBytes, err := meter.Int64ObservableGauge(
101+
"pgoutbox.replication.lag.bytes",
102+
metric.WithDescription("Replication lag in bytes (difference between server WAL end and current LSN)"),
103+
metric.WithUnit("By"),
104+
)
105+
if err != nil {
106+
return nil, err
107+
}
108+
109+
currentLSN, err := meter.Int64ObservableGauge(
110+
"pgoutbox.replication.lsn",
111+
metric.WithDescription("Current LSN position in the replication stream"),
112+
metric.WithUnit("{position}"),
113+
)
114+
if err != nil {
115+
return nil, err
116+
}
19117

20-
// NewMetrics create and initialize new Prometheus metrics.
21-
func NewMetrics() *Metrics {
22118
return &Metrics{
23-
publishedEvents: promauto.NewCounterVec(prometheus.CounterOpts{
24-
Name: "published_events_total",
25-
Help: "The total number of published events",
26-
},
27-
[]string{labelApp, labelSubject, labelTable},
28-
),
29-
problematicEvents: promauto.NewCounterVec(prometheus.CounterOpts{
30-
Name: "problematic_events_total",
31-
Help: "The total number of skipped problematic events",
32-
},
33-
[]string{labelApp, labelKind},
34-
),
35-
filterSkippedEvents: promauto.NewCounterVec(prometheus.CounterOpts{
36-
Name: "filter_skipped_events_total",
37-
Help: "The total number of skipped events",
38-
},
39-
[]string{labelApp, labelTable},
119+
eventsPublished: eventsPublished,
120+
eventsFiltered: eventsFiltered,
121+
eventsFailed: eventsFailed,
122+
processingDuration: processingDuration,
123+
publishDuration: publishDuration,
124+
replicationLagBytes: replicationLagBytes,
125+
currentLSN: currentLSN,
126+
}, nil
127+
}
128+
129+
func (m *Metrics) IncPublishedEvents(subject, table string) {
130+
if m == nil || m.eventsPublished == nil {
131+
return
132+
}
133+
m.eventsPublished.Add(context.Background(), 1,
134+
metric.WithAttributes(
135+
attribute.String(attrKeySubject, subject),
136+
attribute.String(attrKeyTable, table),
40137
),
138+
)
139+
}
140+
141+
func (m *Metrics) IncFilterSkippedEvents(table string) {
142+
if m == nil || m.eventsFiltered == nil {
143+
return
41144
}
145+
m.eventsFiltered.Add(context.Background(), 1,
146+
metric.WithAttributes(attribute.String(attrKeyTable, table)),
147+
)
42148
}
43149

44-
const appName = "pgoutbox"
150+
func (m *Metrics) IncProblematicEvents(kind string) {
151+
if m == nil || m.eventsFailed == nil {
152+
return
153+
}
154+
m.eventsFailed.Add(context.Background(), 1,
155+
metric.WithAttributes(attribute.String(attrKeyKind, kind)),
156+
)
157+
}
158+
159+
func (m *Metrics) RecordProcessingDuration(seconds float64) {
160+
if m == nil || m.processingDuration == nil {
161+
return
162+
}
163+
m.processingDuration.Record(context.Background(), seconds)
164+
}
45165

46-
// IncPublishedEvents increment published events counter.
47-
func (m Metrics) IncPublishedEvents(subject, table string) {
48-
m.publishedEvents.With(prometheus.Labels{labelApp: appName, labelSubject: subject, labelTable: table}).Inc()
166+
func (m *Metrics) RecordPublishDuration(seconds float64, subject string) {
167+
if m == nil || m.publishDuration == nil {
168+
return
169+
}
170+
m.publishDuration.Record(context.Background(), seconds,
171+
metric.WithAttributes(attribute.String(attrKeySubject, subject)),
172+
)
49173
}
50174

51-
// IncFilterSkippedEvents increment skipped by filter events counter.
52-
func (m Metrics) IncFilterSkippedEvents(table string) {
53-
m.filterSkippedEvents.With(prometheus.Labels{labelApp: appName, labelTable: table}).Inc()
175+
type GaugeCallbacks struct {
176+
GetCurrentLSN func() uint64
177+
GetServerLSN func() uint64
54178
}
55179

56-
// IncProblematicEvents increment skipped by filter events counter.
57-
func (m Metrics) IncProblematicEvents(kind string) {
58-
m.problematicEvents.With(prometheus.Labels{labelApp: appName, labelKind: kind}).Inc()
180+
func (m *Metrics) RegisterCallbacks(cb GaugeCallbacks) error {
181+
if m == nil {
182+
return nil
183+
}
184+
185+
meter := otel.Meter(meterName)
186+
187+
_, err := meter.RegisterCallback(
188+
func(ctx context.Context, o metric.Observer) error {
189+
if cb.GetCurrentLSN != nil {
190+
currentLSN := cb.GetCurrentLSN()
191+
o.ObserveInt64(m.currentLSN, int64(currentLSN))
192+
if cb.GetServerLSN != nil {
193+
serverLSN := cb.GetServerLSN()
194+
if serverLSN > currentLSN {
195+
o.ObserveInt64(m.replicationLagBytes, int64(serverLSN-currentLSN))
196+
} else {
197+
o.ObserveInt64(m.replicationLagBytes, 0)
198+
}
199+
}
200+
}
201+
202+
return nil
203+
},
204+
m.currentLSN,
205+
m.replicationLagBytes,
206+
)
207+
208+
return err
59209
}

cmd/pgoutbox/main.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package main
1818

1919
import (
20+
"context"
2021
"encoding/binary"
2122
"fmt"
2223
"log/slog"
@@ -29,6 +30,7 @@ import (
2930
"kubeops.dev/pgoutbox/apis"
3031
"kubeops.dev/pgoutbox/internal/listener"
3132
"kubeops.dev/pgoutbox/internal/listener/transaction"
33+
"kubeops.dev/pgoutbox/internal/telemetry"
3234

3335
"github.com/urfave/cli/v2"
3436
)
@@ -85,6 +87,19 @@ func main() {
8587

8688
logger := apis.InitSlog(cfg.Logger, version, false)
8789

90+
if cfg.Telemetry == nil || cfg.Telemetry.PrometheusEnabled {
91+
if err = telemetry.InitMetrics(ctx, version); err != nil {
92+
return fmt.Errorf("initialize telemetry: %w", err)
93+
}
94+
defer func() {
95+
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
96+
defer shutdownCancel()
97+
if err := telemetry.Shutdown(shutdownCtx); err != nil {
98+
slog.Error("telemetry shutdown failed", "err", err)
99+
}
100+
}()
101+
}
102+
88103
pgxConn, pgConn, err := initPgxConnections(cfg.Database, logger, time.Minute*10)
89104
if err != nil {
90105
return fmt.Errorf("pgx connection: %w", err)
@@ -104,16 +119,31 @@ func main() {
104119
}
105120
}()
106121

122+
metrics, err := apis.NewMetrics()
123+
if err != nil {
124+
return fmt.Errorf("initialize metrics: %w", err)
125+
}
126+
127+
repo := listener.NewRepository(pgxConn)
128+
repl := newReplicationConn(pgConn)
129+
107130
svc := listener.NewWalListener(
108131
cfg,
109132
logger,
110-
listener.NewRepository(pgxConn),
111-
newReplicationConn(pgConn),
133+
repo,
134+
repl,
112135
pub,
113136
transaction.NewBinaryParser(logger, binary.BigEndian),
114-
apis.NewMetrics(),
137+
metrics,
115138
)
116139

140+
if err := metrics.RegisterCallbacks(apis.GaugeCallbacks{
141+
GetCurrentLSN: func() uint64 { return uint64(svc.ReadLSN()) },
142+
GetServerLSN: func() uint64 { return uint64(svc.ReadServerLSN()) },
143+
}); err != nil {
144+
return fmt.Errorf("register metrics callbacks: %w", err)
145+
}
146+
117147
go svc.InitHandlers(ctx)
118148

119149
if err = svc.Process(ctx); err != nil {

0 commit comments

Comments
 (0)