Skip to content

Commit d0471d8

Browse files
committed
review comments, test XfrChannelSize validation
Signed-off-by: Karol Szwaj <[email protected]>
1 parent 8ace398 commit d0471d8

File tree

7 files changed

+40
-20
lines changed

7 files changed

+40
-20
lines changed

cmd/agent/app/options/options.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ type GrpcProxyAgentOptions struct {
7979
WarnOnChannelLimit bool
8080

8181
SyncForever bool
82-
XrfChannelSize int
82+
XfrChannelSize uint
8383
}
8484

8585
func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig {
@@ -94,7 +94,7 @@ func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption)
9494
ServiceAccountTokenPath: o.ServiceAccountTokenPath,
9595
WarnOnChannelLimit: o.WarnOnChannelLimit,
9696
SyncForever: o.SyncForever,
97-
XrfChannelSize: o.XrfChannelSize,
97+
XfrChannelSize: o.XfrChannelSize,
9898
}
9999
}
100100

@@ -121,7 +121,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
121121
flags.StringVar(&o.AgentIdentifiers, "agent-identifiers", o.AgentIdentifiers, "Identifiers of the agent that will be used by the server when choosing agent. N.B. the list of identifiers must be in URL encoded format. e.g.,host=localhost&host=node1.mydomain.com&cidr=127.0.0.1/16&ipv4=1.2.3.4&ipv4=5.6.7.8&ipv6=:::::&default-route=true")
122122
flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. The check involves an unsafe read.")
123123
flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.")
124-
flags.IntVar(&o.XrfChannelSize, "channel-size", 150, "Set the size of the channel")
124+
flags.UintVar(&o.XfrChannelSize, "xfr-channel-size", 150, "Set the size of the channel")
125125
return flags
126126
}
127127

@@ -147,7 +147,7 @@ func (o *GrpcProxyAgentOptions) Print() {
147147
klog.V(1).Infof("AgentIdentifiers set to %s.\n", util.PrettyPrintURL(o.AgentIdentifiers))
148148
klog.V(1).Infof("WarnOnChannelLimit set to %t.\n", o.WarnOnChannelLimit)
149149
klog.V(1).Infof("SyncForever set to %v.\n", o.SyncForever)
150-
klog.V(1).Infof("ChannelSize set to %d.\n", o.XrfChannelSize)
150+
klog.V(1).Infof("ChannelSize set to %d.\n", o.XfrChannelSize)
151151
}
152152

153153
func (o *GrpcProxyAgentOptions) Validate() error {
@@ -181,8 +181,8 @@ func (o *GrpcProxyAgentOptions) Validate() error {
181181
if o.AdminServerPort <= 0 {
182182
return fmt.Errorf("admin server port %d must be greater than 0", o.AdminServerPort)
183183
}
184-
if o.XrfChannelSize <= 0 {
185-
return fmt.Errorf("channel size %d must be greater than 0", o.XrfChannelSize)
184+
if o.XfrChannelSize <= 0 {
185+
return fmt.Errorf("channel size %d must be greater than 0", o.XfrChannelSize)
186186
}
187187
if o.EnableContentionProfiling && !o.EnableProfiling {
188188
return fmt.Errorf("if --enable-contention-profiling is set, --enable-profiling must also be set")
@@ -242,7 +242,7 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
242242
ServiceAccountTokenPath: "",
243243
WarnOnChannelLimit: false,
244244
SyncForever: false,
245-
XrfChannelSize: 150,
245+
XfrChannelSize: 150,
246246
}
247247
return &o
248248
}

cmd/agent/app/options/options_test.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@ package options
1818

1919
import (
2020
"fmt"
21-
"github.com/stretchr/testify/assert"
2221
"reflect"
2322
"testing"
2423
"time"
24+
25+
"github.com/stretchr/testify/assert"
2526
)
2627

2728
/*
@@ -49,6 +50,7 @@ func TestDefaultServerOptions(t *testing.T) {
4950
assertDefaultValue(t, "ServiceAccountTokenPath", defaultAgentOptions.ServiceAccountTokenPath, "")
5051
assertDefaultValue(t, "WarnOnChannelLimit", defaultAgentOptions.WarnOnChannelLimit, false)
5152
assertDefaultValue(t, "SyncForever", defaultAgentOptions.SyncForever, false)
53+
assertDefaultValue(t, "XfrChannelSize", defaultAgentOptions.XfrChannelSize, uint(150))
5254
}
5355

5456
func assertDefaultValue(t *testing.T, fieldName string, actual, expected interface{}) {
@@ -145,6 +147,10 @@ func TestValidate(t *testing.T) {
145147
},
146148
expected: fmt.Errorf("if --enable-contention-profiling is set, --enable-profiling must also be set"),
147149
},
150+
"ZeroXfrChannelSize": {
151+
fieldMap: map[string]interface{}{"XfrChannelSize": uint(0)},
152+
expected: fmt.Errorf("channel size 0 must be greater than 0"),
153+
},
148154
} {
149155
t.Run(desc, func(t *testing.T) {
150156
testAgentOptions := NewGrpcProxyAgentOptions()
@@ -162,6 +168,9 @@ func TestValidate(t *testing.T) {
162168
case reflect.Bool:
163169
bvalue := value.(bool)
164170
fv.SetBool(bvalue)
171+
case reflect.Uint:
172+
uvalue := value.(uint)
173+
fv.SetUint(uint64(uvalue))
165174
}
166175
}
167176
actual := testAgentOptions.Validate()

cmd/server/app/options/options.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ type ProxyRunOptions struct {
101101
// NOTE that cipher suites are not configurable for TLS1.3,
102102
// see: https://pkg.go.dev/crypto/tls#Config, so in that case, this option won't have any effect.
103103
CipherSuites []string
104-
XrfChannelSize int
104+
XfrChannelSize uint
105105
}
106106

107107
func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
@@ -137,7 +137,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
137137
flags.StringVar(&o.AuthenticationAudience, "authentication-audience", o.AuthenticationAudience, "Expected agent's token authentication audience (used with agent-namespace, agent-service-account, kubeconfig).")
138138
flags.StringVar(&o.ProxyStrategies, "proxy-strategies", o.ProxyStrategies, "The list of proxy strategies used by the server to pick an agent/tunnel, available strategies are: default, destHost, defaultRoute.")
139139
flags.StringSliceVar(&o.CipherSuites, "cipher-suites", o.CipherSuites, "The comma separated list of allowed cipher suites. Has no effect on TLS1.3. Empty means allow default list.")
140-
flags.IntVar(&o.XrfChannelSize, "xfr-channel-size", o.XrfChannelSize, "The size of the channel for transferring data between the proxy server and the agent.")
140+
flags.UintVar(&o.XfrChannelSize, "xfr-channel-size", o.XfrChannelSize, "The size of the channel for transferring data between the proxy server and the agent.")
141141

142142
flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.")
143143
flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.")
@@ -177,7 +177,7 @@ func (o *ProxyRunOptions) Print() {
177177
klog.V(1).Infof("KubeconfigBurst set to %d.\n", o.KubeconfigBurst)
178178
klog.V(1).Infof("ProxyStrategies set to %q.\n", o.ProxyStrategies)
179179
klog.V(1).Infof("CipherSuites set to %q.\n", o.CipherSuites)
180-
klog.V(1).Infof("XrfChannelSize set to %d.\n", o.XrfChannelSize)
180+
klog.V(1).Infof("XfrChannelSize set to %d.\n", o.XfrChannelSize)
181181
}
182182

183183
func (o *ProxyRunOptions) Validate() error {
@@ -300,8 +300,8 @@ func (o *ProxyRunOptions) Validate() error {
300300
if _, err := server.ParseProxyStrategies(o.ProxyStrategies); err != nil {
301301
return fmt.Errorf("invalid proxy strategies: %v", err)
302302
}
303-
if o.XrfChannelSize <= 0 {
304-
return fmt.Errorf("channel size %d must be greater than 0", o.XrfChannelSize)
303+
if o.XfrChannelSize <= 0 {
304+
return fmt.Errorf("channel size %d must be greater than 0", o.XfrChannelSize)
305305
}
306306
// validate the cipher suites
307307
if len(o.CipherSuites) != 0 {
@@ -350,7 +350,7 @@ func NewProxyRunOptions() *ProxyRunOptions {
350350
AuthenticationAudience: "",
351351
ProxyStrategies: "default",
352352
CipherSuites: make([]string, 0),
353-
XrfChannelSize: 10,
353+
XfrChannelSize: 10,
354354
}
355355
return &o
356356
}

cmd/server/app/options/options_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ func TestDefaultServerOptions(t *testing.T) {
6161
assertDefaultValue(t, "AuthenticationAudience", defaultServerOptions.AuthenticationAudience, "")
6262
assertDefaultValue(t, "ProxyStrategies", defaultServerOptions.ProxyStrategies, "default")
6363
assertDefaultValue(t, "CipherSuites", defaultServerOptions.CipherSuites, make([]string, 0))
64+
assertDefaultValue(t, "XfrChannelSize", defaultServerOptions.XfrChannelSize, uint(10))
65+
6466
}
6567

6668
func assertDefaultValue(t *testing.T, fieldName string, actual, expected interface{}) {
@@ -155,6 +157,11 @@ func TestValidate(t *testing.T) {
155157
value: "invalid",
156158
expected: fmt.Errorf("invalid proxy strategies: unknown proxy strategy: invalid"),
157159
},
160+
"ZeroXfrChannelSize": {
161+
field: "XfrChannelSize",
162+
value: uint(0),
163+
expected: fmt.Errorf("channel size 0 must be greater than 0"),
164+
},
158165
} {
159166
t.Run(desc, func(t *testing.T) {
160167
testServerOptions := NewProxyRunOptions()
@@ -171,6 +178,9 @@ func TestValidate(t *testing.T) {
171178
case reflect.Int:
172179
ivalue := tc.value.(int)
173180
fv.SetInt(int64(ivalue))
181+
case reflect.Uint:
182+
uvalue := tc.value.(uint)
183+
fv.SetUint(uint64(uvalue))
174184
}
175185
}
176186
actual := testServerOptions.Validate()

cmd/server/app/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
132132
if err != nil {
133133
return err
134134
}
135-
p.server = server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt, o.XrfChannelSize)
135+
p.server = server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt, o.XfrChannelSize)
136136

137137
frontendStop, err := p.runFrontendServer(ctx, o, p.server)
138138
if err != nil {

pkg/agent/clientset.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ type ClientSet struct {
6161
// by the server when choosing agent
6262

6363
warnOnChannelLimit bool
64-
xfrChannelSize int
64+
xfrChannelSize uint
6565

6666
syncForever bool // Continue syncing (support dynamic server count).
6767
}
@@ -142,7 +142,7 @@ type ClientSetConfig struct {
142142
ServiceAccountTokenPath string
143143
WarnOnChannelLimit bool
144144
SyncForever bool
145-
XrfChannelSize int
145+
XfrChannelSize uint
146146
}
147147

148148
func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *ClientSet {
@@ -160,6 +160,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C
160160
syncForever: cc.SyncForever,
161161
drainCh: drainCh,
162162
xfrChannelSize: cc.XrfChannelSize,
163+
xfrChannelSize: cc.XfrChannelSize,
163164
stopCh: stopCh,
164165
}
165166
}

pkg/server/server.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ type ProxyServer struct {
216216

217217
// TODO: move strategies into BackendStorage
218218
proxyStrategies []ProxyStrategy
219-
xfrChannelSize int
219+
xfrChannelSize uint
220220
}
221221

222222
// AgentTokenAuthenticationOptions contains list of parameters required for agent token based authentication
@@ -375,7 +375,7 @@ func (s *ProxyServer) removeEstablishedForStream(streamUID string) []*ProxyClien
375375
}
376376

377377
// NewProxyServer creates a new ProxyServer instance
378-
func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions, channelSize int) *ProxyServer {
378+
func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions, channelSize uint) *ProxyServer {
379379
var bms []BackendManager
380380
for _, ps := range proxyStrategies {
381381
switch ps {
@@ -417,7 +417,7 @@ func (s *ProxyServer) Proxy(stream client.ProxyService_ProxyServer) error {
417417
streamUID := uuid.New().String()
418418
klog.V(5).InfoS("Proxy request from client", "userAgent", userAgent, "serverID", s.serverID, "streamUID", streamUID)
419419

420-
recvCh := make(chan *client.Packet, s.xfrChannelSize)
420+
recvCh := make(chan *client.Packet, int(s.xfrChannelSize))
421421
stopCh := make(chan error, 1)
422422

423423
frontend := GrpcFrontend{

0 commit comments

Comments
 (0)