Skip to content

Commit 76bf805

Browse files
queue new connection when removing one to prevent stalled checkOut
1 parent 4b88fd7 commit 76bf805

File tree

2 files changed

+80
-1
lines changed

2 files changed

+80
-1
lines changed

x/mongo/driver/topology/pool.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -934,6 +934,18 @@ func (p *pool) checkInNoEvent(conn *connection) error {
934934
go func() {
935935
_ = p.closeConnection(conn)
936936
}()
937+
938+
// Since we are removing the connection, we should try to queue another
939+
// in good faith in case the current idle wait queue is being awaited
940+
// in a checkOut() call.
941+
p.createConnectionsCond.L.Lock()
942+
w := p.newConnWait.popFront()
943+
p.createConnectionsCond.L.Unlock()
944+
945+
if w != nil {
946+
p.queueForNewConn(context.Background(), w)
947+
}
948+
937949
return nil
938950
}
939951

@@ -1128,15 +1140,29 @@ func (p *pool) getOrQueueForIdleConn(w *wantConn) bool {
11281140
return false
11291141
}
11301142

1143+
// queueForNewConn enqueues a checkout request and signals the
1144+
// connection-creation state machine. It does NOT initiate dialing directly,
1145+
// but places the wantConn into the pending queue and wakes a background worker
1146+
// using sync.Cond. That worker will then dequeue in FIFO order and perform the
1147+
// actual dial under its own synchronization, preserving order.
11311148
func (p *pool) queueForNewConn(ctx context.Context, w *wantConn) {
11321149
p.createConnectionsCond.L.Lock()
11331150
defer p.createConnectionsCond.L.Unlock()
11341151

1152+
// Remove any wantConn entries at the front that are no longer waiting. This
1153+
// keeps the queue clean and avoids delivering to canceled requests.
11351154
p.newConnWait.cleanFront()
1155+
1156+
// Enqueue this wantConn for allocation of a new connection.
11361157
p.newConnWait.pushBack(w)
1158+
1159+
// Signal one goroutine waiting in waitForNewConn that the pool state changed and
1160+
// a new wantConn is available. That goroutine will then dequeue under lock.
11371161
p.createConnectionsCond.Signal()
11381162

1139-
// Try to spawn without blocking the caller.
1163+
// Spawn a background worker to service the queue without blocking callers. We
1164+
// do NOT pass "w" here because the worker must re-acquire the queue lock and
1165+
// pick the next available wantConn in FIFO order via waitForNewConn.
11401166
go p.spawnConnectionIfNeeded(ctx)
11411167
}
11421168

@@ -1529,6 +1555,11 @@ func (p *pool) waitForNewConn(ctx context.Context) (*wantConn, *connection, bool
15291555
// spawnConnectionIfNeeded takes one waiting wantConn (if any) and starts its
15301556
// connection creation subject to the semaphore limit.
15311557
func (p *pool) spawnConnectionIfNeeded(ctx context.Context) {
1558+
if !p.hasSpace() {
1559+
// If the pool is full, we can't spawn a new connection.
1560+
return
1561+
}
1562+
15321563
// Block until we're allowed to start another connection.
15331564
p.connectionSem <- struct{}{}
15341565

x/mongo/driver/topology/pool_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"errors"
1212
"net"
1313
"regexp"
14+
"runtime"
1415
"sync"
1516
"testing"
1617
"time"
@@ -1608,3 +1609,50 @@ func TestPool_Error(t *testing.T) {
16081609
p.close(context.Background())
16091610
})
16101611
}
1612+
1613+
// Test that if the pool is already at MaxPoolSize, a flood of checkOuts with
1614+
// a background context does not spin up unbounded goroutines.
1615+
func TestPool_unboundedGoroutines(t *testing.T) {
1616+
// Start a server that never responds so no connection ever frees up.
1617+
addr := bootstrapConnections(t, 1, func(net.Conn) {
1618+
<-make(chan struct{})
1619+
})
1620+
1621+
// Create pool with exactly 1 slot and 1 dial slot.
1622+
p := newPool(poolConfig{
1623+
Address: address.Address(addr.String()),
1624+
MaxPoolSize: 1,
1625+
MaxConnecting: 1,
1626+
ConnectTimeout: defaultConnectionTimeout,
1627+
})
1628+
require.NoError(t, p.ready(), "pool ready error")
1629+
1630+
// Drain the only connection so the pool is full.
1631+
c, err := p.checkOut(context.Background())
1632+
require.NoError(t, err, "checkOut error")
1633+
1634+
defer func() {
1635+
_ = p.checkIn(c)
1636+
p.close(context.Background())
1637+
}()
1638+
1639+
// Snapshot base goroutine count.
1640+
before := runtime.NumGoroutine()
1641+
1642+
// Flood with N background checkOuts
1643+
const N = 100
1644+
for i := 0; i < N; i++ {
1645+
go func() {
1646+
_, _ = p.checkOut(context.Background())
1647+
}()
1648+
}
1649+
1650+
// Give them a moment to spin up
1651+
time.Sleep(1000 * time.Millisecond)
1652+
1653+
after := runtime.NumGoroutine()
1654+
delta := after - before - N
1655+
1656+
assert.LessOrEqual(t, delta, int(p.maxConnecting))
1657+
1658+
}

0 commit comments

Comments
 (0)