Skip to content

Commit 7561f45

Browse files
GODRIVER-3419 Remove tmp file
1 parent 58f5fa5 commit 7561f45

File tree

2 files changed

+150
-72
lines changed

2 files changed

+150
-72
lines changed

x/mongo/driver/topology/pool.go

Lines changed: 150 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
596596

597597
// If we didn't get an immediately available idle connection, also get in the queue for a new
598598
// connection while we're waiting for an idle connection.
599-
p.queueForNewConn(w)
599+
p.queueForNewConn(ctx, w)
600600
p.stateMu.RUnlock()
601601

602602
// Wait for either the wantConn to be ready or for the Context to time out.
@@ -1116,7 +1116,7 @@ func (p *pool) getOrQueueForIdleConn(w *wantConn) bool {
11161116
return false
11171117
}
11181118

1119-
func (p *pool) queueForNewConn(w *wantConn) {
1119+
func (p *pool) queueForNewConn(ctx context.Context, w *wantConn) {
11201120
p.createConnectionsCond.L.Lock()
11211121
defer p.createConnectionsCond.L.Unlock()
11221122

@@ -1125,7 +1125,7 @@ func (p *pool) queueForNewConn(w *wantConn) {
11251125
p.createConnectionsCond.Signal()
11261126

11271127
// Try to spawn without blocking the caller.
1128-
go p.spawnConnectionIfNeeded()
1128+
go p.spawnConnectionIfNeeded(ctx)
11291129
}
11301130

11311131
func (p *pool) totalConnectionCount() int {
@@ -1216,7 +1216,7 @@ func (p *pool) maintain(ctx context.Context, wg *sync.WaitGroup) {
12161216

12171217
for i := 0; i < n; i++ {
12181218
w := newWantConn()
1219-
p.queueForNewConn(w)
1219+
p.queueForNewConn(ctx, w)
12201220
wantConns = append(wantConns, w)
12211221

12221222
// Start a goroutine for each new wantConn, waiting for it to be ready.
@@ -1403,3 +1403,149 @@ func (q *wantConnQueue) cleanFront() {
14031403
q.popFront()
14041404
}
14051405
}
1406+
1407+
func (p *pool) spawnConnection(w *wantConn, conn *connection) {
1408+
defer func() { <-p.connectionSem }() // Release slot when done, see maxConnecting.
1409+
1410+
// Perform dial/handshake with optional timeout.
1411+
start := time.Now()
1412+
1413+
// Pass the createConnections context to connect to allow pool close to
1414+
// cancel connection establishment so shutdown doesn't block indefinitely if
1415+
// connectTimeout=0.
1416+
//
1417+
// Per the specifications, an explicit value of connectTimeout=0 means the
1418+
// timeout is "infinite".
1419+
dialCtx := context.Background()
1420+
var cancel context.CancelFunc
1421+
if p.connectTimeout > 0 {
1422+
dialCtx, cancel = context.WithTimeout(dialCtx, p.connectTimeout)
1423+
defer cancel()
1424+
}
1425+
1426+
err := conn.connect(dialCtx)
1427+
1428+
if err != nil {
1429+
// Deliver error and run SDAM handshake error logic
1430+
w.tryDeliver(nil, err)
1431+
1432+
// If there's an error connecting the new connection, call the handshake error handler
1433+
// that implements the SDAM handshake error handling logic. This must be called after
1434+
// delivering the connection error to the waiting wantConn. If it's called before, the
1435+
// handshake error handler may clear the connection pool, leading to a different error
1436+
// message being delivered to the same waiting wantConn in idleConnWait when the wait
1437+
// queues are cleared.
1438+
if p.handshakeErrFn != nil {
1439+
p.handshakeErrFn(err, conn.generation, conn.desc.ServiceID)
1440+
}
1441+
1442+
_ = p.removeConnection(conn, reason{
1443+
loggerConn: logger.ReasonConnClosedError,
1444+
event: event.ReasonError,
1445+
}, err)
1446+
1447+
_ = p.closeConnection(conn)
1448+
1449+
return
1450+
}
1451+
1452+
// emit "ConnectionReady"
1453+
duration := time.Since(start)
1454+
if mustLogPoolMessage(p) {
1455+
keysAndValues := logger.KeyValues{
1456+
logger.KeyDriverConnectionID, conn.driverConnectionID,
1457+
logger.KeyDurationMS, duration.Milliseconds(),
1458+
}
1459+
1460+
logPoolMessage(p, logger.ConnectionReady, keysAndValues...)
1461+
}
1462+
1463+
if p.monitor != nil {
1464+
p.monitor.Event(&event.PoolEvent{
1465+
Type: event.ConnectionReady,
1466+
Address: p.address.String(),
1467+
ConnectionID: conn.driverConnectionID,
1468+
Duration: duration,
1469+
})
1470+
}
1471+
1472+
// deliver the connection or check it back in on spurious wakeup
1473+
if !w.tryDeliver(conn, nil) {
1474+
_ = p.checkInNoEvent(conn)
1475+
}
1476+
}
1477+
1478+
// hasSpace checks if the pool has space for a new connection.
1479+
func (p *pool) hasSpace() bool {
1480+
return p.maxSize == 0 || uint64(len(p.conns)) < p.maxSize
1481+
}
1482+
1483+
// checkOutWaiting checks if there are any waiting connections that need to be
1484+
// checked out.
1485+
func (p *pool) checkOutWaiting() bool {
1486+
return p.newConnWait.len() > 0
1487+
}
1488+
1489+
// waitForNewConn blocks until there's both work and room in the pool (or the
1490+
// context is canceled) then pops exactly one wantconn and creates+registers its
1491+
// connection.
1492+
func (p *pool) waitForNewConn(ctx context.Context) (*wantConn, *connection, bool) {
1493+
p.createConnectionsCond.L.Lock()
1494+
defer p.createConnectionsCond.L.Unlock()
1495+
1496+
for !(p.checkOutWaiting() && p.hasSpace()) && ctx.Err() == nil {
1497+
p.createConnectionsCond.Wait()
1498+
}
1499+
1500+
if ctx.Err() != nil {
1501+
return nil, nil, false
1502+
}
1503+
1504+
p.newConnWait.cleanFront()
1505+
w := p.newConnWait.popFront()
1506+
if w == nil {
1507+
return nil, nil, false
1508+
}
1509+
1510+
conn := newConnection(p.address, p.connOpts...)
1511+
conn.pool = p
1512+
conn.driverConnectionID = atomic.AddInt64(&p.nextID, 1)
1513+
p.conns[conn.driverConnectionID] = conn
1514+
1515+
return w, conn, true
1516+
}
1517+
1518+
// spawnConnectionIfNeeded takes on waiting waitConn (if any) and starts its
1519+
// connection creation subject to the semaphore limit.
1520+
func (p *pool) spawnConnectionIfNeeded(ctx context.Context) {
1521+
// Block until we're allowed to start another connection.
1522+
p.connectionSem <- struct{}{}
1523+
1524+
// Wait on pool space & context.
1525+
w, conn, ok := p.waitForNewConn(ctx)
1526+
if !ok {
1527+
<-p.connectionSem // Release slot on failure.
1528+
1529+
return
1530+
}
1531+
1532+
// Emit "ConnectionCreated"
1533+
if mustLogPoolMessage(p) {
1534+
keysAndValues := logger.KeyValues{
1535+
logger.KeyDriverConnectionID, conn.driverConnectionID,
1536+
}
1537+
1538+
logPoolMessage(p, logger.ConnectionCreated, keysAndValues...)
1539+
}
1540+
1541+
if p.monitor != nil {
1542+
p.monitor.Event(&event.PoolEvent{
1543+
Type: event.ConnectionCreated,
1544+
Address: p.address.String(),
1545+
ConnectionID: conn.driverConnectionID,
1546+
})
1547+
}
1548+
1549+
// Dial the connection and spawn it in the background.
1550+
go p.spawnConnection(w, conn)
1551+
}

x/mongo/driver/topology/pool.tmp.go

Lines changed: 0 additions & 68 deletions
This file was deleted.

0 commit comments

Comments
 (0)