Skip to content

Commit 53a8ba3

Browse files
authored
Merge pull request #38 from PostHog/fix/connection-timeout
Add connection timeout and keepalive to prevent hanging
2 parents 20a00c7 + af276d9 commit 53a8ba3

File tree

2 files changed

+52
-20
lines changed

2 files changed

+52
-20
lines changed

server/conn.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"os"
1414
"regexp"
1515
"strings"
16+
"time"
1617

1718
"github.com/posthog/duckgres/transpiler"
1819
)
@@ -76,20 +77,16 @@ func (c *clientConn) serve() error {
7677
return fmt.Errorf("startup failed: %w", err)
7778
}
7879

79-
// Create a new DuckDB connection for this client session.
80-
// Each client gets its own connection to ensure proper isolation of
81-
// temporary tables and session state, matching PostgreSQL's behavior.
80+
// Create a DuckDB connection for this client session
8281
db, err := c.server.createDBConnection(c.username)
8382
if err != nil {
8483
c.sendError("FATAL", "28000", fmt.Sprintf("failed to open database: %v", err))
8584
return err
8685
}
8786
c.db = db
88-
// Ensure the database connection is closed when this client disconnects
8987
defer func() {
9088
if c.db != nil {
9189
c.db.Close()
92-
log.Printf("Closed DuckDB connection for user %q", c.username)
9390
}
9491
}()
9592

@@ -225,11 +222,21 @@ func (c *clientConn) sendInitialParams() {
225222

226223
func (c *clientConn) messageLoop() error {
227224
for {
225+
// Set read deadline if idle timeout is configured
226+
if c.server.cfg.IdleTimeout > 0 {
227+
c.conn.SetReadDeadline(time.Now().Add(c.server.cfg.IdleTimeout))
228+
}
229+
228230
msgType, body, err := readMessage(c.reader)
229231
if err != nil {
230232
if err == io.EOF {
231233
return nil
232234
}
235+
// Check if this is a timeout error
236+
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
237+
log.Printf("[%s] Connection idle timeout, closing", c.username)
238+
return nil
239+
}
233240
return err
234241
}
235242

server/server.go

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ type Config struct {
4343

4444
// Graceful shutdown timeout (default: 30s)
4545
ShutdownTimeout time.Duration
46+
47+
// IdleTimeout is the maximum time a connection can be idle before being closed.
48+
// This prevents accumulation of zombie connections from clients that disconnect
49+
// uncleanly. Default: 10 minutes. Set to 0 to disable.
50+
IdleTimeout time.Duration
4651
}
4752

4853
// DuckLakeConfig configures DuckLake catalog attachment
@@ -86,8 +91,9 @@ type Server struct {
8691
closeMu sync.Mutex
8792
activeConns int64 // atomic counter for active connections
8893

89-
// duckLakeMu serializes DuckLake secret creation to avoid write-write conflicts
90-
duckLakeMu sync.Mutex
94+
// duckLakeSem serializes DuckLake attachment to avoid write-write conflicts.
95+
// Using a channel instead of mutex allows for timeout on acquisition.
96+
duckLakeSem chan struct{}
9197
}
9298

9399
func New(cfg Config) (*Server, error) {
@@ -111,12 +117,18 @@ func New(cfg Config) (*Server, error) {
111117
cfg.ShutdownTimeout = 30 * time.Second
112118
}
113119

120+
// Use default idle timeout if not specified (10 minutes)
121+
if cfg.IdleTimeout == 0 {
122+
cfg.IdleTimeout = 10 * time.Minute
123+
}
124+
114125
s := &Server{
115126
cfg: cfg,
116127
rateLimiter: NewRateLimiter(cfg.RateLimit),
117128
tlsConfig: &tls.Config{
118129
Certificates: []tls.Certificate{cert},
119130
},
131+
duckLakeSem: make(chan struct{}, 1),
120132
}
121133

122134
log.Printf("TLS enabled with certificate: %s", cfg.TLSCertFile)
@@ -146,6 +158,12 @@ func (s *Server) ListenAndServe() error {
146158
continue
147159
}
148160

161+
// Enable TCP keepalive to detect dead connections
162+
if tcpConn, ok := conn.(*net.TCPConn); ok {
163+
tcpConn.SetKeepAlive(true)
164+
tcpConn.SetKeepAlivePeriod(30 * time.Second)
165+
}
166+
149167
s.wg.Add(1)
150168
go func() {
151169
defer s.wg.Done()
@@ -230,16 +248,19 @@ func (s *Server) ActiveConnections() int64 {
230248
return atomic.LoadInt64(&s.activeConns)
231249
}
232250

233-
// createDBConnection creates a new DuckDB connection for a client session.
234-
// Each client connection gets its own DB connection to ensure proper isolation
235-
// of temporary tables and session state, matching PostgreSQL's behavior.
251+
// createDBConnection creates a DuckDB connection for a client session.
252+
// Uses in-memory database as an anchor for DuckLake attachment (actual data lives in RDS/S3).
236253
func (s *Server) createDBConnection(username string) (*sql.DB, error) {
237-
dbPath := fmt.Sprintf("%s/%s.db", s.cfg.DataDir, username)
238-
db, err := sql.Open("duckdb", dbPath)
254+
// Create new in-memory connection (DuckLake provides actual storage)
255+
db, err := sql.Open("duckdb", ":memory:")
239256
if err != nil {
240257
return nil, fmt.Errorf("failed to open duckdb: %w", err)
241258
}
242259

260+
// Single connection per client session
261+
db.SetMaxOpenConns(1)
262+
db.SetMaxIdleConns(1)
263+
243264
// Verify connection
244265
if err := db.Ping(); err != nil {
245266
db.Close()
@@ -249,7 +270,6 @@ func (s *Server) createDBConnection(username string) (*sql.DB, error) {
249270
// Load configured extensions
250271
if err := s.loadExtensions(db); err != nil {
251272
log.Printf("Warning: failed to load some extensions for user %q: %v", username, err)
252-
// Continue anyway - database will still work without the extensions
253273
}
254274

255275
// Attach DuckLake catalog if configured
@@ -270,7 +290,6 @@ func (s *Server) createDBConnection(username string) (*sql.DB, error) {
270290
// Continue anyway - basic queries will still work
271291
}
272292

273-
log.Printf("Opened DuckDB connection for user %q at %s", username, dbPath)
274293
return db, nil
275294
}
276295

@@ -308,11 +327,17 @@ func (s *Server) attachDuckLake(db *sql.DB) error {
308327
return nil // DuckLake not configured
309328
}
310329

311-
// Serialize DuckLake attachment to avoid race conditions
312-
// Multiple connections trying to attach simultaneously can cause
313-
// "database with name '__ducklake_metadata_ducklake' already exists" errors
314-
s.duckLakeMu.Lock()
315-
defer s.duckLakeMu.Unlock()
330+
// Serialize DuckLake attachment to avoid race conditions where multiple
331+
// connections try to attach simultaneously, causing errors like
332+
// "database with name '__ducklake_metadata_ducklake' already exists".
333+
// Use a 30-second timeout to prevent connections from hanging indefinitely
334+
// if attachment is slow (e.g., network latency to metadata store).
335+
select {
336+
case s.duckLakeSem <- struct{}{}:
337+
defer func() { <-s.duckLakeSem }()
338+
case <-time.After(30 * time.Second):
339+
return fmt.Errorf("timeout waiting for DuckLake attachment lock")
340+
}
316341

317342
// Check if DuckLake catalog is already attached
318343
var count int
@@ -376,7 +401,7 @@ func (s *Server) attachDuckLake(db *sql.DB) error {
376401
// - "config": explicit credentials (for MinIO or when you have access keys)
377402
// - "credential_chain": AWS SDK credential chain (env vars, config files, instance metadata, etc.)
378403
//
379-
// Note: Caller must hold duckLakeMu to avoid race conditions.
404+
// Note: Caller must hold duckLakeSem to avoid race conditions.
380405
// See: https://duckdb.org/docs/stable/core_extensions/httpfs/s3api
381406
func (s *Server) createS3Secret(db *sql.DB) error {
382407
// Check if secret already exists to avoid unnecessary creation

0 commit comments

Comments
 (0)