Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions config_resolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ type configCLIInputs struct {
MemoryRebalance bool
MaxWorkers int
MinWorkers int
WorkerQueueTimeout string
ACMEDomain string
ACMEEmail string
ACMECacheDir string
MaxConnections int
}

type resolvedConfig struct {
Server server.Config
Server server.Config
WorkerQueueTimeout time.Duration
}

func defaultServerConfig() server.Config {
Expand Down Expand Up @@ -69,6 +71,7 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
}

cfg := defaultServerConfig()
var workerQueueTimeout time.Duration

if fileCfg != nil {
if fileCfg.Host != "" {
Expand Down Expand Up @@ -210,6 +213,13 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
if fileCfg.MinWorkers != 0 {
cfg.MinWorkers = fileCfg.MinWorkers
}
if fileCfg.WorkerQueueTimeout != "" {
if d, err := time.ParseDuration(fileCfg.WorkerQueueTimeout); err == nil {
workerQueueTimeout = d
} else {
warn("Invalid worker_queue_timeout duration: " + err.Error())
}
}
if len(fileCfg.PassthroughUsers) > 0 {
cfg.PassthroughUsers = make(map[string]bool, len(fileCfg.PassthroughUsers))
for _, u := range fileCfg.PassthroughUsers {
Expand Down Expand Up @@ -365,6 +375,13 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
warn("Invalid DUCKGRES_MAX_WORKERS: " + err.Error())
}
}
if v := getenv("DUCKGRES_WORKER_QUEUE_TIMEOUT"); v != "" {
if d, err := time.ParseDuration(v); err == nil {
workerQueueTimeout = d
} else {
warn("Invalid DUCKGRES_WORKER_QUEUE_TIMEOUT duration: " + err.Error())
}
}
if v := getenv("DUCKGRES_ACME_DOMAIN"); v != "" {
cfg.ACMEDomain = v
}
Expand Down Expand Up @@ -456,6 +473,13 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
if cli.Set["max-workers"] {
cfg.MaxWorkers = cli.MaxWorkers
}
if cli.Set["worker-queue-timeout"] {
if d, err := time.ParseDuration(cli.WorkerQueueTimeout); err == nil {
workerQueueTimeout = d
} else {
warn("Invalid --worker-queue-timeout duration: " + err.Error())
}
}
if cli.Set["acme-domain"] {
cfg.ACMEDomain = cli.ACMEDomain
}
Expand All @@ -482,6 +506,7 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
}

return resolvedConfig{
Server: cfg,
Server: cfg,
WorkerQueueTimeout: workerQueueTimeout,
}
}
29 changes: 19 additions & 10 deletions controlplane/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type ControlPlaneConfig struct {
ConfigPath string // Path to config file, passed to workers
HandoverSocket string
HealthCheckInterval time.Duration
WorkerQueueTimeout time.Duration // How long to wait for an available worker slot (default: 5m)
}

// ControlPlane manages the TCP listener and routes connections to Flight SQL workers.
Expand Down Expand Up @@ -62,6 +63,9 @@ func RunControlPlane(cfg ControlPlaneConfig) {
if cfg.HealthCheckInterval == 0 {
cfg.HealthCheckInterval = 2 * time.Second
}
if cfg.WorkerQueueTimeout == 0 {
cfg.WorkerQueueTimeout = 5 * time.Minute
}

// Enforce secure defaults for control-plane mode.
if err := validateControlPlaneSecurity(cfg); err != nil {
Expand Down Expand Up @@ -153,7 +157,6 @@ func RunControlPlane(cfg ControlPlaneConfig) {
// Lock ordering invariant: rebalancer.mu → sm.mu(RLock). Never acquire
// rebalancer.mu while holding sm.mu to avoid deadlock.
rebalancer.SetSessionLister(sessions)
pool.SetSessionCounter(sessions)

cp := &ControlPlane{
cfg: cfg,
Expand All @@ -171,10 +174,11 @@ func RunControlPlane(cfg ControlPlaneConfig) {
// creation races between pre-warm and first external Flight requests.
if cfg.FlightPort > 0 {
flightIngress, err := NewFlightIngress(cfg.Host, cfg.FlightPort, tlsCfg, cfg.Users, sessions, cp.rateLimiter, FlightIngressConfig{
SessionIdleTTL: cfg.FlightSessionIdleTTL,
SessionReapTick: cfg.FlightSessionReapInterval,
HandleIdleTTL: cfg.FlightHandleIdleTTL,
SessionTokenTTL: cfg.FlightSessionTokenTTL,
SessionIdleTTL: cfg.FlightSessionIdleTTL,
SessionReapTick: cfg.FlightSessionReapInterval,
HandleIdleTTL: cfg.FlightHandleIdleTTL,
SessionTokenTTL: cfg.FlightSessionTokenTTL,
WorkerQueueTimeout: cfg.WorkerQueueTimeout,
})
if err != nil {
slog.Error("Failed to initialize Flight ingress.", "error", err)
Expand Down Expand Up @@ -261,6 +265,7 @@ func RunControlPlane(cfg ControlPlaneConfig) {
"flight_addr", cp.flightAddr(),
"min_workers", minWorkers,
"max_workers", maxWorkers,
"worker_queue_timeout", cfg.WorkerQueueTimeout,
"memory_budget", formatBytes(rebalancer.memoryBudget),
"memory_rebalance", cfg.MemoryRebalance)

Expand Down Expand Up @@ -469,9 +474,11 @@ func (cp *ControlPlane) handleConnection(conn net.Conn) {
server.RecordSuccessfulAuthAttempt(cp.rateLimiter, remoteAddr)
slog.Info("User authenticated.", "user", username, "remote_addr", remoteAddr)

// Create session on a worker
ctx := context.Background()
// Create session on a worker. The timeout controls how long we wait in the
// worker queue when all slots are occupied.
ctx, cancel := context.WithTimeout(context.Background(), cp.cfg.WorkerQueueTimeout)
pid, executor, err := cp.sessions.CreateSession(ctx, username)
cancel()
if err != nil {
slog.Error("Failed to create session.", "user", username, "remote_addr", remoteAddr, "error", err)
_ = server.WriteErrorResponse(writer, "FATAL", "53300", "too many connections")
Expand Down Expand Up @@ -732,9 +739,11 @@ func (cp *ControlPlane) recoverFlightIngressAfterFailedReload() {
}

flightIngress, err := NewFlightIngress(cp.cfg.Host, cp.cfg.FlightPort, cp.tlsConfig, cp.cfg.Users, cp.sessions, cp.rateLimiter, FlightIngressConfig{
SessionIdleTTL: cp.cfg.FlightSessionIdleTTL,
SessionReapTick: cp.cfg.FlightSessionReapInterval,
HandleIdleTTL: cp.cfg.FlightHandleIdleTTL,
SessionIdleTTL: cp.cfg.FlightSessionIdleTTL,
SessionReapTick: cp.cfg.FlightSessionReapInterval,
HandleIdleTTL: cp.cfg.FlightHandleIdleTTL,
SessionTokenTTL: cp.cfg.FlightSessionTokenTTL,
WorkerQueueTimeout: cp.cfg.WorkerQueueTimeout,
})
if err != nil {
slog.Error("Failed to recover Flight ingress after reload failure.", "error", err)
Expand Down
5 changes: 0 additions & 5 deletions controlplane/flight_ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package controlplane

import (
"crypto/tls"
"errors"

"github.com/posthog/duckgres/server"
"github.com/posthog/duckgres/server/flightsqlingress"
Expand All @@ -15,14 +14,10 @@ type FlightIngress = flightsqlingress.FlightIngress
// NewFlightIngress creates a control-plane Flight SQL ingress listener.
func NewFlightIngress(host string, port int, tlsConfig *tls.Config, users map[string]string, sm *SessionManager, rateLimiter *server.RateLimiter, cfg FlightIngressConfig) (*FlightIngress, error) {
return flightsqlingress.NewFlightIngress(host, port, tlsConfig, users, sm, cfg, flightsqlingress.Options{
IsMaxWorkersError: func(err error) bool {
return errors.Is(err, ErrMaxWorkersReached)
},
RateLimiter: rateLimiter,
Hooks: flightsqlingress.Hooks{
OnSessionCountChanged: observeFlightAuthSessions,
OnSessionsReaped: observeFlightSessionsReaped,
OnMaxWorkersRetry: observeFlightMaxWorkersRetry,
},
})
}
9 changes: 0 additions & 9 deletions controlplane/flight_ingress_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ var flightSessionsReapedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Help: "Number of Flight auth sessions reaped",
}, []string{"trigger"})

var flightMaxWorkersRetryCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "duckgres_flight_max_workers_retry_total",
Help: "Number of max-worker retry outcomes when creating Flight auth sessions",
}, []string{"outcome"})

func observeFlightAuthSessions(count int) {
if count < 0 {
count = 0
Expand All @@ -43,7 +38,3 @@ func observeFlightSessionsReaped(trigger string, count int) {
}
flightSessionsReapedCounter.WithLabelValues(trigger).Add(float64(count))
}

func observeFlightMaxWorkersRetry(outcome string) {
flightMaxWorkersRetryCounter.WithLabelValues(outcome).Inc()
}
4 changes: 2 additions & 2 deletions controlplane/memory_rebalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ func memoryLimit(budget uint64) uint64 {
}

// DefaultMaxWorkers returns a reasonable default for max_workers.
// Defaults to number of CPUs * 2.
// Derived from the memory budget (budget / 256MB).
func (r *MemoryRebalancer) DefaultMaxWorkers() int {
return runtime.NumCPU() * 2
return int(r.memoryBudget / minMemoryPerSession)
}

// SetInitialLimits sets memory_limit and threads on a single session synchronously.
Expand Down
7 changes: 4 additions & 3 deletions controlplane/session_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,16 @@ func NewSessionManager(pool *FlightWorkerPool, rebalancer *MemoryRebalancer) *Se
// creates a session on it, and rebalances memory/thread limits across all active sessions.
func (sm *SessionManager) CreateSession(ctx context.Context, username string) (int32, *server.FlightExecutor, error) {
// Acquire a worker: reuses idle pre-warmed workers or spawns a new one.
// Max-workers check is atomic inside AcquireWorker to prevent TOCTOU races.
worker, err := sm.pool.AcquireWorker()
// When max-workers is set, this blocks until a slot is available.
worker, err := sm.pool.AcquireWorker(ctx)
if err != nil {
return 0, nil, fmt.Errorf("acquire worker: %w", err)
}

sessionToken, err := worker.CreateSession(ctx, username)
if err != nil {
// Clean up the worker we just spawned (but not if it was a pre-warmed idle worker)
// Clean up the worker we just spawned (but not if it was a pre-warmed idle worker
// that has sessions from other concurrent requests).
sm.pool.RetireWorkerIfNoSessions(worker.ID)
return 0, nil, fmt.Errorf("create session on worker %d: %w", worker.ID, err)
}
Expand Down
Loading