Skip to content

Commit 06fb986

Browse files
committed
disconnect reason for subchannel metrics (A94)
1 parent b6597b3 commit 06fb986

File tree

5 files changed

+379
-22
lines changed

5 files changed

+379
-22
lines changed

balancer/pickfirst/metrics_test.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"context"
2323
"fmt"
2424
"testing"
25+
"time"
2526

2627
"google.golang.org/grpc"
2728
"google.golang.org/grpc/balancer/pickfirst"
@@ -291,3 +292,125 @@ func metricsDataFromReader(ctx context.Context, reader *metric.ManualReader) map
291292
}
292293
return gotMetrics
293294
}
295+
296+
func (s) TestDisconnectLabel(t *testing.T) {
297+
// 1. Valid GOAWAY
298+
// Server GracefulStop sends GOAWAY with active streams = 0.
299+
// This usually sends NoError(0) code.
300+
t.Run("GoAway", func(t *testing.T) {
301+
runDisconnectLabelTest(t, "GOAWAY NO_ERROR", func(ss *stubserver.StubServer) {
302+
ss.S.GracefulStop()
303+
// GracefulStop waits for connections to close, which happens after
304+
// GOAWAY is sent.
305+
})
306+
})
307+
308+
// 2. IO Error
309+
// Server Stop closes the listener and active connections immediately.
310+
// This often results in "connection reset" or "EOF" (unknown) depending on timing/OS.
311+
// Let's check for "unknown" or "connection reset" or "subchannel shutdown" strictly.
312+
// In this test env, it often results in io.EOF which we mapped to "unknown".
313+
t.Run("IO_Error", func(t *testing.T) {
314+
runDisconnectLabelTest(t, "unknown", func(ss *stubserver.StubServer) {
315+
ss.Stop()
316+
})
317+
})
318+
319+
// Scenario 3: Unknown (Client closes - voluntary? actually client close might be UNKNOWN or not recorded as split)
320+
// If client closes, we might not record "disconnections" metric from ClientConn perspective?
321+
// disconnections metric is "Number of times the selected subchannel becomes disconnected".
322+
// If we close 'cc', we tear down subchannels.
323+
// But let's try to trigger a case where we just disconnect without server side action?
324+
// Or maybe "unknown" is what we get for "Idle" timeout?
325+
// Let's stick to IO and GoAway first which are explicit in A94.
326+
}
327+
328+
func runDisconnectLabelTest(t *testing.T, wantLabel string, triggerFunc func(*stubserver.StubServer)) {
329+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
330+
defer cancel()
331+
332+
ss := &stubserver.StubServer{
333+
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
334+
return &testpb.Empty{}, nil
335+
},
336+
}
337+
ss.StartServer()
338+
defer ss.Stop() // Cleanup in case triggerFunc didn't fully stop or strict cleanup needed
339+
340+
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(pfConfig)
341+
r := manual.NewBuilderWithScheme("whatever")
342+
r.InitialState(resolver.State{
343+
ServiceConfig: sc,
344+
Addresses: []resolver.Address{{Addr: ss.Address}},
345+
})
346+
347+
grpcTarget := r.Scheme() + ":///"
348+
reader := metric.NewManualReader()
349+
provider := metric.NewMeterProvider(metric.WithReader(reader))
350+
mo := opentelemetry.MetricsOptions{
351+
MeterProvider: provider,
352+
Metrics: opentelemetry.DefaultMetrics().Add("grpc.subchannel.disconnections"),
353+
OptionalLabels: []string{"grpc.disconnect_error"},
354+
}
355+
356+
cc, err := grpc.NewClient(grpcTarget, opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo}), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
357+
if err != nil {
358+
t.Fatalf("NewClient() failed: %v", err)
359+
}
360+
defer cc.Close()
361+
362+
tsc := testgrpc.NewTestServiceClient(cc)
363+
// Ensure connected
364+
if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
365+
t.Fatalf("EmptyCall() failed: %v", err)
366+
}
367+
368+
// Trigger disconnection
369+
triggerFunc(ss)
370+
371+
// Wait for Idle state (disconnection happened)
372+
testutils.AwaitState(ctx, t, cc, connectivity.Idle)
373+
374+
// Verify metrics
375+
376+
gotMetrics := metricsDataFromReader(ctx, reader)
377+
val, ok := gotMetrics["grpc.subchannel.disconnections"]
378+
if !ok {
379+
t.Fatalf("Metric grpc.subchannel.disconnections not found")
380+
}
381+
382+
// We used AssertEqual in the other test, let's use it here too if available.
383+
// But checking attributes manually might be safer if AssertEqual is strict on other optional fields.
384+
// Let's iterate datapoints.
385+
start := time.Now()
386+
for {
387+
points := val.Data.(metricdata.Sum[int64]).DataPoints
388+
if len(points) == 0 {
389+
t.Fatalf("No data points for disconnections")
390+
}
391+
dp := points[0]
392+
// Check attributes
393+
seenLabel := false
394+
var foundAttrs []string
395+
for _, kv := range dp.Attributes.ToSlice() {
396+
foundAttrs = append(foundAttrs, fmt.Sprintf("%s=%s", kv.Key, kv.Value.AsString()))
397+
if kv.Key == "grpc.disconnect_error" {
398+
seenLabel = true
399+
if kv.Value.AsString() != wantLabel {
400+
t.Errorf("Want label %q, got %q", wantLabel, kv.Value.AsString())
401+
}
402+
}
403+
}
404+
if !seenLabel && wantLabel != "" {
405+
if time.Since(start) > time.Second {
406+
t.Errorf("Label grpc.disconnect_error missing. Found attributes: %v", foundAttrs)
407+
}
408+
}
409+
if seenLabel || time.Since(start) > time.Second {
410+
break
411+
}
412+
time.Sleep(10 * time.Millisecond)
413+
gotMetrics = metricsDataFromReader(ctx, reader)
414+
val = gotMetrics["grpc.subchannel.disconnections"]
415+
}
416+
}

clientconn.go

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,16 @@ import (
2424
"fmt"
2525
"math"
2626
"net/url"
27+
"os"
2728
"slices"
2829
"strings"
2930
"sync"
3031
"sync/atomic"
32+
"syscall"
3133
"time"
3234

35+
"golang.org/x/net/http2"
36+
3337
"google.golang.org/grpc/balancer"
3438
"google.golang.org/grpc/balancer/base"
3539
"google.golang.org/grpc/balancer/pickfirst"
@@ -1270,6 +1274,7 @@ type addrConn struct {
12701274

12711275
localityLabel string
12721276
backendServiceLabel string
1277+
disconnectError string
12731278
}
12741279

12751280
// Note: this requires a lock on ac.mu.
@@ -1286,9 +1291,14 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
12861291
// TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second
12871292
// part of the if condition below once the issue is fixed.
12881293
if ac.state == connectivity.Ready || (ac.state == connectivity.Connecting && s == connectivity.Idle) {
1289-
disconnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel, "unknown")
1294+
disconnectError := ac.disconnectError
1295+
if disconnectError == "" {
1296+
disconnectError = "unknown"
1297+
}
1298+
disconnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel, disconnectError)
12901299
openConnectionsMetric.Record(ac.cc.metricsRecorderList, -1, ac.cc.target, ac.backendServiceLabel, ac.securityLevelLocked(), ac.localityLabel)
12911300
}
1301+
ac.disconnectError = "" // Reset for next time
12921302
ac.state = s
12931303
ac.channelz.ChannelMetrics.State.Store(&s)
12941304
if lastErr == nil {
@@ -1483,7 +1493,7 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,
14831493
addr.ServerName = ac.cc.getServerName(addr)
14841494
hctx, hcancel := context.WithCancel(ctx)
14851495

1486-
onClose := func(r transport.GoAwayReason) {
1496+
onClose := func(r transport.GoAwayReason, goAwayCode http2.ErrCode, err error) {
14871497
ac.mu.Lock()
14881498
defer ac.mu.Unlock()
14891499
// adjust params based on GoAwayReason
@@ -1504,6 +1514,7 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,
15041514
return
15051515
}
15061516
ac.transport = nil
1517+
ac.disconnectError = disconnectErrorString(r, goAwayCode, err)
15071518
// Refresh the name resolver on any connection loss.
15081519
ac.cc.resolveNow(resolver.ResolveNowOptions{})
15091520
// Always go idle and wait for the LB policy to initiate a new
@@ -1560,6 +1571,31 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,
15601571
return nil
15611572
}
15621573

1574+
func disconnectErrorString(r transport.GoAwayReason, goAwayCode http2.ErrCode, err error) string {
1575+
if r != transport.GoAwayInvalid {
1576+
return fmt.Sprintf("GOAWAY %s", goAwayCode.String())
1577+
}
1578+
if err == nil {
1579+
return "unknown"
1580+
}
1581+
if errors.Is(err, context.Canceled) {
1582+
return "subchannel shutdown"
1583+
}
1584+
if errors.Is(err, syscall.ECONNRESET) {
1585+
return "connection reset"
1586+
}
1587+
if errors.Is(err, syscall.ETIMEDOUT) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, os.ErrDeadlineExceeded) {
1588+
return "connection timed out"
1589+
}
1590+
if errors.Is(err, syscall.ECONNABORTED) {
1591+
return "connection aborted"
1592+
}
1593+
if errors.Is(err, syscall.ECONNREFUSED) {
1594+
return "socket error"
1595+
}
1596+
return "unknown"
1597+
}
1598+
15631599
// startHealthCheck starts the health checking stream (RPC) to watch the health
15641600
// stats of this connection if health checking is requested and configured.
15651601
//
@@ -1665,6 +1701,9 @@ func (ac *addrConn) tearDown(err error) {
16651701
ac.transport = nil
16661702
// We have to set the state to Shutdown before anything else to prevent races
16671703
// between setting the state and logic that waits on context cancellation / etc.
1704+
if ac.disconnectError == "" {
1705+
ac.disconnectError = "subchannel shutdown"
1706+
}
16681707
ac.updateConnectivityState(connectivity.Shutdown, nil)
16691708
ac.cancel()
16701709
ac.curAddr = resolver.Address{}

internal/transport/http2_client.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ type http2Client struct {
134134
// goAwayDebugMessage contains a detailed human readable string about a
135135
// GoAway frame, useful for error messages.
136136
goAwayDebugMessage string
137+
// goAwayCode records the http2.ErrCode received with the GoAway frame.
138+
goAwayCode http2.ErrCode
137139
// A condition variable used to signal when the keepalive goroutine should
138140
// go dormant. The condition for dormancy is based on the number of active
139141
// streams and the `PermitWithoutStream` keepalive client parameter. And
@@ -147,7 +149,7 @@ type http2Client struct {
147149

148150
channelz *channelz.Socket
149151

150-
onClose func(GoAwayReason)
152+
onClose func(GoAwayReason, http2.ErrCode, error)
151153

152154
bufferPool mem.BufferPool
153155

@@ -204,7 +206,7 @@ func isTemporary(err error) bool {
204206
// NewHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
205207
// and starts to receive messages on it. Non-nil error returns if construction
206208
// fails.
207-
func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ ClientTransport, err error) {
209+
func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason, http2.ErrCode, error)) (_ ClientTransport, err error) {
208210
scheme := "http"
209211
ctx, cancel := context.WithCancel(ctx)
210212
defer func() {
@@ -1015,7 +1017,7 @@ func (t *http2Client) Close(err error) {
10151017
// Call t.onClose ASAP to prevent the client from attempting to create new
10161018
// streams.
10171019
if t.state != draining {
1018-
t.onClose(GoAwayInvalid)
1020+
t.onClose(GoAwayInvalid, http2.ErrCodeNo, err)
10191021
}
10201022
t.state = closing
10211023
streams := t.activeStreams
@@ -1086,7 +1088,7 @@ func (t *http2Client) GracefulClose() {
10861088
if t.logger.V(logLevel) {
10871089
t.logger.Infof("GracefulClose called")
10881090
}
1089-
t.onClose(GoAwayInvalid)
1091+
t.onClose(GoAwayInvalid, http2.ErrCodeNo, nil)
10901092
t.state = draining
10911093
active := len(t.activeStreams)
10921094
t.mu.Unlock()
@@ -1372,7 +1374,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error {
13721374
// draining, to allow the client to stop attempting to create streams
13731375
// before disallowing new streams on this connection.
13741376
if t.state != draining {
1375-
t.onClose(t.goAwayReason)
1377+
t.onClose(t.goAwayReason, t.goAwayCode, nil)
13761378
t.state = draining
13771379
}
13781380
}
@@ -1406,22 +1408,26 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error {
14061408
return nil
14071409
}
14081410

1411+
// setGoAwayReason sets the value of t.goAwayReason based
1412+
// on the GoAway frame received.
1413+
// It expects a lock on transport's mutex to be held by
1414+
// the caller.
14091415
// setGoAwayReason sets the value of t.goAwayReason based
14101416
// on the GoAway frame received.
14111417
// It expects a lock on transport's mutex to be held by
14121418
// the caller.
14131419
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
14141420
t.goAwayReason = GoAwayNoReason
1421+
t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode)
1422+
if len(f.DebugData()) > 0 {
1423+
t.goAwayDebugMessage += fmt.Sprintf(", debug data: %q", string(f.DebugData()))
1424+
}
14151425
if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
14161426
if string(f.DebugData()) == "too_many_pings" {
14171427
t.goAwayReason = GoAwayTooManyPings
14181428
}
14191429
}
1420-
if len(f.DebugData()) == 0 {
1421-
t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode)
1422-
} else {
1423-
t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %q", f.ErrCode, string(f.DebugData()))
1424-
}
1430+
t.goAwayCode = f.ErrCode
14251431
}
14261432

14271433
func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {

0 commit comments

Comments
 (0)