Skip to content

Commit 6e4f4c5

Browse files
author
iwysiu
committed
GODRIVER-1589 consolidate pool.drain and pool.clear (#397)
1 parent 7bd78eb commit 6e4f4c5

File tree

4 files changed

+16
-28
lines changed

4 files changed

+16
-28
lines changed

x/mongo/driver/topology/pool.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,6 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) (*pool, error) {
179179
return pool, nil
180180
}
181181

182-
// drain drains the pool by increasing the generation ID.
183-
func (p *pool) drain() { atomic.AddUint64(&p.generation, 1) }
184-
185182
// stale checks if a given connection's generation is below the generation of the pool
186183
func (p *pool) stale(c *connection) bool {
187184
return c == nil || c.generation < atomic.LoadUint64(&p.generation)
@@ -473,15 +470,13 @@ func (p *pool) put(c *connection) error {
473470
return nil
474471
}
475472

476-
// clear clears the pool by incrementing the generation and then maintaining the pool
473+
// clear clears the pool by incrementing the generation
477474
func (p *pool) clear() {
478475
if p.monitor != nil {
479476
p.monitor.Event(&event.PoolEvent{
480477
Type: event.PoolCleared,
481478
Address: p.address.String(),
482479
})
483480
}
484-
485-
p.drain()
486-
p.conns.Maintain()
481+
atomic.AddUint64(&p.generation, 1)
487482
}

x/mongo/driver/topology/resource_pool_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ func TestResourcePool(t *testing.T) {
182182
assert.Equal(t, int32(5), expiredCalled, "expected expire to be called 5 times, got %v", expiredCalled)
183183
closeCalled := ec.getCloseCalled()
184184
assert.Equal(t, int32(3), closeCalled, "expected close to be called 3 times, got %v", closeCalled)
185+
// rp.maintainTimer should be reset after Maintain runs. Stop() returns true if the
186+
// timer was set, ensuring that it was correctly reset.
187+
assert.True(t, rp.maintainTimer.Stop(), "expected timer to be reset")
185188
})
186189
})
187190
}

x/mongo/driver/topology/server.go

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func NewServer(addr address.Address, opts ...ServerOption) (*Server, error) {
148148
}
149149
s.desc.Store(description.Server{Addr: addr})
150150

151-
callback := func(desc description.Server) { s.updateDescription(desc, false) }
151+
callback := func(desc description.Server) { s.updateDescription(desc) }
152152
pc := poolConfig{
153153
Address: addr,
154154
MinPoolSize: cfg.minConns,
@@ -254,7 +254,8 @@ func (s *Server) Connection(ctx context.Context) (driver.Connection, error) {
254254
Kind: description.Unknown,
255255
LastError: wrappedConnErr,
256256
}
257-
s.updateDescription(desc, false)
257+
s.updateDescription(desc)
258+
s.pool.clear()
258259

259260
return nil, err
260261
}
@@ -324,7 +325,7 @@ func (s *Server) ProcessError(err error) {
324325
desc.Kind = description.Unknown
325326
desc.LastError = err
326327
// updates description to unknown
327-
s.updateDescription(desc, false)
328+
s.updateDescription(desc)
328329
// If the node is shutting down or is older than 4.2, we synchronously clear the pool
329330
if cerr.NodeIsShuttingDown() || desc.WireVersion == nil || desc.WireVersion.Max < 8 {
330331
s.RequestImmediateCheck()
@@ -337,7 +338,7 @@ func (s *Server) ProcessError(err error) {
337338
desc.Kind = description.Unknown
338339
desc.LastError = err
339340
// updates description to unknown
340-
s.updateDescription(desc, false)
341+
s.updateDescription(desc)
341342
// If the node is shutting down or is older than 4.2, we synchronously clear the pool
342343
if wcerr.NodeIsShuttingDown() || desc.WireVersion == nil || desc.WireVersion.Max < 8 {
343344
s.RequestImmediateCheck()
@@ -363,7 +364,7 @@ func (s *Server) ProcessError(err error) {
363364
desc.Kind = description.Unknown
364365
desc.LastError = err
365366
// updates description to unknown
366-
s.updateDescription(desc, false)
367+
s.updateDescription(desc)
367368
s.pool.clear()
368369
}
369370

@@ -393,7 +394,7 @@ func (s *Server) update() {
393394
var desc description.Server
394395

395396
desc, conn = s.heartbeat(nil)
396-
s.updateDescription(desc, true)
397+
s.updateDescription(desc)
397398

398399
closeServer := func() {
399400
doneOnce = true
@@ -433,15 +434,15 @@ func (s *Server) update() {
433434
}
434435

435436
desc, conn = s.heartbeat(conn)
436-
s.updateDescription(desc, false)
437+
s.updateDescription(desc)
437438
}
438439
}
439440

440441
// updateDescription handles updating the description on the Server, notifying
441442
// subscribers, and potentially draining the connection pool. The initial
442443
// parameter is used to determine if this is the first description from the
443444
// server.
444-
func (s *Server) updateDescription(desc description.Server, initial bool) {
445+
func (s *Server) updateDescription(desc description.Server) {
445446
defer func() {
446447
// ¯\_(ツ)_/¯
447448
_ = recover()
@@ -463,16 +464,6 @@ func (s *Server) updateDescription(desc description.Server, initial bool) {
463464
c <- desc
464465
}
465466
s.subLock.Unlock()
466-
467-
if initial {
468-
// We don't clear the pool on the first update on the description.
469-
return
470-
}
471-
472-
switch desc.Kind {
473-
case description.Unknown:
474-
s.pool.drain()
475-
}
476467
}
477468

478469
// heartbeat sends a heartbeat to the server using the given connection. The connection can be nil.
@@ -555,7 +546,7 @@ func (s *Server) heartbeat(conn *connection) (description.Server, *connection) {
555546
saved = err
556547
conn = nil
557548
if wrappedConnErr := unwrapConnectionError(err); wrappedConnErr != nil {
558-
s.pool.drain()
549+
s.pool.clear()
559550
// If the server is not connected, give up and exit loop
560551
if s.Description().Kind == description.Unknown {
561552
break

x/mongo/driver/topology/server_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ func TestServer(t *testing.T) {
122122
desc = &descript
123123
require.Nil(t, desc.LastError)
124124
}
125-
//err = s.Connect(nil)
126125
err = s.pool.connect()
127126
require.NoError(t, err, "unable to connect to pool")
128127
s.connectionstate = connected
@@ -245,7 +244,7 @@ func TestServer(t *testing.T) {
245244
updated.Store(false)
246245
s, err := ConnectServer(address.Address("localhost"), func(description.Server) { updated.Store(true) })
247246
require.NoError(t, err)
248-
s.updateDescription(description.Server{Addr: s.address}, false)
247+
s.updateDescription(description.Server{Addr: s.address})
249248
require.True(t, updated.Load().(bool))
250249
})
251250
t.Run("heartbeat", func(t *testing.T) {

0 commit comments

Comments
 (0)