Commit f45de32

fuziontech and claude committed
Queue connections for worker slots instead of rejecting immediately
Replace the "reject immediately" pattern when max_workers is reached with a
blocking semaphore in FlightWorkerPool. A buffered channel of size maxWorkers
acts as both a concurrency limiter and a FIFO queue. Clients now complete
TLS + auth, then block at the ReadyForQuery stage until a worker slot becomes
available (or the queue timeout expires). This eliminates retry storms from
clients like Fivetran that reconnect aggressively on "too many connections"
errors.

Key changes:

- AcquireWorker(ctx) blocks on the semaphore instead of returning ErrMaxWorkersReached
- RetireWorker and the HealthCheckLoop crash paths release semaphore slots
- ShutdownAll closes shutdownCh to unblock all queued waiters
- New --worker-queue-timeout flag (default 30s) controls queue wait time
- Remove dead reap-and-retry code from Flight ingress (GetOrCreate)
- Remove MaxWorkersRetry Prometheus metrics and hooks

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent e81aeda commit f45de32
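The mechanism the message describes is a buffered channel doing double duty as a counting semaphore and a FIFO wait queue: a send claims a slot, blocked senders wait their turn, and a receive frees a slot. The following sketch is a minimal, self-contained illustration of that pattern, not the duckgres code itself; slotSemaphore, acquire, and release are hypothetical stand-ins for FlightWorkerPool, AcquireWorker, and releaseWorkerSem.

package main

import (
	"context"
	"fmt"
	"time"
)

type slotSemaphore struct {
	slots    chan struct{} // buffered to the worker cap; blocked senders queue up
	shutdown chan struct{} // closed once to unblock every queued waiter
}

func newSlotSemaphore(capacity int) *slotSemaphore {
	return &slotSemaphore{
		slots:    make(chan struct{}, capacity),
		shutdown: make(chan struct{}),
	}
}

// acquire blocks until a slot frees up, the caller's deadline expires, or the
// pool shuts down: the same three-way select the commit adds to AcquireWorker.
func (s *slotSemaphore) acquire(ctx context.Context) error {
	select {
	case s.slots <- struct{}{}:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("timed out waiting for worker slot: %w", ctx.Err())
	case <-s.shutdown:
		return fmt.Errorf("pool is shutting down")
	}
}

// release drains one token; non-blocking, so a stray double-release is a no-op.
func (s *slotSemaphore) release() {
	select {
	case <-s.slots:
	default:
	}
}

func main() {
	sem := newSlotSemaphore(1)
	_ = sem.acquire(context.Background()) // first client takes the only slot

	// A second client queues and times out, like a connection that exceeds
	// the worker-queue timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	fmt.Println(sem.acquire(ctx))

	sem.release() // a worker retires; the next queued waiter would proceed
}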

File tree

10 files changed: +364 −284 lines changed

config_resolution.go

Lines changed: 27 additions & 2 deletions
@@ -28,14 +28,16 @@ type configCLIInputs struct {
 	MemoryRebalance bool
 	MaxWorkers int
 	MinWorkers int
+	WorkerQueueTimeout string
 	ACMEDomain string
 	ACMEEmail string
 	ACMECacheDir string
 	MaxConnections int
 }
 
 type resolvedConfig struct {
-	Server server.Config
+	Server             server.Config
+	WorkerQueueTimeout time.Duration
 }
 
 func defaultServerConfig() server.Config {
@@ -69,6 +71,7 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
 	}
 
 	cfg := defaultServerConfig()
+	var workerQueueTimeout time.Duration
 
 	if fileCfg != nil {
 		if fileCfg.Host != "" {
@@ -210,6 +213,13 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
 		if fileCfg.MinWorkers != 0 {
 			cfg.MinWorkers = fileCfg.MinWorkers
 		}
+		if fileCfg.WorkerQueueTimeout != "" {
+			if d, err := time.ParseDuration(fileCfg.WorkerQueueTimeout); err == nil {
+				workerQueueTimeout = d
+			} else {
+				warn("Invalid worker_queue_timeout duration: " + err.Error())
+			}
+		}
 		if len(fileCfg.PassthroughUsers) > 0 {
 			cfg.PassthroughUsers = make(map[string]bool, len(fileCfg.PassthroughUsers))
 			for _, u := range fileCfg.PassthroughUsers {
@@ -365,6 +375,13 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
 			warn("Invalid DUCKGRES_MAX_WORKERS: " + err.Error())
 		}
 	}
+	if v := getenv("DUCKGRES_WORKER_QUEUE_TIMEOUT"); v != "" {
+		if d, err := time.ParseDuration(v); err == nil {
+			workerQueueTimeout = d
+		} else {
+			warn("Invalid DUCKGRES_WORKER_QUEUE_TIMEOUT duration: " + err.Error())
+		}
+	}
 	if v := getenv("DUCKGRES_ACME_DOMAIN"); v != "" {
 		cfg.ACMEDomain = v
 	}
@@ -456,6 +473,13 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
 	if cli.Set["max-workers"] {
 		cfg.MaxWorkers = cli.MaxWorkers
 	}
+	if cli.Set["worker-queue-timeout"] {
+		if d, err := time.ParseDuration(cli.WorkerQueueTimeout); err == nil {
+			workerQueueTimeout = d
+		} else {
+			warn("Invalid --worker-queue-timeout duration: " + err.Error())
+		}
+	}
	if cli.Set["acme-domain"] {
 		cfg.ACMEDomain = cli.ACMEDomain
 	}
@@ -482,6 +506,7 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
 	}
 
 	return resolvedConfig{
-		Server: cfg,
+		Server:             cfg,
+		WorkerQueueTimeout: workerQueueTimeout,
 	}
 }
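Taken together, these hunks give the new setting the usual layering: file config, then the DUCKGRES_WORKER_QUEUE_TIMEOUT environment variable, then the --worker-queue-timeout flag, with an unparsable value producing a warning rather than a startup failure. A condensed sketch of that precedence; resolveQueueTimeout and its plain-string arguments are illustrative, not the real API.

package main

import (
	"fmt"
	"time"
)

// resolveQueueTimeout applies the layers in order: each non-empty layer
// overrides the previous one, and a bad duration warns instead of failing.
func resolveQueueTimeout(fileVal, envVal, cliVal string, warn func(string)) time.Duration {
	var out time.Duration
	layers := []struct{ source, value string }{
		{"worker_queue_timeout", fileVal},
		{"DUCKGRES_WORKER_QUEUE_TIMEOUT", envVal},
		{"--worker-queue-timeout", cliVal},
	}
	for _, l := range layers {
		if l.value == "" {
			continue // layer not set; keep the previous winner
		}
		if d, err := time.ParseDuration(l.value); err == nil {
			out = d
		} else {
			warn("Invalid " + l.source + " duration: " + err.Error())
		}
	}
	return out
}

func main() {
	warn := func(msg string) { fmt.Println("WARN:", msg) }
	// Env overrides file; the invalid flag value warns and leaves 1m in place.
	fmt.Println(resolveQueueTimeout("30s", "1m", "bogus", warn)) // prints 1m0s
}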

controlplane/control.go

Lines changed: 18 additions & 9 deletions
@@ -28,6 +28,7 @@ type ControlPlaneConfig struct {
 	ConfigPath          string // Path to config file, passed to workers
 	HandoverSocket      string
 	HealthCheckInterval time.Duration
+	WorkerQueueTimeout  time.Duration // How long to wait for an available worker slot (default: 30s)
 }
 
 // ControlPlane manages the TCP listener and routes connections to Flight SQL workers.
@@ -62,6 +63,9 @@ func RunControlPlane(cfg ControlPlaneConfig) {
 	if cfg.HealthCheckInterval == 0 {
 		cfg.HealthCheckInterval = 2 * time.Second
 	}
+	if cfg.WorkerQueueTimeout == 0 {
+		cfg.WorkerQueueTimeout = 5 * time.Minute
+	}
 
 	// Enforce secure defaults for control-plane mode.
 	if err := validateControlPlaneSecurity(cfg); err != nil {
@@ -171,10 +175,11 @@
 	// creation races between pre-warm and first external Flight requests.
 	if cfg.FlightPort > 0 {
 		flightIngress, err := NewFlightIngress(cfg.Host, cfg.FlightPort, tlsCfg, cfg.Users, sessions, FlightIngressConfig{
-			SessionIdleTTL:  cfg.FlightSessionIdleTTL,
-			SessionReapTick: cfg.FlightSessionReapInterval,
-			HandleIdleTTL:   cfg.FlightHandleIdleTTL,
-			SessionTokenTTL: cfg.FlightSessionTokenTTL,
+			SessionIdleTTL:     cfg.FlightSessionIdleTTL,
+			SessionReapTick:    cfg.FlightSessionReapInterval,
+			HandleIdleTTL:      cfg.FlightHandleIdleTTL,
+			SessionTokenTTL:    cfg.FlightSessionTokenTTL,
+			WorkerQueueTimeout: cfg.WorkerQueueTimeout,
 		})
 		if err != nil {
 			slog.Error("Failed to initialize Flight ingress.", "error", err)
@@ -261,6 +266,7 @@
 		"flight_addr", cp.flightAddr(),
 		"min_workers", minWorkers,
 		"max_workers", maxWorkers,
+		"worker_queue_timeout", cfg.WorkerQueueTimeout,
 		"memory_budget", formatBytes(rebalancer.memoryBudget),
 		"memory_rebalance", cfg.MemoryRebalance)
 
@@ -478,9 +484,11 @@ func (cp *ControlPlane) handleConnection(conn net.Conn) {
 	cp.rateLimiter.RecordSuccessfulAuth(remoteAddr)
 	slog.Info("User authenticated.", "user", username, "remote_addr", remoteAddr)
 
-	// Create session on a worker
-	ctx := context.Background()
+	// Create session on a worker. The timeout controls how long we wait in the
+	// worker queue when all slots are occupied.
+	ctx, cancel := context.WithTimeout(context.Background(), cp.cfg.WorkerQueueTimeout)
 	pid, executor, err := cp.sessions.CreateSession(ctx, username)
+	cancel()
 	if err != nil {
 		slog.Error("Failed to create session.", "user", username, "remote_addr", remoteAddr, "error", err)
 		_ = server.WriteErrorResponse(writer, "FATAL", "53300", "too many connections")
@@ -741,9 +749,10 @@ func (cp *ControlPlane) recoverFlightIngressAfterFailedReload() {
 	}
 
 	flightIngress, err := NewFlightIngress(cp.cfg.Host, cp.cfg.FlightPort, cp.tlsConfig, cp.cfg.Users, cp.sessions, FlightIngressConfig{
-		SessionIdleTTL:  cp.cfg.FlightSessionIdleTTL,
-		SessionReapTick: cp.cfg.FlightSessionReapInterval,
-		HandleIdleTTL:   cp.cfg.FlightHandleIdleTTL,
+		SessionIdleTTL:     cp.cfg.FlightSessionIdleTTL,
+		SessionReapTick:    cp.cfg.FlightSessionReapInterval,
+		HandleIdleTTL:      cp.cfg.FlightHandleIdleTTL,
+		WorkerQueueTimeout: cp.cfg.WorkerQueueTimeout,
 	})
 	if err != nil {
 		slog.Error("Failed to recover Flight ingress after reload failure.", "error", err)

controlplane/flight_ingress.go

Lines changed: 0 additions & 5 deletions
@@ -2,7 +2,6 @@ package controlplane
 
 import (
 	"crypto/tls"
-	"errors"
 
 	"github.com/posthog/duckgres/server/flightsqlingress"
 )
@@ -14,13 +13,9 @@ type FlightIngress = flightsqlingress.FlightIngress
 // NewFlightIngress creates a control-plane Flight SQL ingress listener.
 func NewFlightIngress(host string, port int, tlsConfig *tls.Config, users map[string]string, sm *SessionManager, cfg FlightIngressConfig) (*FlightIngress, error) {
 	return flightsqlingress.NewFlightIngress(host, port, tlsConfig, users, sm, cfg, flightsqlingress.Options{
-		IsMaxWorkersError: func(err error) bool {
-			return errors.Is(err, ErrMaxWorkersReached)
-		},
 		Hooks: flightsqlingress.Hooks{
 			OnSessionCountChanged: observeFlightAuthSessions,
 			OnSessionsReaped:      observeFlightSessionsReaped,
-			OnMaxWorkersRetry:     observeFlightMaxWorkersRetry,
 		},
 	})
 }

controlplane/flight_ingress_metrics.go

Lines changed: 0 additions & 9 deletions
@@ -18,11 +18,6 @@ var flightSessionsReapedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
 	Help: "Number of Flight auth sessions reaped",
 }, []string{"trigger"})
 
-var flightMaxWorkersRetryCounter = promauto.NewCounterVec(prometheus.CounterOpts{
-	Name: "duckgres_flight_max_workers_retry_total",
-	Help: "Number of max-worker retry outcomes when creating Flight auth sessions",
-}, []string{"outcome"})
-
 func observeFlightAuthSessions(count int) {
 	if count < 0 {
 		count = 0
@@ -43,7 +38,3 @@ func observeFlightSessionsReaped(trigger string, count int) {
 	}
 	flightSessionsReapedCounter.WithLabelValues(trigger).Add(float64(count))
 }
-
-func observeFlightMaxWorkersRetry(outcome string) {
-	flightMaxWorkersRetryCounter.WithLabelValues(outcome).Inc()
-}

controlplane/session_mgr.go

Lines changed: 9 additions & 4 deletions
@@ -48,16 +48,21 @@ func NewSessionManager(pool *FlightWorkerPool, rebalancer *MemoryRebalancer) *Se
 // creates a session on it, and rebalances memory/thread limits across all active sessions.
 func (sm *SessionManager) CreateSession(ctx context.Context, username string) (int32, *server.FlightExecutor, error) {
 	// Acquire a worker: reuses idle pre-warmed workers or spawns a new one.
-	// Max-workers check is atomic inside AcquireWorker to prevent TOCTOU races.
-	worker, err := sm.pool.AcquireWorker()
+	// When max-workers is set, this blocks until a slot is available.
+	worker, err := sm.pool.AcquireWorker(ctx)
 	if err != nil {
 		return 0, nil, fmt.Errorf("acquire worker: %w", err)
 	}
 
 	sessionToken, err := worker.CreateSession(ctx, username)
 	if err != nil {
-		// Clean up the worker we just spawned (but not if it was a pre-warmed idle worker)
-		sm.pool.RetireWorkerIfNoSessions(worker.ID)
+		// Clean up the worker we just spawned (but not if it was a pre-warmed idle worker
+		// that has sessions from other concurrent requests).
+		if !sm.pool.RetireWorkerIfNoSessions(worker.ID) {
+			// Worker wasn't retired (it has other sessions), but we still hold
+			// a semaphore slot for our failed request. Release it.
+			sm.pool.releaseWorkerSem()
+		}
 		return 0, nil, fmt.Errorf("create session on worker %d: %w", worker.ID, err)
 	}
 
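The invariant this hunk maintains: every semaphore slot taken in AcquireWorker is returned exactly once, either by the retire path (RetireWorker releases it) or, when the worker survives because other sessions still use it, by the explicit releaseWorkerSem call. A self-contained sketch of that accounting, with simplified stand-ins for the pool types (pool, retireIfNoSessions, and the boolean inputs are all illustrative):

package main

import (
	"context"
	"errors"
	"fmt"
)

type pool struct{ sem chan struct{} }

func (p *pool) acquire(ctx context.Context) error {
	select {
	case p.sem <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (p *pool) release() { // non-blocking drain, like releaseWorkerSem
	select {
	case <-p.sem:
	default:
	}
}

// retireIfNoSessions mimics RetireWorkerIfNoSessions: retiring releases the
// slot (via the retire path); returning false means the slot is still held.
func (p *pool) retireIfNoSessions(hasOtherSessions bool) bool {
	if hasOtherSessions {
		return false
	}
	p.release()
	return true
}

func createSession(p *pool, sessionFails, workerShared bool) error {
	if err := p.acquire(context.Background()); err != nil {
		return err // nothing acquired, nothing to release
	}
	if sessionFails {
		if !p.retireIfNoSessions(workerShared) {
			p.release() // worker kept alive for others; balance our acquire by hand
		}
		return errors.New("create session failed")
	}
	return nil // slot stays held for the session's lifetime
}

func main() {
	p := &pool{sem: make(chan struct{}, 1)}
	_ = createSession(p, true, true)       // failure on a shared pre-warmed worker
	fmt.Println("slots held:", len(p.sem)) // 0: the acquire was balanced
}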

controlplane/worker_mgr.go

Lines changed: 49 additions & 23 deletions
@@ -5,7 +5,6 @@ import (
 	"crypto/rand"
 	"encoding/hex"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"log/slog"
 	"os"
@@ -22,8 +21,6 @@ import (
 	"google.golang.org/grpc/credentials/insecure"
 )
 
-var ErrMaxWorkersReached = errors.New("max workers reached")
-
 // ManagedWorker represents a duckdb-service worker process.
 type ManagedWorker struct {
 	ID int
@@ -55,6 +52,8 @@
 	sessionCounter SessionCounter // set after SessionManager is created
 	maxWorkers     int            // 0 = unlimited
 	shuttingDown   bool
+	workerSem      chan struct{}  // buffered to maxWorkers; nil when unlimited
+	shutdownCh     chan struct{}  // closed by ShutdownAll to unblock queued waiters
 }
 
 // NewFlightWorkerPool creates a new worker pool.
@@ -66,6 +65,10 @@ func NewFlightWorkerPool(socketDir, configPath string, maxWorkers int) *FlightWo
 		configPath: configPath,
 		binaryPath: binaryPath,
 		maxWorkers: maxWorkers,
+		shutdownCh: make(chan struct{}),
+	}
+	if maxWorkers > 0 {
+		pool.workerSem = make(chan struct{}, maxWorkers)
 	}
 	observeControlPlaneWorkers(0)
 	return pool
@@ -235,30 +238,31 @@ func (p *FlightWorkerPool) SpawnMinWorkers(count int) error {
 	return p.SpawnAll(count)
 }
 
-// AcquireWorker returns a worker for a new session. It first tries to claim an
-// idle pre-warmed worker (one with no active sessions). If none are available,
-// it spawns a new one. The max-workers check is performed atomically under the
-// write lock to prevent TOCTOU races from concurrent connections.
-func (p *FlightWorkerPool) AcquireWorker() (*ManagedWorker, error) {
+// AcquireWorker returns a worker for a new session. When maxWorkers is set,
+// callers block in FIFO order on the semaphore until a slot is available,
+// the context is cancelled, or the pool shuts down.
+// Once a slot is acquired, it first tries to claim an idle pre-warmed worker
+// (one with no active sessions). If none are available, it spawns a new one.
+func (p *FlightWorkerPool) AcquireWorker(ctx context.Context) (*ManagedWorker, error) {
+	// Block until a semaphore slot is available (FIFO via Go's sudog queue).
+	if p.workerSem != nil {
+		select {
+		case p.workerSem <- struct{}{}:
+			// Got a slot
+		case <-ctx.Done():
+			return nil, fmt.Errorf("timed out waiting for available worker (max_workers=%d): %w", p.maxWorkers, ctx.Err())
+		case <-p.shutdownCh:
+			return nil, fmt.Errorf("pool is shutting down")
+		}
+	}
+
 	p.mu.Lock()
 	if p.shuttingDown {
 		p.mu.Unlock()
+		p.releaseWorkerSem()
 		return nil, fmt.Errorf("pool is shutting down")
 	}
 
-	// Check max-workers cap atomically under the write lock
-	if p.maxWorkers > 0 && len(p.workers) >= p.maxWorkers {
-		// Even at the cap, we may have idle pre-warmed workers to reuse.
-		// Only fail if all existing workers are busy.
-		idle := p.findIdleWorkerLocked()
-		if idle != nil {
-			p.mu.Unlock()
-			return idle, nil
-		}
-		p.mu.Unlock()
-		return nil, fmt.Errorf("%w (%d)", ErrMaxWorkersReached, p.maxWorkers)
-	}
-
 	// Try to claim an idle pre-warmed worker before spawning a new one
 	idle := p.findIdleWorkerLocked()
 	if idle != nil {
@@ -271,16 +275,28 @@ func (p *FlightWorkerPool) AcquireWorker() (*ManagedWorker, error) {
 	p.mu.Unlock()
 
 	if err := p.SpawnWorker(id); err != nil {
+		p.releaseWorkerSem()
 		return nil, err
 	}
 
 	w, ok := p.Worker(id)
 	if !ok {
+		p.releaseWorkerSem()
 		return nil, fmt.Errorf("worker %d not found after spawn", id)
 	}
 	return w, nil
 }
 
+// releaseWorkerSem drains one token from the semaphore (non-blocking).
+func (p *FlightWorkerPool) releaseWorkerSem() {
+	if p.workerSem != nil {
+		select {
+		case <-p.workerSem:
+		default:
+		}
+	}
+}
+
 // findIdleWorkerLocked returns a live worker with no active sessions, or nil.
 // Caller must hold p.mu (read or write lock).
 func (p *FlightWorkerPool) findIdleWorkerLocked() *ManagedWorker {
@@ -312,18 +328,23 @@ func (p *FlightWorkerPool) RetireWorker(id int) {
 	p.mu.Unlock()
 	observeControlPlaneWorkers(workerCount)
 
+	// Release semaphore slot so a queued waiter can proceed.
+	p.releaseWorkerSem()
+
 	// Run the actual process cleanup asynchronously so DestroySession
 	// doesn't block the connection handler goroutine for up to 3s+.
 	go retireWorkerProcess(w)
 }
 
 // RetireWorkerIfNoSessions retires a worker only if it has no active sessions.
 // Used to clean up on session creation failure without retiring pre-warmed workers.
-func (p *FlightWorkerPool) RetireWorkerIfNoSessions(id int) {
+// Returns true if the worker was retired (and its semaphore slot released).
+func (p *FlightWorkerPool) RetireWorkerIfNoSessions(id int) bool {
 	if p.sessionCounter != nil && p.sessionCounter.SessionCountForWorker(id) > 0 {
-		return
+		return false
 	}
 	p.RetireWorker(id)
+	return true
 }
 
 // retireWorkerProcess handles the actual process shutdown and socket cleanup.
@@ -384,6 +405,9 @@ func (p *FlightWorkerPool) ShutdownAll() {
 	}
 	p.mu.Unlock()
 
+	// Unblock all goroutines waiting in AcquireWorker's semaphore select.
+	close(p.shutdownCh)
+
 	for _, w := range workers {
 		if w.cmd.Process != nil {
 			slog.Info("Shutting down worker.", "id", w.ID, "pid", w.cmd.Process.Pid)
@@ -483,6 +507,7 @@ func (p *FlightWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Du
 				_ = w.client.Close()
 			}
 			_ = os.Remove(w.socketPath)
+			p.releaseWorkerSem()
 		default:
 			// Worker is alive, do a health check.
 			// Recover nil-pointer panics: w.client.Close() (from a
@@ -535,6 +560,7 @@
 					_ = w.client.Close()
 				}
 				_ = os.Remove(w.socketPath)
+				p.releaseWorkerSem()
 			}
 		}
 	} else {
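ShutdownAll closes shutdownCh rather than pushing tokens into the semaphore because a close is a broadcast: every goroutine parked in AcquireWorker's select wakes at once, however many are queued, whereas a released token would wake exactly one. A small runnable demonstration of the difference (names are illustrative):

package main

import (
	"fmt"
	"sync"
)

func main() {
	sem := make(chan struct{}, 1)
	shutdown := make(chan struct{})
	sem <- struct{}{} // the only slot is busy, so every new caller queues

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			select {
			case sem <- struct{}{}:
				fmt.Println("waiter", id, "got a slot")
			case <-shutdown:
				fmt.Println("waiter", id, "unblocked by shutdown")
			}
		}(i)
	}

	close(shutdown) // one close wakes all three queued waiters at once
	wg.Wait()
}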
