Skip to content

Commit 9ff80a7

Browse files
authored
transport: Increment metrics only when the stream is active (#8573)
Fixes: #8529 This PR fixes to increment metrics only when the stream is active which is found in the activeStreams map. #### as-is - The deleteStream was incrementing channelz metrics every time it was called, even when stream was already removed from activeStreams or not exists in activeStreams. #### to-be - Added check to ensure metrics are only incremented once when a stream is actually removed from activeStreams. RELEASE NOTES: * server: Fix a bug that caused overcounting of channelz metrics for successful and failed streams.
1 parent c9cca95 commit 9ff80a7

File tree

2 files changed

+129
-3
lines changed

2 files changed

+129
-3
lines changed

internal/transport/http2_server.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,15 @@ import (
3535

3636
"golang.org/x/net/http2"
3737
"golang.org/x/net/http2/hpack"
38+
"google.golang.org/protobuf/proto"
39+
3840
"google.golang.org/grpc/internal"
3941
"google.golang.org/grpc/internal/grpclog"
4042
"google.golang.org/grpc/internal/grpcutil"
4143
"google.golang.org/grpc/internal/pretty"
4244
istatus "google.golang.org/grpc/internal/status"
4345
"google.golang.org/grpc/internal/syscall"
4446
"google.golang.org/grpc/mem"
45-
"google.golang.org/protobuf/proto"
4647

4748
"google.golang.org/grpc/codes"
4849
"google.golang.org/grpc/credentials"
@@ -1304,15 +1305,16 @@ func (t *http2Server) Close(err error) {
13041305
// deleteStream deletes the stream s from transport's active streams.
13051306
func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
13061307
t.mu.Lock()
1307-
if _, ok := t.activeStreams[s.id]; ok {
1308+
_, isActive := t.activeStreams[s.id]
1309+
if isActive {
13081310
delete(t.activeStreams, s.id)
13091311
if len(t.activeStreams) == 0 {
13101312
t.idle = time.Now()
13111313
}
13121314
}
13131315
t.mu.Unlock()
13141316

1315-
if channelz.IsOn() {
1317+
if isActive && channelz.IsOn() {
13161318
if eosReceived {
13171319
t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
13181320
} else {

internal/transport/transport_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,11 @@ import (
3939
"github.com/google/go-cmp/cmp"
4040
"golang.org/x/net/http2"
4141
"golang.org/x/net/http2/hpack"
42+
4243
"google.golang.org/grpc/attributes"
4344
"google.golang.org/grpc/codes"
4445
"google.golang.org/grpc/credentials"
46+
"google.golang.org/grpc/internal"
4547
"google.golang.org/grpc/internal/channelz"
4648
"google.golang.org/grpc/internal/grpctest"
4749
"google.golang.org/grpc/internal/leakcheck"
@@ -3260,3 +3262,125 @@ func (s) TestClientTransport_Handle1xxHeaders(t *testing.T) {
32603262
})
32613263
}
32623264
}
3265+
3266+
func (s) TestDeleteStreamMetricsIncrementedOnlyOnce(t *testing.T) {
3267+
// Enable channelz for metrics collection
3268+
defer internal.ChannelzTurnOffForTesting()
3269+
if !channelz.IsOn() {
3270+
channelz.TurnOn()
3271+
}
3272+
3273+
for _, test := range []struct {
3274+
name string
3275+
eosReceived bool
3276+
wantStreamSucceeded int64
3277+
wantStreamFailed int64
3278+
}{
3279+
{
3280+
name: "StreamsSucceeded",
3281+
eosReceived: true,
3282+
wantStreamSucceeded: 1,
3283+
wantStreamFailed: 0,
3284+
},
3285+
{
3286+
name: "StreamsFailed",
3287+
eosReceived: false,
3288+
wantStreamSucceeded: 0,
3289+
wantStreamFailed: 1,
3290+
},
3291+
} {
3292+
t.Run(test.name, func(t *testing.T) {
3293+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3294+
defer cancel()
3295+
3296+
// Setup server configuration with channelz support
3297+
serverConfig := &ServerConfig{
3298+
ChannelzParent: channelz.RegisterServer(t.Name()),
3299+
}
3300+
defer channelz.RemoveEntry(serverConfig.ChannelzParent.ID)
3301+
3302+
// Create server and client with normal handler (not notifyCall)
3303+
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, ConnectOptions{})
3304+
defer func() {
3305+
client.Close(fmt.Errorf("test cleanup"))
3306+
server.stop()
3307+
cancel()
3308+
}()
3309+
3310+
// Wait for connection to be established
3311+
waitWhileTrue(t, func() (bool, error) {
3312+
server.mu.Lock()
3313+
defer server.mu.Unlock()
3314+
if len(server.conns) == 0 {
3315+
return true, fmt.Errorf("timed-out while waiting for connection")
3316+
}
3317+
return false, nil
3318+
})
3319+
3320+
// Get the server transport
3321+
server.mu.Lock()
3322+
var serverTransport *http2Server
3323+
for st := range server.conns {
3324+
serverTransport = st.(*http2Server)
3325+
break
3326+
}
3327+
server.mu.Unlock()
3328+
3329+
if serverTransport == nil {
3330+
t.Fatal("Server transport not found")
3331+
}
3332+
3333+
clientStream, err := client.NewStream(ctx, &CallHdr{})
3334+
if err != nil {
3335+
t.Fatalf("Failed to create stream: %v", err)
3336+
}
3337+
3338+
// Wait for the stream to be created on the server side
3339+
var serverStream *ServerStream
3340+
waitWhileTrue(t, func() (bool, error) {
3341+
serverTransport.mu.Lock()
3342+
defer serverTransport.mu.Unlock()
3343+
for _, v := range serverTransport.activeStreams {
3344+
if v.id == clientStream.id {
3345+
serverStream = v
3346+
return false, nil
3347+
}
3348+
}
3349+
return true, nil
3350+
})
3351+
3352+
if serverStream == nil {
3353+
t.Fatalf("Server stream not found for client stream ID %d", clientStream.id)
3354+
}
3355+
3356+
// First call to deleteStream should remove the stream from activeStreams and update metrics
3357+
serverTransport.deleteStream(serverStream, test.eosReceived)
3358+
3359+
// Check metrics after first deleteStream call
3360+
streamsSucceeded := serverTransport.channelz.SocketMetrics.StreamsSucceeded.Load()
3361+
streamsFailed := serverTransport.channelz.SocketMetrics.StreamsFailed.Load()
3362+
3363+
if streamsSucceeded != test.wantStreamSucceeded {
3364+
t.Errorf("After first deleteStream - StreamsSucceeded: got %d, want %d", streamsSucceeded, test.wantStreamSucceeded)
3365+
}
3366+
if streamsFailed != test.wantStreamFailed {
3367+
t.Errorf("After first deleteStream - StreamsFailed: got %d, want %d", streamsFailed, test.wantStreamFailed)
3368+
}
3369+
3370+
// Additional calls to deleteStream should not change metrics (stream already deleted)
3371+
serverTransport.deleteStream(serverStream, test.eosReceived)
3372+
serverTransport.deleteStream(serverStream, test.eosReceived)
3373+
3374+
// Verify metrics haven't changed after subsequent calls
3375+
additionalStreamsSucceeded := serverTransport.channelz.SocketMetrics.StreamsSucceeded.Load()
3376+
additionalStreamsFailed := serverTransport.channelz.SocketMetrics.StreamsFailed.Load()
3377+
3378+
if additionalStreamsSucceeded != test.wantStreamSucceeded {
3379+
t.Errorf("After multiple deleteStream calls - StreamsSucceeded changed: got %d, want %d", additionalStreamsSucceeded, test.wantStreamSucceeded)
3380+
}
3381+
if additionalStreamsFailed != test.wantStreamFailed {
3382+
t.Errorf("After multiple deleteStream calls - StreamsFailed changed: got %d, want %d", additionalStreamsFailed, test.wantStreamFailed)
3383+
}
3384+
})
3385+
}
3386+
}

0 commit comments

Comments
 (0)