Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/configgrpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,5 @@ README](../confignet/README.md).
- [`write_buffer_size`](https://godoc.org/google.golang.org/grpc#WriteBufferSize)
- [`auth`](../configauth/README.md)
- [`middlewares`](../configmiddleware/README.md)
- `include_metadata`
- `client_address_metadata_keys`
79 changes: 62 additions & 17 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"math"
"net"
"strings"
"time"

Expand Down Expand Up @@ -206,6 +207,11 @@ type ServerConfig struct {
// Middlewares for the gRPC server.
Middlewares []configmiddleware.Config `mapstructure:"middlewares,omitempty"`

// ClientAddressMetadataKeys list of metadata keys to determine the client address.
// Keys are processed in order, the first valid value is used to set the client.Addr.
// The client.Addr will default to using Peer address.
ClientAddressMetadataKeys []string `mapstructure:"client_address_metadata_keys,omitempty"`

// prevent unkeyed literal initialization
_ struct{}
}
Expand Down Expand Up @@ -532,8 +538,8 @@ func (sc *ServerConfig) getGrpcServerOptions(

// Enable OpenTelemetry observability plugin.

uInterceptors = append(uInterceptors, enhanceWithClientInformation(sc.IncludeMetadata))
sInterceptors = append(sInterceptors, enhanceStreamWithClientInformation(sc.IncludeMetadata)) //nolint:contextcheck // context already handled
uInterceptors = append(uInterceptors, enhanceWithClientInformation(sc.IncludeMetadata, sc.ClientAddressMetadataKeys))
sInterceptors = append(sInterceptors, enhanceStreamWithClientInformation(sc.IncludeMetadata, sc.ClientAddressMetadataKeys)) //nolint:contextcheck // context already handled

opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler(otelOpts...)), grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...))

Expand Down Expand Up @@ -571,37 +577,76 @@ func getGRPCCompressionName(compressionType configcompression.Type) (string, err

// enhanceWithClientInformation intercepts the incoming RPC, replacing the incoming context with one that includes
// a client.Info, potentially with the peer's address.
func enhanceWithClientInformation(includeMetadata bool) grpc.UnaryServerInterceptor {
func enhanceWithClientInformation(includeMetadata bool, clientAddrMetadataKeys []string) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
return handler(contextWithClient(ctx, includeMetadata), req)
return handler(contextWithClient(ctx, includeMetadata, clientAddrMetadataKeys), req)
}
}

func enhanceStreamWithClientInformation(includeMetadata bool) grpc.StreamServerInterceptor {
func enhanceStreamWithClientInformation(includeMetadata bool, clientAddrMetadataKeys []string) grpc.StreamServerInterceptor {
return func(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, wrapServerStream(contextWithClient(ss.Context(), includeMetadata), ss))
return handler(srv, wrapServerStream(contextWithClient(ss.Context(), includeMetadata, clientAddrMetadataKeys), ss))
}
}

// contextWithClient attempts to add the peer address to the client.Info from the context. When no
// client.Info exists in the context, one is created.
func contextWithClient(ctx context.Context, includeMetadata bool) context.Context {
// contextWithClient attempts to add the client address to the client.Info from the context.
// The address is found by first checking the metadata using clientAddrMetadataKeys and
// falls back to the peer address.
// When no client.Info exists in the context, one is created.
func contextWithClient(ctx context.Context, includeMetadata bool, clientAddrMetadataKeys []string) context.Context {
cl := client.FromContext(ctx)
if p, ok := peer.FromContext(ctx); ok {
md, mdExists := metadata.FromIncomingContext(ctx)

var ip *net.IPAddr
if mdExists {
ip = getIP(md, clientAddrMetadataKeys)
}
if ip != nil {
cl.Addr = ip
} else if p, ok := peer.FromContext(ctx); ok {
cl.Addr = p.Addr
}
if includeMetadata {
if md, ok := metadata.FromIncomingContext(ctx); ok {
copiedMD := md.Copy()
if len(md[client.MetadataHostName]) == 0 && len(md[":authority"]) > 0 {
copiedMD[client.MetadataHostName] = md[":authority"]
}
cl.Metadata = client.NewMetadata(copiedMD)

if includeMetadata && mdExists {
copiedMD := md.Copy()
if len(md[client.MetadataHostName]) == 0 && len(md[":authority"]) > 0 {
copiedMD[client.MetadataHostName] = md[":authority"]
}
cl.Metadata = client.NewMetadata(copiedMD)
}
return client.NewContext(ctx, cl)
}

// getIP checks keys in order to get an IP address.
// Returns the first valid IP address found, otherwise
// returns nil.
func getIP(md metadata.MD, keys []string) *net.IPAddr {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note for reviewer: I did not find a good common package for these helper funcs, so I opted to duplicate between confighttp and configgrpc

for _, key := range keys {
if values := md.Get(key); len(values) > 0 && values[0] != "" {
if ip := parseIP(values[0]); ip != nil {
return ip
}
}
}
return nil
}

// parseIP parses the given string for an IP address. The input string might contain the port,
// but must not contain a protocol or path. Suitable for getting the IP part of a client connection.
func parseIP(source string) *net.IPAddr {
ipstr, _, err := net.SplitHostPort(source)
if err == nil {
source = ipstr
}
ip := net.ParseIP(source)
if ip != nil {
return &net.IPAddr{
IP: ip,
}
}
return nil
}

func authUnaryServerInterceptor(server extensionauth.Server) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
headers, ok := metadata.FromIncomingContext(ctx)
Expand Down
146 changes: 140 additions & 6 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,10 +784,11 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {

func TestContextWithClient(t *testing.T) {
testCases := []struct {
desc string
input context.Context
doMetadata bool
expected client.Info
desc string
input context.Context
doMetadata bool
clientAddrKeys []string
expected client.Info
}{
{
desc: "no peer information, empty client",
Expand Down Expand Up @@ -837,6 +838,53 @@ func TestContextWithClient(t *testing.T) {
},
},
},
{
desc: "empty client, existing IP gets overridden from metadata keys",
input: metadata.NewIncomingContext(
context.Background(),
metadata.Pairs("x-forwarded-for", "1.1.1.1"),
),
clientAddrKeys: []string{"x-forwarded-for", "x-real-ip"},
expected: client.Info{
Addr: &net.IPAddr{
IP: net.IPv4(1, 1, 1, 1),
},
},
},
{
desc: "existing client, existing IP gets overridden from metadata keys",
input: metadata.NewIncomingContext(
peer.NewContext(context.Background(), &peer.Peer{
Addr: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 4),
},
}),
metadata.Pairs("x-forwarded-for", "1.1.1.1"),
),
clientAddrKeys: []string{"x-forwarded-for", "x-real-ip"},
expected: client.Info{
Addr: &net.IPAddr{
IP: net.IPv4(1, 1, 1, 1),
},
},
},
{
desc: "existing client, no valid IP in metadata existing overridden by defaulting to peer information",
input: metadata.NewIncomingContext(
peer.NewContext(context.Background(), &peer.Peer{
Addr: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 4),
},
}),
metadata.Pairs("x-forwarded-for", "", "x-real-ip", "invalid address"),
),
clientAddrKeys: []string{"x-forwarded-for", "x-real-ip"},
expected: client.Info{
Addr: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 4),
},
},
},
{
desc: "existing client with metadata",
input: client.NewContext(context.Background(), client.Info{
Expand Down Expand Up @@ -880,12 +928,98 @@ func TestContextWithClient(t *testing.T) {
}
for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
cl := client.FromContext(contextWithClient(tt.input, tt.doMetadata))
cl := client.FromContext(contextWithClient(tt.input, tt.doMetadata, tt.clientAddrKeys))
assert.Equal(t, tt.expected, cl)
})
}
}

func TestGetIP(t *testing.T) {
testCases := []struct {
name string
md metadata.MD
keys []string
expected *net.IPAddr
}{
{
name: "valid ip in first key",
md: metadata.Pairs(
"x-forwarded-for", "1.2.3.4",
),
keys: []string{"x-forwarded-for", "x-real-ip"},
expected: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 4),
},
},
{
name: "valid ip in second key",
md: metadata.Pairs(
"x-forwarded-for", "",
"x-real-ip", "5.6.7.8",
),
keys: []string{"x-forwarded-for", "x-real-ip"},
expected: &net.IPAddr{
IP: net.IPv4(5, 6, 7, 8),
},
},
{
name: "valid ip with port",
md: metadata.Pairs(
"x-forwarded-for", "1.2.3.4:8080",
),
keys: []string{"x-forwarded-for"},
expected: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 4),
},
},
{
name: "invalid ip falls to next key",
md: metadata.Pairs(
"x-forwarded-for", "invalid",
"x-real-ip", "9.10.11.12",
),
keys: []string{"x-forwarded-for", "x-real-ip"},
expected: &net.IPAddr{
IP: net.IPv4(9, 10, 11, 12),
},
},
{
name: "no valid ip",
md: metadata.Pairs(
"x-forwarded-for", "invalid",
"x-real-ip", "also-invalid",
),
keys: []string{"x-forwarded-for", "x-real-ip"},
expected: nil,
},
{
name: "empty keys",
md: metadata.Pairs("x-forwarded-for", "1.2.3.4"),
keys: []string{},
expected: nil,
},
{
name: "empty metadata",
md: metadata.MD{},
keys: []string{"x-forwarded-for", "x-real-ip"},
expected: nil,
},
{
name: "missing key",
md: metadata.Pairs(
"other-header", "1.2.3.4",
),
keys: []string{"x-forwarded-for", "x-real-ip"},
expected: nil,
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.expected, getIP(tt.md, tt.keys))
})
}
}

func TestStreamInterceptorEnhancesClient(t *testing.T) {
// prepare
inCtx := peer.NewContext(context.Background(), &peer.Peer{
Expand All @@ -905,7 +1039,7 @@ func TestStreamInterceptorEnhancesClient(t *testing.T) {
}

// test
err := enhanceStreamWithClientInformation(false)(nil, stream, nil, handler)
err := enhanceStreamWithClientInformation(false, nil)(nil, stream, nil, handler)

// verify
require.NoError(t, err)
Expand Down
4 changes: 4 additions & 0 deletions config/confighttp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ will not be enabled.
- [`auth`](../configauth/README.md)
- `request_params`: a list of query parameter names to add to the auth context, along with the HTTP headers
- [`middlewares`](../configmiddleware/README.md)
- `include_metadata`
- `client_address_metadata_keys`

You can enable [`attribute processor`][attribute-processor] to append any http header to span's attribute using custom key. You also need to enable the "include_metadata"

Expand All @@ -137,6 +139,8 @@ receivers:
max_age: 7200
endpoint: 0.0.0.0:55690
compression_algorithms: ["", "gzip"]
client_address_metadata_keys:
- x-forwarded-for
processors:
attributes:
actions:
Expand Down
42 changes: 36 additions & 6 deletions config/confighttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,18 +615,19 @@ func TestHttpTransportOptions(t *testing.T) {

func TestContextWithClient(t *testing.T) {
testCases := []struct {
name string
input *http.Request
doMetadata bool
expected client.Info
name string
input *http.Request
doMetadata bool
clientAddrKeys []string
expected client.Info
}{
{
name: "request without client IP or headers",
input: &http.Request{},
expected: client.Info{},
},
{
name: "request with client IP",
name: "request with client IP from RemoteAddr",
input: &http.Request{
RemoteAddr: "1.2.3.4:55443",
},
Expand All @@ -636,6 +637,35 @@ func TestContextWithClient(t *testing.T) {
},
},
},
{
name: "request with client IP from metadata keys",
input: &http.Request{
RemoteAddr: "1.2.3.4:55443",
Header: map[string][]string{http.CanonicalHeaderKey("x-forwarded-for"): {"1.1.1.1"}},
},
clientAddrKeys: []string{"x-forwarded-for", "x-real-ip"},
expected: client.Info{
Addr: &net.IPAddr{
IP: net.IPv4(1, 1, 1, 1),
},
},
},
{
name: "request with client IP, no valid IP default to RemoteAddr",
input: &http.Request{
RemoteAddr: "1.2.3.4:55443",
Header: map[string][]string{
http.CanonicalHeaderKey("x-forwarded-for"): {""},
http.CanonicalHeaderKey("x-real-ip"): {"invalid address"},
},
},
clientAddrKeys: []string{"x-forwarded-for", "x-real-ip"},
expected: client.Info{
Addr: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 4),
},
},
},
{
name: "request with client headers, no metadata processing",
input: &http.Request{
Expand Down Expand Up @@ -668,7 +698,7 @@ func TestContextWithClient(t *testing.T) {
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
ctx := contextWithClient(tt.input, tt.doMetadata)
ctx := contextWithClient(tt.input, tt.doMetadata, tt.clientAddrKeys)
assert.Equal(t, tt.expected, client.FromContext(ctx))
})
}
Expand Down
Loading