Skip to content

Commit f3d3a2e

Browse files
GODRIVER-3419 Sketch out pattern
1 parent f8e4368 commit f3d3a2e

File tree

2 files changed

+87
-142
lines changed

2 files changed

+87
-142
lines changed

x/mongo/driver/topology/pool.go

Lines changed: 6 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ type pool struct {
128128
idleConns []*connection // idleConns holds all idle connections.
129129
idleConnWait wantConnQueue // idleConnWait holds all wantConn requests for idle connections.
130130
connectTimeout time.Duration
131+
132+
connectionSem chan struct{}
131133
}
132134

133135
// getState returns the current state of the pool. Callers must not hold the stateMu lock.
@@ -226,6 +228,7 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) *pool {
226228
conns: make(map[int64]*connection, config.MaxPoolSize),
227229
idleConns: make([]*connection, 0, config.MaxPoolSize),
228230
connectTimeout: config.ConnectTimeout,
231+
connectionSem: make(chan struct{}, maxConnecting),
229232
}
230233
// minSize must not exceed maxSize if maxSize is not 0
231234
if pool.maxSize != 0 && pool.minSize > pool.maxSize {
@@ -241,11 +244,6 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) *pool {
241244
var ctx context.Context
242245
ctx, pool.cancelBackgroundCtx = context.WithCancel(context.Background())
243246

244-
for i := 0; i < int(pool.maxConnecting); i++ {
245-
pool.backgroundDone.Add(1)
246-
go pool.createConnections(ctx, pool.backgroundDone)
247-
}
248-
249247
// If maintainInterval is not positive, don't start the maintain() goroutine. Expect that
250248
// negative values are only used in testing; this config value is not user-configurable.
251249
if maintainInterval > 0 {
@@ -1125,6 +1123,9 @@ func (p *pool) queueForNewConn(w *wantConn) {
11251123
p.newConnWait.cleanFront()
11261124
p.newConnWait.pushBack(w)
11271125
p.createConnectionsCond.Signal()
1126+
1127+
// Try to spawn without blocking the caller.
1128+
go p.spawnConnectionIfNeeded()
11281129
}
11291130

11301131
func (p *pool) totalConnectionCount() int {
@@ -1141,143 +1142,6 @@ func (p *pool) availableConnectionCount() int {
11411142
return len(p.idleConns)
11421143
}
11431144

1144-
// createConnections creates connections for wantConn requests on the newConnWait queue.
1145-
func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
1146-
defer wg.Done()
1147-
1148-
// condition returns true if the createConnections() loop should continue and false if it should
1149-
// wait. Note that the condition also listens for Context cancellation, which also causes the
1150-
// loop to continue, allowing for a subsequent check to return from createConnections().
1151-
condition := func() bool {
1152-
checkOutWaiting := p.newConnWait.len() > 0
1153-
poolHasSpace := p.maxSize == 0 || uint64(len(p.conns)) < p.maxSize
1154-
cancelled := ctx.Err() != nil
1155-
return (checkOutWaiting && poolHasSpace) || cancelled
1156-
}
1157-
1158-
// wait waits for there to be an available wantConn and for the pool to have space for a new
1159-
// connection. When the condition becomes true, it creates a new connection and returns the
1160-
// waiting wantConn and new connection. If the Context is cancelled or there are any
1161-
// errors, wait returns with "ok = false".
1162-
wait := func() (*wantConn, *connection, bool) {
1163-
p.createConnectionsCond.L.Lock()
1164-
defer p.createConnectionsCond.L.Unlock()
1165-
1166-
for !condition() {
1167-
p.createConnectionsCond.Wait()
1168-
}
1169-
1170-
if ctx.Err() != nil {
1171-
return nil, nil, false
1172-
}
1173-
1174-
p.newConnWait.cleanFront()
1175-
w := p.newConnWait.popFront()
1176-
if w == nil {
1177-
return nil, nil, false
1178-
}
1179-
1180-
conn := newConnection(p.address, p.connOpts...)
1181-
conn.pool = p
1182-
conn.driverConnectionID = atomic.AddInt64(&p.nextID, 1)
1183-
p.conns[conn.driverConnectionID] = conn
1184-
1185-
return w, conn, true
1186-
}
1187-
1188-
for ctx.Err() == nil {
1189-
w, conn, ok := wait()
1190-
if !ok {
1191-
continue
1192-
}
1193-
1194-
if mustLogPoolMessage(p) {
1195-
keysAndValues := logger.KeyValues{
1196-
logger.KeyDriverConnectionID, conn.driverConnectionID,
1197-
}
1198-
1199-
logPoolMessage(p, logger.ConnectionCreated, keysAndValues...)
1200-
}
1201-
1202-
if p.monitor != nil {
1203-
p.monitor.Event(&event.PoolEvent{
1204-
Type: event.ConnectionCreated,
1205-
Address: p.address.String(),
1206-
ConnectionID: conn.driverConnectionID,
1207-
})
1208-
}
1209-
1210-
start := time.Now()
1211-
// Pass the createConnections context to connect to allow pool close to
1212-
// cancel connection establishment so shutdown doesn't block indefinitely if
1213-
// connectTimeout=0.
1214-
//
1215-
// Per the specifications, an explicit value of connectTimeout=0 means the
1216-
// timeout is "infinite".
1217-
1218-
var cancel context.CancelFunc
1219-
1220-
connctx := context.Background()
1221-
if p.connectTimeout != 0 {
1222-
connctx, cancel = context.WithTimeout(ctx, p.connectTimeout)
1223-
}
1224-
1225-
err := conn.connect(connctx)
1226-
1227-
if cancel != nil {
1228-
cancel()
1229-
}
1230-
1231-
if err != nil {
1232-
w.tryDeliver(nil, err)
1233-
1234-
// If there's an error connecting the new connection, call the handshake error handler
1235-
// that implements the SDAM handshake error handling logic. This must be called after
1236-
// delivering the connection error to the waiting wantConn. If it's called before, the
1237-
// handshake error handler may clear the connection pool, leading to a different error
1238-
// message being delivered to the same waiting wantConn in idleConnWait when the wait
1239-
// queues are cleared.
1240-
if p.handshakeErrFn != nil {
1241-
p.handshakeErrFn(err, conn.generation, conn.desc.ServiceID)
1242-
}
1243-
1244-
_ = p.removeConnection(conn, reason{
1245-
loggerConn: logger.ReasonConnClosedError,
1246-
event: event.ReasonError,
1247-
}, err)
1248-
1249-
_ = p.closeConnection(conn)
1250-
1251-
continue
1252-
}
1253-
1254-
duration := time.Since(start)
1255-
if mustLogPoolMessage(p) {
1256-
keysAndValues := logger.KeyValues{
1257-
logger.KeyDriverConnectionID, conn.driverConnectionID,
1258-
logger.KeyDurationMS, duration.Milliseconds(),
1259-
}
1260-
1261-
logPoolMessage(p, logger.ConnectionReady, keysAndValues...)
1262-
}
1263-
1264-
if p.monitor != nil {
1265-
p.monitor.Event(&event.PoolEvent{
1266-
Type: event.ConnectionReady,
1267-
Address: p.address.String(),
1268-
ConnectionID: conn.driverConnectionID,
1269-
Duration: duration,
1270-
})
1271-
}
1272-
1273-
if w.tryDeliver(conn, nil) {
1274-
continue
1275-
}
1276-
1277-
_ = p.checkInNoEvent(conn)
1278-
}
1279-
}
1280-
12811145
func (p *pool) maintain(ctx context.Context, wg *sync.WaitGroup) {
12821146
defer wg.Done()
12831147

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package topology
2+
3+
import (
4+
"context"
5+
"sync/atomic"
6+
)
7+
8+
func (p *pool) spawnConnection(ctx context.Context, w *wantConn, conn *connection) {
9+
//// Relase slot when done.
10+
//defer func() { <-p.sem }()
11+
12+
//// Simulate connection setup delay
13+
//time.Sleep(1000 * time.Millisecond)
14+
15+
//// Register new conenction
16+
//p.mu.Lock()
17+
//p.nextID++
18+
//conn := &conn{id: p.nextID}
19+
//p.conns[conn.id] = conn
20+
//p.mu.Unlock()
21+
22+
//w.ready <- conn // Notify the waiting goroutine that the connection is ready
23+
}
24+
25+
// hasSpace checks if the pool has space for a new connection.
26+
func (p *pool) hasSpace() bool {
27+
return p.maxSize == 0 || uint64(len(p.conns)) < p.maxSize
28+
}
29+
30+
// checkOutWaiting checks if there are any waiting connections that need to be
31+
// checked out.
32+
func (p *pool) checkOutWaiting() bool {
33+
return p.newConnWait.len() > 0
34+
}
35+
36+
// waitForNewConn blocks until there's both work and room in the pool (or the
37+
// context is canceled) then pops exactly one wantconn and creates+registes its
38+
// connection.
39+
func (p *pool) waitForNewConn(ctx context.Context) (*wantConn, *connection, bool) {
40+
p.createConnectionsCond.L.Lock()
41+
defer p.createConnectionsCond.L.Unlock()
42+
43+
for !(p.checkOutWaiting() && p.hasSpace()) && ctx.Err() == nil {
44+
p.createConnectionsCond.Wait()
45+
}
46+
47+
if ctx.Err() != nil {
48+
return nil, nil, false
49+
}
50+
51+
p.newConnWait.cleanFront()
52+
w := p.newConnWait.popFront()
53+
if w == nil {
54+
return nil, nil, false
55+
}
56+
57+
conn := newConnection(p.address, p.connOpts...)
58+
conn.pool = p
59+
conn.driverConnectionID = atomic.AddInt64(&p.nextID, 1)
60+
p.conns[conn.driverConnectionID] = conn
61+
62+
return w, conn, true
63+
}
64+
65+
// spawnConnectionIfNeeded takes one waiting wantConn (if any) and starts its
// connection establishment, subject to the maxConnecting semaphore limit.
// Intended to be run on its own goroutine (see queueForNewConn), since both
// the semaphore send and waitForNewConn may block.
//
// NOTE(review): the call site in queueForNewConn invokes this as
// `go p.spawnConnectionIfNeeded()` with no arguments, which does not match
// this signature — confirm which side is intended before merging.
func (p *pool) spawnConnectionIfNeeded(ctx context.Context) {
	// Block until we're allowed to start another connection. The channel's
	// buffer (maxConnecting) bounds concurrent establishments.
	p.connectionSem <- struct{}{}

	// Wait for a queued wantConn and pool capacity, or context cancellation.
	w, conn, ok := p.waitForNewConn(ctx)
	if !ok {
		<-p.connectionSem // Release slot on failure.

		return
	}

	// Establish the connection in the background so this goroutine returns
	// promptly; spawnConnection is expected to release the semaphore slot.
	go p.spawnConnection(ctx, w, conn)
}

0 commit comments

Comments
 (0)