Skip to content

Commit 72d0280

Browse files
committed
Add metric and logging for activator-autoscaler connectivity
This change adds observability for the websocket connection between the activator and autoscaler components:
- Add an `activator_autoscaler_reachable` gauge metric (1 = reachable, 0 = not reachable).
- Log an ERROR when the autoscaler is not reachable during stat sending.
- Add a periodic connection status monitor (every 5s) to detect connection establishment failures.
- Add unit tests for the new AutoscalerConnectionStatusMonitor function.

The metric is recorded in two scenarios:
1. When SendRaw fails or succeeds during stat message sending (0 on failure, 1 on success).
2. When the periodic status check runs (0 if the connection is not established, 1 otherwise).

This helps operators identify connectivity issues between the activator and the autoscaler that could impact autoscaling decisions.
1 parent a8803aa commit 72d0280

File tree

5 files changed

+145
-7
lines changed

5 files changed

+145
-7
lines changed

activator

85.1 MB
Binary file not shown.

cmd/activator/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,8 @@ func main() {
205205
logger.Info("Connecting to Autoscaler at ", autoscalerEndpoint)
206206
statSink := websocket.NewDurableSendingConnection(autoscalerEndpoint, logger)
207207
defer statSink.Shutdown()
208-
go activator.ReportStats(logger, statSink, statCh)
208+
go activator.ReportStats(logger, statSink, statCh, mp)
209+
go activator.AutoscalerConnectionStatusMonitor(ctx, logger, statSink, mp)
209210

210211
// Create and run our concurrency reporter
211212
concurrencyReporter := activatorhandler.NewConcurrencyReporter(ctx, env.PodName, statCh, mp)

pkg/activator/metrics.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
Copyright 2024 The Knative Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package activator
18+
19+
import (
20+
"go.opentelemetry.io/otel"
21+
"go.opentelemetry.io/otel/metric"
22+
)
23+
24+
var scopeName = "knative.dev/serving/pkg/activator"
25+
26+
type statReporterMetrics struct {
27+
autoscalerReachable metric.Int64Gauge
28+
}
29+
30+
func newStatReporterMetrics(mp metric.MeterProvider) *statReporterMetrics {
31+
var (
32+
m statReporterMetrics
33+
err error
34+
provider = mp
35+
)
36+
37+
if provider == nil {
38+
provider = otel.GetMeterProvider()
39+
}
40+
41+
meter := provider.Meter(scopeName)
42+
43+
m.autoscalerReachable, err = meter.Int64Gauge(
44+
"activator_autoscaler_reachable",
45+
metric.WithDescription("Whether the autoscaler is reachable from the activator (1 = reachable, 0 = not reachable)"),
46+
metric.WithUnit("{reachable}"),
47+
)
48+
if err != nil {
49+
panic(err)
50+
}
51+
52+
return &m
53+
}

pkg/activator/stat_reporter.go

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,18 @@ limitations under the License.
1717
package activator
1818

1919
import (
20+
"context"
21+
"time"
22+
2023
"github.com/gorilla/websocket"
24+
"go.opentelemetry.io/otel/metric"
2125
"go.uber.org/zap"
22-
"knative.dev/serving/pkg/autoscaler/metrics"
26+
asmetrics "knative.dev/serving/pkg/autoscaler/metrics"
27+
)
28+
29+
const (
30+
// connectionCheckInterval is how often to check the autoscaler connection status.
31+
connectionCheckInterval = 5 * time.Second
2332
)
2433

2534
// RawSender sends raw byte array messages with a message type
@@ -28,21 +37,55 @@ type RawSender interface {
2837
SendRaw(msgType int, msg []byte) error
2938
}
3039

40+
// StatusChecker is implemented by connections that can report their own
// health.
type StatusChecker interface {
	// Status returns nil when the connection is usable. A non-nil error is
	// treated by AutoscalerConnectionStatusMonitor as "not reachable".
	Status() error
}
44+
45+
// AutoscalerConnectionStatusMonitor periodically checks if the autoscaler is reachable
46+
// and emits metrics and logs accordingly.
47+
func AutoscalerConnectionStatusMonitor(ctx context.Context, logger *zap.SugaredLogger, conn StatusChecker, mp metric.MeterProvider) {
48+
metrics := newStatReporterMetrics(mp)
49+
ticker := time.NewTicker(connectionCheckInterval)
50+
defer ticker.Stop()
51+
52+
for {
53+
select {
54+
case <-ctx.Done():
55+
return
56+
case <-ticker.C:
57+
if err := conn.Status(); err != nil {
58+
logger.Errorw("Autoscaler is not reachable from activator.",
59+
zap.Error(err))
60+
metrics.autoscalerReachable.Record(context.Background(), 0)
61+
} else {
62+
metrics.autoscalerReachable.Record(context.Background(), 1)
63+
}
64+
}
65+
}
66+
}
67+
3168
// ReportStats sends any messages received on the source channel to the sink.
3269
// The messages are sent on a goroutine to avoid blocking, which means that
3370
// messages may arrive out of order.
34-
func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []metrics.StatMessage) {
71+
func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []asmetrics.StatMessage, mp metric.MeterProvider) {
72+
metrics := newStatReporterMetrics(mp)
3573
for sms := range source {
36-
go func(sms []metrics.StatMessage) {
37-
wsms := metrics.ToWireStatMessages(sms)
74+
go func(sms []asmetrics.StatMessage) {
75+
wsms := asmetrics.ToWireStatMessages(sms)
3876
b, err := wsms.Marshal()
3977
if err != nil {
4078
logger.Errorw("Error while marshaling stats", zap.Error(err))
4179
return
4280
}
4381

4482
if err := sink.SendRaw(websocket.BinaryMessage, b); err != nil {
45-
logger.Errorw("Error while sending stats", zap.Error(err))
83+
logger.Errorw("Autoscaler is not reachable from activator. Stats were not sent.",
84+
zap.Error(err),
85+
zap.Int("stat_message_count", len(sms)))
86+
metrics.autoscalerReachable.Record(context.Background(), 0)
87+
} else {
88+
metrics.autoscalerReachable.Record(context.Background(), 1)
4689
}
4790
}(sms)
4891
}

pkg/activator/stat_reporter_test.go

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package activator
1818

1919
import (
20+
"context"
21+
"errors"
2022
"testing"
2123
"time"
2224

@@ -43,7 +45,7 @@ func TestReportStats(t *testing.T) {
4345
})
4446

4547
defer close(ch)
46-
go ReportStats(logger, sink, ch)
48+
go ReportStats(logger, sink, ch, nil)
4749

4850
inputs := [][]metrics.StatMessage{{{
4951
Key: types.NamespacedName{Name: "first-a"},
@@ -95,3 +97,42 @@ type sendRawFunc func(msgType int, msg []byte) error
9597
// SendRaw calls the underlying function with the given message type and
// payload and returns its result unchanged.
func (f sendRawFunc) SendRaw(msgType int, msg []byte) error {
	return f(msgType, msg)
}
100+
101+
// statusCheckerFunc adapts a plain func() error so it can be used wherever a
// value with a Status() error method is required.
type statusCheckerFunc func() error

// Status calls the underlying function and returns its result unchanged.
func (f statusCheckerFunc) Status() error {
	return f()
}
106+
107+
func TestAutoscalerConnectionStatusMonitor(t *testing.T) {
108+
tests := []struct {
109+
name string
110+
statusErr error
111+
}{{
112+
name: "connection established",
113+
statusErr: nil,
114+
}, {
115+
name: "connection not established",
116+
statusErr: errors.New("connection not established"),
117+
}}
118+
119+
for _, tt := range tests {
120+
t.Run(tt.name, func(t *testing.T) {
121+
logger := logtesting.TestLogger(t)
122+
ctx, cancel := context.WithCancel(context.Background())
123+
124+
checker := statusCheckerFunc(func() error {
125+
return tt.statusErr
126+
})
127+
128+
// Start the monitor
129+
go AutoscalerConnectionStatusMonitor(ctx, logger, checker, nil)
130+
131+
// Wait for at least one check to complete
132+
time.Sleep(6 * time.Second)
133+
134+
// Cancel context to stop the monitor
135+
cancel()
136+
})
137+
}
138+
}

0 commit comments

Comments
 (0)