Skip to content

Commit f123124

Browse files
committed
Merge remote-tracking branch 'origin/master' into ignore-status-for-grpc-content-type-8486
2 parents b1a7265 + e60a04b commit f123124

27 files changed

+1555
-174
lines changed

.github/pull_request_template.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
Thank you for your PR. Please read and follow
1+
Thank you for your PR. Please read and follow
22
https://github.com/grpc/grpc-go/blob/master/CONTRIBUTING.md, especially the
33
"Guidelines for Pull Requests" section, and then delete this text before
44
entering your PR description.

internal/envconfig/xds.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,10 @@ var (
6868
// trust. For more details, see:
6969
// https://github.com/grpc/proposal/blob/master/A87-mtls-spiffe-support.md
7070
XDSSPIFFEEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_MTLS_SPIFFE", false)
71+
72+
// XDSHTTPConnectEnabled is true if gRPC should parse custom Metadata
73+
// configuring use of an HTTP CONNECT proxy via xDS from cluster resources.
74+
// For more details, see:
75+
// https://github.com/grpc/proposal/blob/master/A86-xds-http-connect.md
76+
XDSHTTPConnectEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_HTTP_CONNECT", false)
7177
)

internal/transport/http2_client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1532,6 +1532,11 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
15321532
return
15331533
}
15341534
statusCode := int(c)
1535+
if statusCode >= 100 && statusCode < 200 {
1536+
//In case of informational headers return
1537+
//For trailers, since we are already in gRPC mode, we will ignore all http statuses and not enter this block
1538+
return
1539+
}
15351540
httpStatusErr = fmt.Sprintf(
15361541
"unexpected HTTP status code received from server: %d (%s)",
15371542
statusCode,

internal/transport/http2_server.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1353,10 +1353,10 @@ func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCo
13531353
// called to interrupt the potential blocking on other goroutines.
13541354
s.cancel()
13551355

1356-
oldState := s.swapState(streamDone)
1357-
if oldState == streamDone {
1358-
return
1359-
}
1356+
// We can't return early even if the stream's state is "done" as the state
1357+
// might have been set by the `finishStream` method. Deleting the stream via
1358+
// `finishStream` can get blocked on flow control.
1359+
s.swapState(streamDone)
13601360
t.deleteStream(s, eosReceived)
13611361

13621362
t.controlBuf.put(&cleanupStream{

internal/transport/transport_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3183,3 +3183,79 @@ func (s) TestServerSendsRSTAfterDeadlineToMisbehavedClient(t *testing.T) {
31833183
t.Fatalf("RST frame received earlier than expected by duration: %v", want-got)
31843184
}
31853185
}
3186+
3187+
// TestClientTransport_Handle1xxHeaders validates that 1xx HTTP status headers
3188+
// are ignored and treated as a protocol error if END_STREAM is set.
3189+
func (s) TestClientTransport_Handle1xxHeaders(t *testing.T) {
3190+
testStream := func() *ClientStream {
3191+
return &ClientStream{
3192+
Stream: &Stream{
3193+
buf: &recvBuffer{
3194+
c: make(chan recvMsg),
3195+
mu: sync.Mutex{},
3196+
},
3197+
},
3198+
done: make(chan struct{}),
3199+
headerChan: make(chan struct{}),
3200+
}
3201+
}
3202+
3203+
testClient := func(ts *ClientStream) *http2Client {
3204+
return &http2Client{
3205+
mu: sync.Mutex{},
3206+
activeStreams: map[uint32]*ClientStream{
3207+
0: ts,
3208+
},
3209+
controlBuf: newControlBuffer(make(<-chan struct{})),
3210+
}
3211+
}
3212+
3213+
for _, test := range []struct {
3214+
name string
3215+
metaHeaderFrame *http2.MetaHeadersFrame
3216+
httpFlags http2.Flags
3217+
wantStatus *status.Status
3218+
}{
3219+
{
3220+
name: "1xx with END_STREAM will be ignored",
3221+
metaHeaderFrame: &http2.MetaHeadersFrame{
3222+
Fields: []hpack.HeaderField{
3223+
{Name: ":status", Value: "100"},
3224+
},
3225+
},
3226+
httpFlags: http2.FlagHeadersEndStream,
3227+
wantStatus: nil,
3228+
},
3229+
{
3230+
name: "1xx without END_STREAM is ignored",
3231+
metaHeaderFrame: &http2.MetaHeadersFrame{
3232+
Fields: []hpack.HeaderField{
3233+
{Name: ":status", Value: "100"},
3234+
},
3235+
},
3236+
httpFlags: 0,
3237+
wantStatus: nil,
3238+
},
3239+
} {
3240+
t.Run(test.name, func(t *testing.T) {
3241+
ts := testStream()
3242+
s := testClient(ts)
3243+
3244+
test.metaHeaderFrame.HeadersFrame = &http2.HeadersFrame{
3245+
FrameHeader: http2.FrameHeader{
3246+
StreamID: 0,
3247+
Flags: test.httpFlags,
3248+
},
3249+
}
3250+
3251+
s.operateHeaders(test.metaHeaderFrame)
3252+
3253+
got := ts.status
3254+
want := test.wantStatus
3255+
3256+
if got.Code() != want.Code() || got.Message() != want.Message() {
3257+
t.Fatalf("operateHeaders(%v); status = %v, want %v", test.metaHeaderFrame, got, want)
3258+
}
3259+
})
3260+
}
3261+
}

internal/xds/balancer/priority/ignore_resolve_now.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,29 +29,21 @@ import (
2929
// ResolveNow() method to ignore those calls if the ignoreResolveNow bit is set.
3030
type ignoreResolveNowClientConn struct {
3131
balancer.ClientConn
32-
ignoreResolveNow *uint32
32+
ignoreResolveNow atomic.Bool
3333
}
3434

3535
func newIgnoreResolveNowClientConn(cc balancer.ClientConn, ignore bool) *ignoreResolveNowClientConn {
36-
ret := &ignoreResolveNowClientConn{
37-
ClientConn: cc,
38-
ignoreResolveNow: new(uint32),
39-
}
36+
ret := &ignoreResolveNowClientConn{ClientConn: cc}
4037
ret.updateIgnoreResolveNow(ignore)
4138
return ret
4239
}
4340

4441
func (i *ignoreResolveNowClientConn) updateIgnoreResolveNow(b bool) {
45-
if b {
46-
atomic.StoreUint32(i.ignoreResolveNow, 1)
47-
return
48-
}
49-
atomic.StoreUint32(i.ignoreResolveNow, 0)
50-
42+
i.ignoreResolveNow.Store(b)
5143
}
5244

53-
func (i ignoreResolveNowClientConn) ResolveNow(o resolver.ResolveNowOptions) {
54-
if atomic.LoadUint32(i.ignoreResolveNow) != 0 {
45+
func (i *ignoreResolveNowClientConn) ResolveNow(o resolver.ResolveNowOptions) {
46+
if i.ignoreResolveNow.Load() {
5547
return
5648
}
5749
i.ClientConn.ResolveNow(o)

internal/xds/clients/xdsclient/internal/internal.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ package internal
2121
import "time"
2222

2323
var (
24-
// WatchExpiryTimeout is the watch expiry timeout for xDS client. It can be
25-
// overridden by tests to change the default watch expiry timeout.
26-
WatchExpiryTimeout time.Duration
27-
2824
// StreamBackoff is the stream backoff for xDS client. It can be overridden
2925
// by tests to change the default backoff strategy.
3026
StreamBackoff func(int) time.Duration

internal/xds/clients/xdsclient/test/ads_stream_watch_test.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/google/uuid"
2929
"google.golang.org/grpc/credentials/insecure"
3030
"google.golang.org/grpc/internal/testutils/xds/e2e"
31+
"google.golang.org/grpc/internal/xds/clients"
3132
"google.golang.org/grpc/internal/xds/clients/grpctransport"
3233
"google.golang.org/grpc/internal/xds/clients/internal/testutils"
3334
"google.golang.org/grpc/internal/xds/clients/xdsclient"
@@ -175,9 +176,34 @@ func (s) TestADS_WatchState_TimerFires(t *testing.T) {
175176
// short resource expiry timeout.
176177
nodeID := uuid.New().String()
177178
configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}}
178-
overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout)
179-
client := createXDSClient(t, mgmtServer.Address, nodeID, grpctransport.NewBuilder(configs))
180-
179+
resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType}
180+
si := clients.ServerIdentifier{
181+
ServerURI: mgmtServer.Address,
182+
Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"},
183+
}
184+
185+
xdsClientConfig := xdsclient.Config{
186+
Servers: []xdsclient.ServerConfig{{ServerIdentifier: si}},
187+
Node: clients.Node{ID: nodeID, UserAgentName: "user-agent", UserAgentVersion: "0.0.0.0"},
188+
TransportBuilder: grpctransport.NewBuilder(configs),
189+
ResourceTypes: resourceTypes,
190+
// Xdstp resource names used in this test do not specify an
191+
// authority. These will end up looking up an entry with the
192+
// empty key in the authorities map. Having an entry with an
193+
// empty key and empty configuration, results in these
194+
// resources also using the top-level configuration.
195+
Authorities: map[string]xdsclient.Authority{
196+
"": {XDSServers: []xdsclient.ServerConfig{}},
197+
},
198+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
199+
}
200+
201+
// Create an xDS client with the above config.
202+
client, err := xdsclient.New(xdsClientConfig)
203+
if err != nil {
204+
t.Fatalf("Failed to create xDS client: %v", err)
205+
}
206+
t.Cleanup(func() { client.Close() })
181207
// Create a watch for the first listener resource and verify that the timer
182208
// is running and the watch state is `requested`.
183209
const listenerName = "listener"

internal/xds/clients/xdsclient/test/authority_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,10 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T) (*testutils.Liste
105105
testAuthority2: {XDSServers: []xdsclient.ServerConfig{}},
106106
testAuthority3: {XDSServers: []xdsclient.ServerConfig{{ServerIdentifier: siNonDefault}}},
107107
},
108+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
108109
}
109110

110111
// Create an xDS client with the above config.
111-
overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout)
112112
client, err := xdsclient.New(xdsClientConfig)
113113
if err != nil {
114114
t.Fatalf("Failed to create xDS client: %v", err)

internal/xds/clients/xdsclient/test/lds_watchers_test.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
"google.golang.org/grpc/internal/xds/clients/internal/testutils"
3636
"google.golang.org/grpc/internal/xds/clients/internal/testutils/e2e"
3737
"google.golang.org/grpc/internal/xds/clients/xdsclient"
38-
xdsclientinternal "google.golang.org/grpc/internal/xds/clients/xdsclient/internal"
3938
"google.golang.org/grpc/internal/xds/clients/xdsclient/internal/xdsresource"
4039

4140
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
@@ -115,12 +114,6 @@ func badListenerResource(t *testing.T, name string) *v3listenerpb.Listener {
115114
}
116115
}
117116

118-
func overrideWatchExpiryTimeout(t *testing.T, watchExpiryTimeout time.Duration) {
119-
originalWatchExpiryTimeout := xdsclientinternal.WatchExpiryTimeout
120-
xdsclientinternal.WatchExpiryTimeout = watchExpiryTimeout
121-
t.Cleanup(func() { xdsclientinternal.WatchExpiryTimeout = originalWatchExpiryTimeout })
122-
}
123-
124117
// verifyNoListenerUpdate verifies that no listener update is received on the
125118
// provided update channel, and returns an error if an update is received.
126119
//
@@ -726,11 +719,10 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
726719
Node: clients.Node{ID: nodeID},
727720
TransportBuilder: grpctransport.NewBuilder(configs),
728721
ResourceTypes: resourceTypes,
722+
// Override the default watch expiry timeout.
723+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
729724
}
730725

731-
// Create an xDS client with the above config and override the default
732-
// watch expiry timeout.
733-
overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout)
734726
client, err := xdsclient.New(xdsClientConfig)
735727
if err != nil {
736728
t.Fatalf("Failed to create xDS client: %v", err)
@@ -777,11 +769,11 @@ func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
777769
Node: clients.Node{ID: nodeID},
778770
TransportBuilder: grpctransport.NewBuilder(configs),
779771
ResourceTypes: resourceTypes,
772+
// Override the default watch expiry timeout.
773+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
780774
}
781775

782-
// Create an xDS client with the above config and override the default
783-
// watch expiry timeout.
784-
overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout)
776+
// Create an xDS client with the above config.
785777
client, err := xdsclient.New(xdsClientConfig)
786778
if err != nil {
787779
t.Fatalf("Failed to create xDS client: %v", err)

0 commit comments

Comments
 (0)