Skip to content

Commit fddd19f

Browse files
fuziontechclaude
andauthored
feat: implement global concurrent connection limit and set default to CPUs * 2 (#222)
* feat: implement global concurrent connection limit and set default to CPUs * 2 * fix: increase MaxConnections in integration test harness * fix: apply rate limit defaults per-field to preserve partial config The all-or-nothing defaulting logic was silently overwriting the test harness's MaxConnections: 100 because MaxFailedAttempts was zero, causing CI to reject connections on low-CPU runners. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 291dad3 commit fddd19f

File tree

8 files changed

+112
-9
lines changed

8 files changed

+112
-9
lines changed

config_resolution.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type configCLIInputs struct {
3131
ACMEDomain string
3232
ACMEEmail string
3333
ACMECacheDir string
34+
MaxConnections int
3435
}
3536

3637
type resolvedConfig struct {
@@ -126,6 +127,9 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
126127
if fileCfg.RateLimit.MaxConnectionsPerIP > 0 {
127128
cfg.RateLimit.MaxConnectionsPerIP = fileCfg.RateLimit.MaxConnectionsPerIP
128129
}
130+
if fileCfg.RateLimit.MaxConnections > 0 {
131+
cfg.RateLimit.MaxConnections = fileCfg.RateLimit.MaxConnections
132+
}
129133
if fileCfg.RateLimit.FailedAttemptWindow != "" {
130134
if d, err := time.ParseDuration(fileCfg.RateLimit.FailedAttemptWindow); err == nil {
131135
cfg.RateLimit.FailedAttemptWindow = d
@@ -370,6 +374,13 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
370374
if v := getenv("DUCKGRES_ACME_CACHE_DIR"); v != "" {
371375
cfg.ACMECacheDir = v
372376
}
377+
if v := getenv("DUCKGRES_MAX_CONNECTIONS"); v != "" {
378+
if n, err := strconv.Atoi(v); err == nil {
379+
cfg.RateLimit.MaxConnections = n
380+
} else {
381+
warn("Invalid DUCKGRES_MAX_CONNECTIONS: " + err.Error())
382+
}
383+
}
373384

374385
if cli.Set["host"] {
375386
cfg.Host = cli.Host
@@ -454,6 +465,9 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
454465
if cli.Set["acme-cache-dir"] {
455466
cfg.ACMECacheDir = cli.ACMECacheDir
456467
}
468+
if cli.Set["max-connections"] {
469+
cfg.RateLimit.MaxConnections = cli.MaxConnections
470+
}
457471

458472
// Validate memory_limit format if explicitly set
459473
if cfg.MemoryLimit != "" && !server.ValidateMemoryLimit(cfg.MemoryLimit) {

controlplane/memory_rebalancer.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,11 +207,10 @@ func memoryLimit(budget uint64) uint64 {
207207
return budget
208208
}
209209

210-
// DefaultMaxWorkers returns a reasonable default for max_workers based on
211-
// the memory budget. This is a concurrency cap (budget / 256MB), not a
212-
// memory-safety guarantee — each session gets the full budget.
210+
// DefaultMaxWorkers returns a reasonable default for max_workers.
211+
// Defaults to number of CPUs * 2.
213212
func (r *MemoryRebalancer) DefaultMaxWorkers() int {
214-
return int(r.memoryBudget / minMemoryPerSession)
213+
return runtime.NumCPU() * 2
215214
}
216215

217216
// SetInitialLimits sets memory_limit and threads on a single session synchronously.

duckgres.example.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,3 +116,5 @@ rate_limit:
116116
ban_duration: "15m"
117117
# Max concurrent connections from a single IP (0 = unlimited)
118118
max_connections_per_ip: 100
119+
# Max total concurrent connections (0 = unlimited, default: CPUs * 2)
120+
# max_connections: 16

main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ type RateLimitFileConfig struct {
6262
FailedAttemptWindow string `yaml:"failed_attempt_window"` // e.g., "5m"
6363
BanDuration string `yaml:"ban_duration"` // e.g., "15m"
6464
MaxConnectionsPerIP int `yaml:"max_connections_per_ip"`
65+
MaxConnections int `yaml:"max_connections"`
6566
}
6667

6768
type DuckLakeFileConfig struct {
@@ -161,6 +162,9 @@ func main() {
161162
showVersion := flag.Bool("version", false, "Show version and exit")
162163
showHelp := flag.Bool("help", false, "Show help message")
163164

165+
// Rate limiting flags
166+
maxConnections := flag.Int("max-connections", 0, "Max concurrent connections, 0=unlimited (env: DUCKGRES_MAX_CONNECTIONS)")
167+
164168
// Control plane flags
165169
mode := flag.String("mode", "standalone", "Run mode: standalone, control-plane, or duckdb-service")
166170
minWorkers := flag.Int("min-workers", 0, "Pre-warm worker count at startup (control-plane mode) (env: DUCKGRES_MIN_WORKERS)")
@@ -206,6 +210,7 @@ func main() {
206210
fmt.Fprintf(os.Stderr, " DUCKGRES_ACME_DOMAIN Domain for ACME/Let's Encrypt certificate\n")
207211
fmt.Fprintf(os.Stderr, " DUCKGRES_ACME_EMAIL Contact email for Let's Encrypt notifications\n")
208212
fmt.Fprintf(os.Stderr, " DUCKGRES_ACME_CACHE_DIR Directory for ACME certificate cache\n")
213+
fmt.Fprintf(os.Stderr, " DUCKGRES_MAX_CONNECTIONS Max concurrent connections (default: CPUs * 2)\n")
209214
fmt.Fprintf(os.Stderr, " DUCKGRES_DUCKDB_LISTEN DuckDB service listen address (duckdb-service mode)\n")
210215
fmt.Fprintf(os.Stderr, " DUCKGRES_DUCKDB_TOKEN DuckDB service bearer token (duckdb-service mode)\n")
211216
fmt.Fprintf(os.Stderr, " DUCKGRES_DUCKDB_MAX_SESSIONS DuckDB service max sessions (duckdb-service mode)\n")
@@ -289,6 +294,7 @@ func main() {
289294
ACMEDomain: *acmeDomain,
290295
ACMEEmail: *acmeEmail,
291296
ACMECacheDir: *acmeCacheDir,
297+
MaxConnections: *maxConnections,
292298
}, os.Getenv, func(msg string) {
293299
slog.Warn(msg)
294300
})

server/ratelimit.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package server
22

33
import (
44
"net"
5+
"runtime"
56
"sync"
67
"time"
78
)
@@ -16,6 +17,8 @@ type RateLimitConfig struct {
1617
BanDuration time.Duration
1718
// MaxConnectionsPerIP is the max concurrent connections from a single IP (0 = unlimited)
1819
MaxConnectionsPerIP int
20+
// MaxConnections is the total max concurrent connections (0 = unlimited)
21+
MaxConnections int
1922
}
2023

2124
// DefaultRateLimitConfig returns sensible defaults for rate limiting
@@ -25,6 +28,7 @@ func DefaultRateLimitConfig() RateLimitConfig {
2528
FailedAttemptWindow: 5 * time.Minute,
2629
BanDuration: 15 * time.Minute,
2730
MaxConnectionsPerIP: 100,
31+
MaxConnections: runtime.NumCPU() * 2,
2832
}
2933
}
3034

@@ -37,9 +41,10 @@ type ipRecord struct {
3741

3842
// RateLimiter tracks and limits connections per IP
3943
type RateLimiter struct {
40-
mu sync.Mutex
41-
config RateLimitConfig
42-
records map[string]*ipRecord
44+
mu sync.Mutex
45+
config RateLimitConfig
46+
records map[string]*ipRecord
47+
totalActiveConns int
4348
}
4449

4550
// NewRateLimiter creates a new rate limiter with the given config
@@ -76,6 +81,11 @@ func (rl *RateLimiter) CheckConnection(addr net.Addr) string {
7681
rl.mu.Lock()
7782
defer rl.mu.Unlock()
7883

84+
// Check global connection limit
85+
if rl.config.MaxConnections > 0 && rl.totalActiveConns >= rl.config.MaxConnections {
86+
return "too many concurrent connections"
87+
}
88+
7989
record := rl.getOrCreateRecord(ip)
8090

8191
// Check if IP is banned
@@ -103,6 +113,11 @@ func (rl *RateLimiter) RegisterConnection(addr net.Addr) bool {
103113
rl.mu.Lock()
104114
defer rl.mu.Unlock()
105115

116+
// Check global connection limit
117+
if rl.config.MaxConnections > 0 && rl.totalActiveConns >= rl.config.MaxConnections {
118+
return false
119+
}
120+
106121
record := rl.getOrCreateRecord(ip)
107122

108123
// Check if banned
@@ -116,6 +131,7 @@ func (rl *RateLimiter) RegisterConnection(addr net.Addr) bool {
116131
}
117132

118133
record.activeConns++
134+
rl.totalActiveConns++
119135
return true
120136
}
121137

@@ -129,6 +145,11 @@ func (rl *RateLimiter) UnregisterConnection(addr net.Addr) {
129145
rl.mu.Lock()
130146
defer rl.mu.Unlock()
131147

148+
rl.totalActiveConns--
149+
if rl.totalActiveConns < 0 {
150+
rl.totalActiveConns = 0
151+
}
152+
132153
if record, ok := rl.records[ip]; ok {
133154
record.activeConns--
134155
if record.activeConns < 0 {

server/ratelimit_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package server
22

33
import (
44
"net"
5+
"runtime"
56
"testing"
67
"time"
78
)
@@ -316,6 +317,47 @@ func TestRateLimiter_UnlimitedConnections(t *testing.T) {
316317
}
317318
}
318319

320+
func TestRateLimiter_GlobalConnectionLimit(t *testing.T) {
321+
cfg := RateLimitConfig{
322+
MaxFailedAttempts: 5,
323+
FailedAttemptWindow: 1 * time.Minute,
324+
BanDuration: 5 * time.Minute,
325+
MaxConnectionsPerIP: 100,
326+
MaxConnections: 2,
327+
}
328+
rl := NewRateLimiter(cfg)
329+
330+
addr1 := mockAddr{"tcp", "192.168.1.1:5432"}
331+
addr2 := mockAddr{"tcp", "192.168.1.2:5432"}
332+
addr3 := mockAddr{"tcp", "192.168.1.3:5432"}
333+
334+
// Register 2 connections (at limit)
335+
if !rl.RegisterConnection(addr1) {
336+
t.Fatal("RegisterConnection(addr1) should succeed")
337+
}
338+
if !rl.RegisterConnection(addr2) {
339+
t.Fatal("RegisterConnection(addr2) should succeed")
340+
}
341+
342+
t.Run("exceeds global limit", func(t *testing.T) {
343+
if rl.RegisterConnection(addr3) {
344+
t.Error("RegisterConnection(addr3) should fail when at global limit")
345+
}
346+
347+
msg := rl.CheckConnection(addr3)
348+
if msg != "too many concurrent connections" {
349+
t.Errorf("unexpected error message: %s", msg)
350+
}
351+
})
352+
353+
t.Run("allows connection after unregister", func(t *testing.T) {
354+
rl.UnregisterConnection(addr1)
355+
if !rl.RegisterConnection(addr3) {
356+
t.Error("RegisterConnection(addr3) should succeed after unregistering addr1")
357+
}
358+
})
359+
}
360+
319361
func TestDefaultRateLimitConfig(t *testing.T) {
320362
cfg := DefaultRateLimitConfig()
321363

@@ -331,4 +373,7 @@ func TestDefaultRateLimitConfig(t *testing.T) {
331373
if cfg.MaxConnectionsPerIP != 100 {
332374
t.Errorf("MaxConnectionsPerIP = %d, want 100", cfg.MaxConnectionsPerIP)
333375
}
376+
if cfg.MaxConnections != runtime.NumCPU()*2 {
377+
t.Errorf("MaxConnections = %d, want %d", cfg.MaxConnections, runtime.NumCPU()*2)
378+
}
334379
}

server/server.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,22 @@ type Server struct {
239239
}
240240

241241
func New(cfg Config) (*Server, error) {
242-
// Use default rate limit config if not specified
242+
// Apply default rate limit config for any unset fields
243+
defaults := DefaultRateLimitConfig()
243244
if cfg.RateLimit.MaxFailedAttempts == 0 {
244-
cfg.RateLimit = DefaultRateLimitConfig()
245+
cfg.RateLimit.MaxFailedAttempts = defaults.MaxFailedAttempts
246+
}
247+
if cfg.RateLimit.FailedAttemptWindow == 0 {
248+
cfg.RateLimit.FailedAttemptWindow = defaults.FailedAttemptWindow
249+
}
250+
if cfg.RateLimit.BanDuration == 0 {
251+
cfg.RateLimit.BanDuration = defaults.BanDuration
252+
}
253+
if cfg.RateLimit.MaxConnectionsPerIP == 0 {
254+
cfg.RateLimit.MaxConnectionsPerIP = defaults.MaxConnectionsPerIP
255+
}
256+
if cfg.RateLimit.MaxConnections == 0 {
257+
cfg.RateLimit.MaxConnections = defaults.MaxConnections
245258
}
246259

247260
// Use default shutdown timeout if not specified

tests/integration/harness.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ func (h *TestHarness) startDuckgres(harnessCfg HarnessConfig) error {
133133
"testuser": "testpass",
134134
},
135135
Extensions: []string{"ducklake"},
136+
RateLimit: server.RateLimitConfig{
137+
MaxConnections: 100,
138+
},
136139
}
137140

138141
// Configure DuckLake if enabled

0 commit comments

Comments
 (0)