Skip to content

Commit 30645d5

Browse files
authored
xds: add metadata registry (#8537)
Following [A83](https://github.com/grpc/proposal/blob/master/A83-xds-gcp-authn-filter.md) and [A86](https://github.com/grpc/proposal/blob/master/A86-xds-http-connect.md), this adds a registry for custom metadata received in xDS protos for the purpose of converting the received metadata into internal representations. RELEASE NOTES: n/a
1 parent fa246ef commit 30645d5

File tree

6 files changed

+638
-8
lines changed

6 files changed

+638
-8
lines changed

internal/envconfig/xds.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,10 @@ var (
6868
// trust. For more details, see:
6969
// https://github.com/grpc/proposal/blob/master/A87-mtls-spiffe-support.md
7070
XDSSPIFFEEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_MTLS_SPIFFE", false)
71+
72+
// XDSHTTPConnectEnabled is true if gRPC should parse custom Metadata
73+
// configuring use of an HTTP CONNECT proxy via xDS from cluster resources.
74+
// For more details, see:
75+
// https://github.com/grpc/proposal/blob/master/A86-xds-http-connect.md
76+
XDSHTTPConnectEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_HTTP_CONNECT", false)
7177
)
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
*
3+
* Copyright 2025 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package xdsresource
19+
20+
import (
21+
"fmt"
22+
"net/netip"
23+
24+
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
25+
"google.golang.org/protobuf/types/known/anypb"
26+
)
27+
28+
func init() {
29+
registerMetadataConverter("type.googleapis.com/envoy.config.core.v3.Address", proxyAddressConvertor{})
30+
}
31+
32+
var (
33+
// metdataRegistry is a map from proto type to metadataConverter.
34+
metdataRegistry = make(map[string]metadataConverter)
35+
)
36+
37+
// metadataConverter converts xds metadata entries in
38+
// Metadata.typed_filter_metadata into an internal form with the fields relevant
39+
// to gRPC.
40+
type metadataConverter interface {
41+
// convert parses the Any proto into a concrete struct.
42+
convert(*anypb.Any) (any, error)
43+
}
44+
45+
// registerMetadataConverter registers the converter to the map keyed on a proto
46+
// type_url. Must be called at init time. Not thread safe.
47+
func registerMetadataConverter(protoType string, c metadataConverter) {
48+
metdataRegistry[protoType] = c
49+
}
50+
51+
// metadataConverterForType retrieves a converter based on key given.
52+
func metadataConverterForType(typeURL string) metadataConverter {
53+
return metdataRegistry[typeURL]
54+
}
55+
56+
// StructMetadataValue stores the values in a google.protobuf.Struct from
57+
// FilterMetadata.
58+
type StructMetadataValue struct {
59+
// Data stores the parsed JSON representation of a google.protobuf.Struct.
60+
Data map[string]any
61+
}
62+
63+
// ProxyAddressMetadataValue holds the address parsed from the
64+
// envoy.config.core.v3.Address proto message, as specified in gRFC A86.
65+
type ProxyAddressMetadataValue struct {
66+
// Address stores the proxy address configured (A86). It will be in the form
67+
// of host:port. It has to be either IPv6 or IPv4.
68+
Address string
69+
}
70+
71+
// proxyAddressConvertor implements the metadataConverter interface to handle
72+
// the conversion of envoy.config.core.v3.Address protobuf messages into an
73+
// internal representation.
74+
type proxyAddressConvertor struct{}
75+
76+
func (proxyAddressConvertor) convert(anyProto *anypb.Any) (any, error) {
77+
addressProto := &v3corepb.Address{}
78+
if err := anyProto.UnmarshalTo(addressProto); err != nil {
79+
return nil, fmt.Errorf("failed to unmarshal resource from Any proto: %v", err)
80+
}
81+
socketaddress := addressProto.GetSocketAddress()
82+
if socketaddress == nil {
83+
return nil, fmt.Errorf("no socket_address field in metadata")
84+
}
85+
if _, err := netip.ParseAddr(socketaddress.GetAddress()); err != nil {
86+
return nil, fmt.Errorf("address field is not a valid IPv4 or IPv6 address: %q", socketaddress.GetAddress())
87+
}
88+
portvalue := socketaddress.GetPortValue()
89+
if portvalue == 0 {
90+
return nil, fmt.Errorf("port value not set in socket_address")
91+
}
92+
return ProxyAddressMetadataValue{Address: parseAddress(socketaddress)}, nil
93+
}
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
*
3+
* Copyright 2025 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package xdsresource
18+
19+
import (
20+
"testing"
21+
22+
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
23+
"github.com/google/go-cmp/cmp"
24+
"google.golang.org/grpc/internal/testutils"
25+
)
26+
27+
const proxyAddressTypeURL = "type.googleapis.com/envoy.config.core.v3.Address"
28+
29+
func (s) TestProxyAddressConverterSuccess(t *testing.T) {
30+
converter := metadataConverterForType(proxyAddressTypeURL)
31+
if converter == nil {
32+
t.Fatalf("Converter for %q not found in registry", proxyAddressTypeURL)
33+
}
34+
tests := []struct {
35+
name string
36+
addr *v3corepb.Address
37+
want ProxyAddressMetadataValue
38+
}{
39+
{
40+
name: "valid IPv4 address and port",
41+
addr: &v3corepb.Address{
42+
Address: &v3corepb.Address_SocketAddress{
43+
SocketAddress: &v3corepb.SocketAddress{
44+
Address: "192.168.1.1",
45+
PortSpecifier: &v3corepb.SocketAddress_PortValue{
46+
PortValue: 8080,
47+
},
48+
},
49+
},
50+
},
51+
want: ProxyAddressMetadataValue{
52+
Address: "192.168.1.1:8080",
53+
},
54+
},
55+
{
56+
name: "valid full IPv6 address and port",
57+
addr: &v3corepb.Address{
58+
Address: &v3corepb.Address_SocketAddress{
59+
SocketAddress: &v3corepb.SocketAddress{
60+
Address: "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
61+
PortSpecifier: &v3corepb.SocketAddress_PortValue{
62+
PortValue: 9090,
63+
},
64+
},
65+
},
66+
},
67+
want: ProxyAddressMetadataValue{
68+
Address: "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:9090",
69+
},
70+
},
71+
{
72+
name: "valid shortened IPv6 address",
73+
addr: &v3corepb.Address{
74+
Address: &v3corepb.Address_SocketAddress{
75+
SocketAddress: &v3corepb.SocketAddress{
76+
Address: "2001:db8::1",
77+
PortSpecifier: &v3corepb.SocketAddress_PortValue{
78+
PortValue: 9090,
79+
},
80+
},
81+
},
82+
},
83+
want: ProxyAddressMetadataValue{
84+
Address: "[2001:db8::1]:9090",
85+
},
86+
},
87+
{
88+
name: "valid link-local IPv6 address",
89+
addr: &v3corepb.Address{
90+
Address: &v3corepb.Address_SocketAddress{
91+
SocketAddress: &v3corepb.SocketAddress{
92+
Address: "fe80::1ff:fe23:4567:890a",
93+
PortSpecifier: &v3corepb.SocketAddress_PortValue{
94+
PortValue: 8888,
95+
},
96+
},
97+
},
98+
},
99+
want: ProxyAddressMetadataValue{
100+
Address: "[fe80::1ff:fe23:4567:890a]:8888",
101+
},
102+
},
103+
{
104+
name: "valid IPv4-mapped IPv6 address",
105+
addr: &v3corepb.Address{
106+
Address: &v3corepb.Address_SocketAddress{
107+
SocketAddress: &v3corepb.SocketAddress{
108+
Address: "::ffff:192.0.2.128",
109+
PortSpecifier: &v3corepb.SocketAddress_PortValue{
110+
PortValue: 1234,
111+
},
112+
},
113+
},
114+
},
115+
want: ProxyAddressMetadataValue{
116+
Address: "[::ffff:192.0.2.128]:1234",
117+
},
118+
},
119+
}
120+
121+
for _, tt := range tests {
122+
t.Run(tt.name, func(t *testing.T) {
123+
anyProto := testutils.MarshalAny(t, tt.addr)
124+
got, err := converter.convert(anyProto)
125+
if err != nil {
126+
t.Fatalf("convert() failed with error: %v", err)
127+
}
128+
if diff := cmp.Diff(tt.want, got, cmp.AllowUnexported(ProxyAddressMetadataValue{})); diff != "" {
129+
t.Errorf("convert() returned unexpected value (-want +got):\n%s", diff)
130+
}
131+
})
132+
}
133+
}
134+
135+
func (s) TestProxyAddressConverterFailure(t *testing.T) {
136+
converter := metadataConverterForType(proxyAddressTypeURL)
137+
if converter == nil {
138+
t.Fatalf("Converter for %q not found in registry", proxyAddressTypeURL)
139+
}
140+
tests := []struct {
141+
name string
142+
addr *v3corepb.Address
143+
wantErr string
144+
}{
145+
{
146+
name: "invalid address",
147+
addr: &v3corepb.Address{
148+
Address: &v3corepb.Address_SocketAddress{
149+
SocketAddress: &v3corepb.SocketAddress{
150+
Address: "invalid-ip",
151+
},
152+
},
153+
},
154+
wantErr: "address field is not a valid IPv4 or IPv6 address: \"invalid-ip\"",
155+
},
156+
{
157+
name: "missing socket_address",
158+
addr: &v3corepb.Address{
159+
// No SocketAddress field set.
160+
},
161+
wantErr: "no socket_address field in metadata",
162+
},
163+
{
164+
name: "address is not a socket address",
165+
addr: &v3corepb.Address{
166+
Address: &v3corepb.Address_EnvoyInternalAddress{
167+
EnvoyInternalAddress: &v3corepb.EnvoyInternalAddress{
168+
AddressNameSpecifier: &v3corepb.EnvoyInternalAddress_ServerListenerName{
169+
ServerListenerName: "some-internal-listener",
170+
},
171+
},
172+
},
173+
},
174+
wantErr: "no socket_address field in metadata",
175+
},
176+
{
177+
name: "port value not set",
178+
addr: &v3corepb.Address{
179+
Address: &v3corepb.Address_SocketAddress{
180+
SocketAddress: &v3corepb.SocketAddress{
181+
Address: "127.0.0.1",
182+
PortSpecifier: nil,
183+
},
184+
},
185+
},
186+
wantErr: "port value not set in socket_address",
187+
},
188+
}
189+
190+
for _, tt := range tests {
191+
t.Run(tt.name, func(t *testing.T) {
192+
anyProto := testutils.MarshalAny(t, tt.addr)
193+
_, err := converter.convert(anyProto)
194+
if err == nil || err.Error() != tt.wantErr {
195+
t.Errorf("convert() got error = %v, wantErr = %q", err, tt.wantErr)
196+
}
197+
})
198+
}
199+
}

internal/xds/xdsclient/xdsresource/type_eds.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type Endpoint struct {
5353
HealthStatus EndpointHealthStatus
5454
Weight uint32
5555
HashKey string
56+
Metadata map[string]any
5657
}
5758

5859
// Locality contains information of a locality.
@@ -61,6 +62,7 @@ type Locality struct {
6162
ID clients.Locality
6263
Priority uint32
6364
Weight uint32
65+
Metadata map[string]any
6466
}
6567

6668
// EndpointsUpdate contains an EDS update.

internal/xds/xdsclient/xdsresource/unmarshal_eds.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,16 @@ func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint, uniqueEndpointAddrs
108108
}
109109
uniqueEndpointAddrs[a] = true
110110
}
111+
endpointMetadata, err := validateAndConstructMetadata(lbEndpoint.GetMetadata())
112+
if err != nil {
113+
return nil, err
114+
}
111115
endpoints = append(endpoints, Endpoint{
112116
HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()),
113117
Addresses: addrs,
114118
Weight: weight,
115119
HashKey: hashKey(lbEndpoint),
120+
Metadata: endpointMetadata,
116121
})
117122
}
118123
return endpoints, nil
@@ -190,11 +195,17 @@ func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate,
190195
if err != nil {
191196
return EndpointsUpdate{}, err
192197
}
198+
localityMetadata, err := validateAndConstructMetadata(locality.GetMetadata())
199+
if err != nil {
200+
return EndpointsUpdate{}, err
201+
}
202+
193203
ret.Localities = append(ret.Localities, Locality{
194204
ID: lid,
195205
Endpoints: endpoints,
196206
Weight: weight,
197207
Priority: priority,
208+
Metadata: localityMetadata,
198209
})
199210
}
200211
for i := 0; i < len(priorities); i++ {
@@ -204,3 +215,36 @@ func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate,
204215
}
205216
return ret, nil
206217
}
218+
219+
func validateAndConstructMetadata(metadataProto *v3corepb.Metadata) (map[string]any, error) {
220+
// TODO(easwars): Find a better place for the environment variable check
221+
// once A83 is implemented.
222+
if !envconfig.XDSHTTPConnectEnabled || metadataProto == nil {
223+
return nil, nil
224+
}
225+
metadata := make(map[string]any)
226+
// First go through TypedFilterMetadata.
227+
for key, anyProto := range metadataProto.GetTypedFilterMetadata() {
228+
converter := metadataConverterForType(anyProto.GetTypeUrl())
229+
// Ignore types we don't have a converter for.
230+
if converter == nil {
231+
continue
232+
}
233+
val, err := converter.convert(anyProto)
234+
if err != nil {
235+
// If the converter fails, nack the whole resource.
236+
return nil, fmt.Errorf("metadata conversion for key %q and type %q failed: %v", key, anyProto.GetTypeUrl(), err)
237+
}
238+
metadata[key] = val
239+
}
240+
241+
// Process FilterMetadata for any keys not already handled.
242+
for key, structProto := range metadataProto.GetFilterMetadata() {
243+
// Skip keys already added from TyperFilterMetadata.
244+
if metadata[key] != nil {
245+
continue
246+
}
247+
metadata[key] = StructMetadataValue{Data: structProto.AsMap()}
248+
}
249+
return metadata, nil
250+
}

0 commit comments

Comments
 (0)