Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 67 additions & 54 deletions agent/xds/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package xds

import (
"encoding/json"
"errors"
"fmt"
"net/url"
Expand Down Expand Up @@ -87,11 +88,14 @@ func (s *ResourceGenerator) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.C
clusters := make([]proto.Message, 0, len(cfgSnap.ConnectProxy.DiscoveryChain)+1)

// Include the "app" cluster for the public listener
appCluster, err := s.makeAppCluster(cfgSnap, xdscommon.LocalAppClusterName, "", cfgSnap.Proxy.LocalServicePort)
if err != nil {
return nil, err
for _, port := range []int{9090, 10100} {
// Include the "app" cluster for the public listener
appCluster, err := s.makeAppCluster(cfgSnap, xdscommon.LocalAppClusterName+"-"+strconv.Itoa(port), "", port)
if err != nil {
return nil, err
}
clusters = append(clusters, appCluster)
}
clusters = append(clusters, appCluster)

if cfgSnap.Proxy.Mode == structs.ProxyModeTransparent {
passthroughs, err := makePassthroughClusters(cfgSnap, cfgSnap.GetXDSCommonConfig(s.Logger))
Expand Down Expand Up @@ -1529,68 +1533,77 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
out = append(out, c)
}

// Construct the target clusters.
for _, groupedTarget := range targetGroups {
s.Logger.Debug("generating cluster for", "cluster", groupedTarget.ClusterName)
for _, p := range []uint{9090, 10100} {
// Construct the target clusters.
for _, groupedTarget := range targetGroups {
s.Logger.Debug("generating cluster for", "cluster", groupedTarget.ClusterName)

c := &envoy_cluster_v3.Cluster{
Name: groupedTarget.ClusterName,
AltStatName: groupedTarget.ClusterName,
ConnectTimeout: durationpb.New(node.Resolver.ConnectTimeout),
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_EDS},
CommonLbConfig: &envoy_cluster_v3.Cluster_CommonLbConfig{
HealthyPanicThreshold: &envoy_type_v3.Percent{
Value: 0, // disable panic threshold
c := &envoy_cluster_v3.Cluster{
Name: groupedTarget.ClusterName + "-" + fmt.Sprintf("%d", p),
AltStatName: groupedTarget.ClusterName + "-" + fmt.Sprintf("%d", p),
ConnectTimeout: durationpb.New(node.Resolver.ConnectTimeout),
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_EDS},
CommonLbConfig: &envoy_cluster_v3.Cluster_CommonLbConfig{
HealthyPanicThreshold: &envoy_type_v3.Percent{
Value: 0, // disable panic threshold
},
},
},
EdsClusterConfig: &envoy_cluster_v3.Cluster_EdsClusterConfig{
EdsConfig: &envoy_core_v3.ConfigSource{
InitialFetchTimeout: cfgSnap.GetXDSCommonConfig(s.Logger).GetXDSFetchTimeout(),
ResourceApiVersion: envoy_core_v3.ApiVersion_V3,
ConfigSourceSpecifier: &envoy_core_v3.ConfigSource_Ads{
Ads: &envoy_core_v3.AggregatedConfigSource{},
EdsClusterConfig: &envoy_cluster_v3.Cluster_EdsClusterConfig{
EdsConfig: &envoy_core_v3.ConfigSource{
InitialFetchTimeout: cfgSnap.GetXDSCommonConfig(s.Logger).GetXDSFetchTimeout(),
ResourceApiVersion: envoy_core_v3.ApiVersion_V3,
ConfigSourceSpecifier: &envoy_core_v3.ConfigSource_Ads{
Ads: &envoy_core_v3.AggregatedConfigSource{},
},
},
},
},
// TODO(peering): make circuit breakers or outlier detection work?
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(upstreamConfig.Limits),
},
OutlierDetection: config.ToOutlierDetection(upstreamConfig.PassiveHealthCheck, nil, true),
}
// TODO(peering): make circuit breakers or outlier detection work?
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(upstreamConfig.Limits),
},
OutlierDetection: config.ToOutlierDetection(upstreamConfig.PassiveHealthCheck, nil, true),
}

var lb *structs.LoadBalancer
if node.LoadBalancer != nil {
lb = node.LoadBalancer
}
if err := injectLBToCluster(lb, c); err != nil {
return nil, fmt.Errorf("failed to apply load balancer configuration to cluster %q: %v", groupedTarget.ClusterName, err)
}
var lb *structs.LoadBalancer
if node.LoadBalancer != nil {
lb = node.LoadBalancer
}
if err := injectLBToCluster(lb, c); err != nil {
return nil, fmt.Errorf("failed to apply load balancer configuration to cluster %q: %v", groupedTarget.ClusterName, err)
}

if upstreamConfig.Protocol == "http2" || upstreamConfig.Protocol == "grpc" {
if err := s.setHttp2ProtocolOptions(c); err != nil {
return nil, err
if upstreamConfig.Protocol == "http2" || upstreamConfig.Protocol == "grpc" {
if err := s.setHttp2ProtocolOptions(c); err != nil {
return nil, err
}
}
}

switch len(groupedTarget.Targets) {
case 0:
continue
case 1:
// We expect one target so this passes through to continue setting the cluster up.
default:
return nil, fmt.Errorf("cannot have more than one target")
}
switch len(groupedTarget.Targets) {
case 0:
continue
case 1:
// We expect one target so this passes through to continue setting the cluster up.
default:
return nil, fmt.Errorf("cannot have more than one target")
}

if targetInfo := groupedTarget.Targets[0]; targetInfo.TLSContext != nil {
transportSocket, err := makeUpstreamTLSTransportSocket(targetInfo.TLSContext)
if targetInfo := groupedTarget.Targets[0]; targetInfo.TLSContext != nil {
// First set ALPN protocol for the outgoing connection
targetInfo.TLSContext.CommonTlsContext.AlpnProtocols = []string{"app-" + fmt.Sprintf("%d", p)}
transportSocket, err := makeUpstreamTLSTransportSocket(targetInfo.TLSContext)
if err != nil {
return nil, err
}
c.TransportSocket = transportSocket
}

b, err := json.Marshal(c)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to marshal cluster %q to json: %v", c.Name, err)
}
c.TransportSocket = transportSocket
s.Logger.Trace("constructed cluster", "cluster", c.Name, "json", string(b))
out = append(out, c)
}

out = append(out, c)
}
}

Expand Down
Loading
Loading