Skip to content

Commit 8c2eda1

Browse files
committed
TUN-8861: Add configuration for active sessions limiter
## Summary This commit adds a new configuration in the warp routing config to allow users to define the active sessions limit value.
1 parent 8bfe111 commit 8c2eda1

File tree

3 files changed

+18
-3
lines changed

3 files changed

+18
-3
lines changed

config/configuration.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func FindOrCreateConfigPath() string {
155155
// i.e. it fails if a user specifies both --url and --unix-socket
156156
func ValidateUnixSocket(c *cli.Context) (string, error) {
157157
if c.IsSet("unix-socket") && (c.IsSet("url") || c.NArg() > 0) {
158-
return "", errors.New("--unix-socket must be used exclusivly.")
158+
return "", errors.New("--unix-socket must be used exclusively.")
159159
}
160160
return c.String("unix-socket"), nil
161161
}
@@ -260,6 +260,7 @@ type Configuration struct {
260260

261261
type WarpRoutingConfig struct {
262262
ConnectTimeout *CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"`
263+
MaxActiveFlows *uint64 `yaml:"maxActiveFlows" json:"maxActiveFlows,omitempty"`
263264
TCPKeepAlive *CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"`
264265
}
265266

ingress/config.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ var (
2222
const (
2323
defaultProxyAddress = "127.0.0.1"
2424
defaultKeepAliveConnections = 100
25+
defaultMaxActiveFlows = 0 // unlimited
2526
SSHServerFlag = "ssh-server"
2627
Socks5Flag = "socks5"
2728
ProxyConnectTimeoutFlag = "proxy-connect-timeout"
@@ -46,17 +47,22 @@ const (
4647

4748
type WarpRoutingConfig struct {
4849
ConnectTimeout config.CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"`
50+
MaxActiveFlows uint64 `yaml:"maxActiveFlows" json:"MaxActiveFlows,omitempty"`
4951
TCPKeepAlive config.CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"`
5052
}
5153

5254
func NewWarpRoutingConfig(raw *config.WarpRoutingConfig) WarpRoutingConfig {
5355
cfg := WarpRoutingConfig{
5456
ConnectTimeout: defaultWarpRoutingConnectTimeout,
57+
MaxActiveFlows: defaultMaxActiveFlows,
5558
TCPKeepAlive: defaultTCPKeepAlive,
5659
}
5760
if raw.ConnectTimeout != nil {
5861
cfg.ConnectTimeout = *raw.ConnectTimeout
5962
}
63+
if raw.MaxActiveFlows != nil {
64+
cfg.MaxActiveFlows = *raw.MaxActiveFlows
65+
}
6066
if raw.TCPKeepAlive != nil {
6167
cfg.TCPKeepAlive = *raw.TCPKeepAlive
6268
}
@@ -68,6 +74,9 @@ func (c *WarpRoutingConfig) RawConfig() config.WarpRoutingConfig {
6874
if c.ConnectTimeout.Duration != defaultWarpRoutingConnectTimeout.Duration {
6975
raw.ConnectTimeout = &c.ConnectTimeout
7076
}
77+
if c.MaxActiveFlows != defaultMaxActiveFlows {
78+
raw.MaxActiveFlows = &c.MaxActiveFlows
79+
}
7180
if c.TCPKeepAlive.Duration != defaultTCPKeepAlive.Duration {
7281
raw.TCPKeepAlive = &c.TCPKeepAlive
7382
}
@@ -172,6 +181,7 @@ func originRequestFromSingleRule(c *cli.Context) OriginRequestConfig {
172181
}
173182
if flag := ProxyPortFlag; c.IsSet(flag) {
174183
// Note TUN-3758 , we use Int because UInt is not supported with altsrc
184+
// nolint: gosec
175185
proxyPort = uint(c.Int(flag))
176186
}
177187
if flag := Http2OriginFlag; c.IsSet(flag) {
@@ -551,7 +561,7 @@ func convertToRawIPRules(ipRules []ipaccess.Rule) []config.IngressIPRule {
551561
}
552562

553563
func defaultBoolToNil(b bool) *bool {
554-
if b == false {
564+
if !b {
555565
return nil
556566
}
557567

orchestration/orchestrator.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func NewOrchestrator(ctx context.Context,
5858
internalRules: internalRules,
5959
config: config,
6060
tags: tags,
61-
sessionLimiter: cfdsession.NewLimiter(0),
61+
sessionLimiter: cfdsession.NewLimiter(config.WarpRouting.MaxActiveFlows),
6262
log: log,
6363
shutdownC: ctx.Done(),
6464
}
@@ -141,6 +141,10 @@ func (o *Orchestrator) updateIngress(ingressRules ingress.Ingress, warpRouting i
141141
if err := ingressRules.StartOrigins(o.log, proxyShutdownC); err != nil {
142142
return errors.Wrap(err, "failed to start origin")
143143
}
144+
145+
// Update the sessions limit since the configuration might have changed
146+
o.sessionLimiter.SetLimit(warpRouting.MaxActiveFlows)
147+
144148
proxy := proxy.NewOriginProxy(ingressRules, warpRouting, o.tags, o.sessionLimiter, o.config.WriteTimeout, o.log)
145149
o.proxy.Store(proxy)
146150
o.config.Ingress = &ingressRules

0 commit comments

Comments
 (0)