Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/server/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/klog/v2"

"sigs.k8s.io/apiserver-network-proxy/pkg/server"
"sigs.k8s.io/apiserver-network-proxy/pkg/server/proxystrategies"
"sigs.k8s.io/apiserver-network-proxy/pkg/util"
)

Expand Down Expand Up @@ -314,7 +315,7 @@ func (o *ProxyRunOptions) Validate() error {
if len(o.ProxyStrategies) == 0 {
return fmt.Errorf("ProxyStrategies cannot be empty")
}
if _, err := server.ParseProxyStrategies(o.ProxyStrategies); err != nil {
if _, err := proxystrategies.ParseProxyStrategies(o.ProxyStrategies); err != nil {
return fmt.Errorf("invalid proxy strategies: %v", err)
}
if o.XfrChannelSize <= 0 {
Expand Down
3 changes: 2 additions & 1 deletion cmd/server/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
"sigs.k8s.io/apiserver-network-proxy/pkg/server"
"sigs.k8s.io/apiserver-network-proxy/pkg/server/leases"
"sigs.k8s.io/apiserver-network-proxy/pkg/server/proxystrategies"
"sigs.k8s.io/apiserver-network-proxy/pkg/util"
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
)
Expand Down Expand Up @@ -134,7 +135,7 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
AuthenticationAudience: o.AuthenticationAudience,
}
klog.V(1).Infoln("Starting frontend server for client connections.")
ps, err := server.ParseProxyStrategies(o.ProxyStrategies)
ps, err := proxystrategies.ParseProxyStrategies(o.ProxyStrategies)
if err != nil {
return err
}
Expand Down
89 changes: 18 additions & 71 deletions pkg/server/backend_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"io"
"math/rand"
"slices"
"strings"
"sync"
"time"

Expand All @@ -32,72 +31,11 @@ import (
commonmetrics "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics"
client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
"sigs.k8s.io/apiserver-network-proxy/pkg/server/metrics"
"sigs.k8s.io/apiserver-network-proxy/pkg/server/proxystrategies"
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
"sigs.k8s.io/apiserver-network-proxy/proto/header"
)

type ProxyStrategy int

const (
// With this strategy the Proxy Server will randomly pick a backend from
// the current healthy backends to establish the tunnel over which to
// forward requests.
ProxyStrategyDefault ProxyStrategy = iota + 1
// With this strategy the Proxy Server will pick a backend that has the same
// associated host as the request.Host to establish the tunnel.
ProxyStrategyDestHost
// ProxyStrategyDefaultRoute will only forward traffic to agents that have explicity advertised
// they serve the default route through an agent identifier. Typically used in combination with destHost
ProxyStrategyDefaultRoute
)

func (ps ProxyStrategy) String() string {
switch ps {
case ProxyStrategyDefault:
return "default"
case ProxyStrategyDestHost:
return "destHost"
case ProxyStrategyDefaultRoute:
return "defaultRoute"
}
panic(fmt.Sprintf("unhandled ProxyStrategy: %d", ps))
}

func ParseProxyStrategy(s string) (ProxyStrategy, error) {
switch s {
case ProxyStrategyDefault.String():
return ProxyStrategyDefault, nil
case ProxyStrategyDestHost.String():
return ProxyStrategyDestHost, nil
case ProxyStrategyDefaultRoute.String():
return ProxyStrategyDefaultRoute, nil
default:
return 0, fmt.Errorf("unknown proxy strategy: %s", s)
}
}

// GenProxyStrategiesFromStr generates the list of proxy strategies from the
// comma-seperated string, i.e., destHost.
func ParseProxyStrategies(proxyStrategies string) ([]ProxyStrategy, error) {
var result []ProxyStrategy

strs := strings.Split(proxyStrategies, ",")
for _, s := range strs {
if len(s) == 0 {
continue
}
ps, err := ParseProxyStrategy(s)
if err != nil {
return nil, err
}
result = append(result, ps)
}
if len(result) == 0 {
return nil, fmt.Errorf("proxy strategies cannot be empty")
}
return result, nil
}

// Backend abstracts a connected Konnectivity agent.
//
// In the only currently supported case (gRPC), it wraps an
Expand Down Expand Up @@ -271,24 +209,31 @@ type DefaultBackendStorage struct {
// e.g., when associating to the DestHostBackendManager, it can only use the
// identifiers of types, IPv4, IPv6 and Host.
idTypes []header.IdentifierType
// proxyStrategy is the proxy strategy of the backend manager this storage
// belongs to.
// It is used to record metrics.
proxyStrategy proxystrategies.ProxyStrategy
}

// NewDefaultBackendManager returns a DefaultBackendManager.
func NewDefaultBackendManager() *DefaultBackendManager {
return &DefaultBackendManager{
DefaultBackendStorage: NewDefaultBackendStorage(
[]header.IdentifierType{header.UID})}
[]header.IdentifierType{header.UID}, proxystrategies.ProxyStrategyDefault)}
}

// NewDefaultBackendStorage returns a DefaultBackendStorage
func NewDefaultBackendStorage(idTypes []header.IdentifierType) *DefaultBackendStorage {
func NewDefaultBackendStorage(idTypes []header.IdentifierType, proxyStrategy proxystrategies.ProxyStrategy) *DefaultBackendStorage {
// Set an explicit value, so that the metric is emitted even when
// no agent ever successfully connects.
metrics.Metrics.SetBackendCount(0)
metrics.Metrics.SetBackendCountDeprecated(0)
metrics.Metrics.SetTotalBackendCount(proxyStrategy, 0)

return &DefaultBackendStorage{
backends: make(map[string][]*Backend),
random: rand.New(rand.NewSource(time.Now().UnixNano())), /* #nosec G404 */
idTypes: idTypes,
backends: make(map[string][]*Backend),
random: rand.New(rand.NewSource(time.Now().UnixNano())), /* #nosec G404 */
idTypes: idTypes,
proxyStrategy: proxyStrategy,
}
}

Expand Down Expand Up @@ -317,7 +262,8 @@ func (s *DefaultBackendStorage) addBackend(identifier string, idType header.Iden
return
}
s.backends[identifier] = []*Backend{backend}
metrics.Metrics.SetBackendCount(len(s.backends))
metrics.Metrics.SetBackendCountDeprecated(len(s.backends))
metrics.Metrics.SetTotalBackendCount(s.proxyStrategy, len(s.backends))
s.agentIDs = append(s.agentIDs, identifier)
}

Expand Down Expand Up @@ -358,7 +304,8 @@ func (s *DefaultBackendStorage) removeBackend(identifier string, idType header.I
if !found {
klog.V(1).InfoS("Could not find connection matching identifier to remove", "agentID", identifier, "idType", idType)
}
metrics.Metrics.SetBackendCount(len(s.backends))
metrics.Metrics.SetBackendCountDeprecated(len(s.backends))
metrics.Metrics.SetTotalBackendCount(s.proxyStrategy, len(s.backends))
}

// NumBackends resturns the number of available backends
Expand Down
122 changes: 0 additions & 122 deletions pkg/server/backend_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ package server

import (
"context"
"fmt"
"reflect"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
"google.golang.org/grpc/metadata"

Expand Down Expand Up @@ -385,123 +383,3 @@ func TestDestHostBackendManager_WithDuplicateIdents(t *testing.T) {
t.Errorf("expected %v, got %v", e, a)
}
}

func TestProxyStrategy(t *testing.T) {
for desc, tc := range map[string]struct {
input ProxyStrategy
want string
wantPanic string
}{
"default": {
input: ProxyStrategyDefault,
want: "default",
},
"destHost": {
input: ProxyStrategyDestHost,
want: "destHost",
},
"defaultRoute": {
input: ProxyStrategyDefaultRoute,
want: "defaultRoute",
},
"unrecognized": {
input: ProxyStrategy(0),
wantPanic: "unhandled ProxyStrategy: 0",
},
} {
t.Run(desc, func(t *testing.T) {
if tc.wantPanic != "" {
assert.PanicsWithValue(t, tc.wantPanic, func() {
_ = tc.input.String()
})
} else {
got := tc.input.String()
if got != tc.want {
t.Errorf("ProxyStrategy.String(): got %v, want %v", got, tc.want)
}
}
})
}
}

func TestParseProxyStrategy(t *testing.T) {
for desc, tc := range map[string]struct {
input string
want ProxyStrategy
wantErr error
}{
"empty": {
input: "",
wantErr: fmt.Errorf("unknown proxy strategy: "),
},
"unrecognized": {
input: "unrecognized",
wantErr: fmt.Errorf("unknown proxy strategy: unrecognized"),
},
"default": {
input: "default",
want: ProxyStrategyDefault,
},
"destHost": {
input: "destHost",
want: ProxyStrategyDestHost,
},
"defaultRoute": {
input: "defaultRoute",
want: ProxyStrategyDefaultRoute,
},
} {
t.Run(desc, func(t *testing.T) {
got, err := ParseProxyStrategy(tc.input)
assert.Equal(t, tc.wantErr, err, "ParseProxyStrategy(%s): got error %q, want %v", tc.input, err, tc.wantErr)
if got != tc.want {
t.Errorf("ParseProxyStrategy(%s): got %v, want %v", tc.input, got, tc.want)
}
})
}
}

func TestParseProxyStrategies(t *testing.T) {
for desc, tc := range map[string]struct {
input string
want []ProxyStrategy
wantErr error
}{
"empty": {
input: "",
wantErr: fmt.Errorf("proxy strategies cannot be empty"),
},
"unrecognized": {
input: "unrecognized",
wantErr: fmt.Errorf("unknown proxy strategy: unrecognized"),
},
"default": {
input: "default",
want: []ProxyStrategy{ProxyStrategyDefault},
},
"destHost": {
input: "destHost",
want: []ProxyStrategy{ProxyStrategyDestHost},
},
"defaultRoute": {
input: "defaultRoute",
want: []ProxyStrategy{ProxyStrategyDefaultRoute},
},
"duplicate": {
input: "destHost,defaultRoute,defaultRoute,default",
want: []ProxyStrategy{ProxyStrategyDestHost, ProxyStrategyDefaultRoute, ProxyStrategyDefaultRoute, ProxyStrategyDefault},
},
"multiple": {
input: "destHost,defaultRoute,default",
want: []ProxyStrategy{ProxyStrategyDestHost, ProxyStrategyDefaultRoute, ProxyStrategyDefault},
},
} {
t.Run(desc, func(t *testing.T) {
got, err := ParseProxyStrategies(tc.input)
assert.Equal(t, tc.wantErr, err, "ParseProxyStrategies(%s): got error %q, want %v", tc.input, err, tc.wantErr)
if !reflect.DeepEqual(got, tc.want) {
t.Errorf("ParseProxyStrategies(%s): got %v, want %v", tc.input, got, tc.want)
}
})
}
}
3 changes: 2 additions & 1 deletion pkg/server/default_route_backend_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"k8s.io/klog/v2"
"sigs.k8s.io/apiserver-network-proxy/pkg/server/proxystrategies"
"sigs.k8s.io/apiserver-network-proxy/proto/header"
)

Expand All @@ -32,7 +33,7 @@ var _ BackendManager = &DefaultRouteBackendManager{}
func NewDefaultRouteBackendManager() *DefaultRouteBackendManager {
return &DefaultRouteBackendManager{
DefaultBackendStorage: NewDefaultBackendStorage(
[]header.IdentifierType{header.DefaultRoute})}
[]header.IdentifierType{header.DefaultRoute}, proxystrategies.ProxyStrategyDefaultRoute)}
}

// Backend tries to get a backend that advertises default route, with random selection.
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/desthost_backend_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"k8s.io/klog/v2"
"sigs.k8s.io/apiserver-network-proxy/pkg/server/proxystrategies"
"sigs.k8s.io/apiserver-network-proxy/proto/header"
)

Expand All @@ -32,7 +33,7 @@ var _ BackendManager = &DestHostBackendManager{}
func NewDestHostBackendManager() *DestHostBackendManager {
return &DestHostBackendManager{
DefaultBackendStorage: NewDefaultBackendStorage(
[]header.IdentifierType{header.IPv4, header.IPv6, header.Host})}
[]header.IdentifierType{header.IPv4, header.IPv6, header.Host}, proxystrategies.ProxyStrategyDestHost)}
}

func (dibm *DestHostBackendManager) AddBackend(backend *Backend) {
Expand Down
Loading