Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 41 additions & 11 deletions x/mongo/driver/topology/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"fmt"
"io"
"net"
"os"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -54,7 +55,7 @@ type connection struct {
nc net.Conn // When nil, the connection is closed.
addr address.Address
idleTimeout time.Duration
idleDeadline atomic.Value // Stores a time.Time
idleStart atomic.Value // Stores a time.Time
readTimeout time.Duration
writeTimeout time.Duration
desc description.Server
Expand Down Expand Up @@ -512,24 +513,53 @@ func (c *connection) close() error {
}

func (c *connection) closed() bool {
return atomic.LoadInt64(&c.state) == connDisconnected
if atomic.LoadInt64(&c.state) == connDisconnected {
return true
}

// If the connection has been idle for less than 10 seconds, skip the liveness
// check.
idleStart, ok := c.idleStart.Load().(time.Time)
if !ok || idleStart.Add(10*time.Second).After(time.Now()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the significance of 10 seconds?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reference was the PyMongo driver, which checks connection liveness when it's idle for >1 second. I wanted to mitigate the risk that checking liveness too often would cause performance problems (since a liveness check takes at least 1ms), so I increased that threshold to 10s.

return false
}

// Set a 1ms read deadline and attempt to read 1 byte from the wire. Expect
// it to block for 1ms then return a deadline exceeded error. If it returns
// any other error, the connection is not usable, so close it and return
// true. If it doesn't return an error and actually reads data, the
// connection is also not usable, so close it and return true.
err := c.nc.SetReadDeadline(time.Now().Add(1 * time.Millisecond))
if err != nil {
_ = c.close()
return true
}

var b [1]byte
_, err = c.nc.Read(b[:])
if !errors.Is(err, os.ErrDeadlineExceeded) {
_ = c.close()
return true
}

// We don't need to un-set the read deadline because the "read" and "write"
// methods always reset the deadlines.

return false
}

func (c *connection) idleTimeoutExpired() bool {
now := time.Now()
if c.idleTimeout > 0 {
idleDeadline, ok := c.idleDeadline.Load().(time.Time)
if ok && now.After(idleDeadline) {
return true
}
if c.idleTimeout == 0 {
return false
}

return false
idleStart, ok := c.idleStart.Load().(time.Time)
return ok && idleStart.Add(c.idleTimeout).Before(time.Now())
}

func (c *connection) bumpIdleDeadline() {
func (c *connection) bumpIdleStart() {
if c.idleTimeout > 0 {
c.idleDeadline.Store(time.Now().Add(c.idleTimeout))
c.idleStart.Store(time.Now())
}
}

Expand Down
16 changes: 9 additions & 7 deletions x/mongo/driver/topology/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,13 +892,15 @@ func (p *pool) checkInNoEvent(conn *connection) error {
return nil
}

// Bump the connection idle deadline here because we're about to make the connection "available".
// The idle deadline is used to determine when a connection has reached its max idle time and
// should be closed. A connection reaches its max idle time when it has been "available" in the
// idle connections stack for more than the configured duration (maxIdleTimeMS). Set it before
// we call connectionPerished(), which checks the idle deadline, because a newly "available"
// connection should never be perished due to max idle time.
conn.bumpIdleDeadline()
// Bump the connection idle start time here because we're about to make the
// connection "available". The idle start time is used to determine how long
// a connection has been idle and when it has reached its max idle time and
// should be closed. A connection reaches its max idle time when it has been
// "available" in the idle connections stack for more than the configured
// duration (maxIdleTimeMS). Set it before we call connectionPerished(),
which checks the idle start time, because a newly "available" connection
// should never be perished due to max idle time.
conn.bumpIdleStart()

r, perished := connectionPerished(conn)
if !perished && conn.pool.getState() == poolClosed {
Expand Down
77 changes: 75 additions & 2 deletions x/mongo/driver/topology/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,79 @@ func TestPool(t *testing.T) {
assert.Containsf(t, err.Error(), "canceled", `expected error message to contain "canceled"`)
}

p.close(context.Background())
})
t.Run("discards connections closed by the server side", func(t *testing.T) {
t.Parallel()

cleanup := make(chan struct{})
defer close(cleanup)

ncs := make(chan net.Conn, 2)
addr := bootstrapConnections(t, 2, func(nc net.Conn) {
// Send all "server-side" connections to a channel so we can
// interact with them during the test.
ncs <- nc

<-cleanup
_ = nc.Close()
})

d := newdialer(&net.Dialer{})
p := newPool(poolConfig{
Address: address.Address(addr.String()),
}, WithDialer(func(Dialer) Dialer { return d }))
err := p.ready()
noerr(t, err)

// Add 1 idle connection to the pool by checking-out and checking-in
// a connection.
conn, err := p.checkOut(context.Background())
noerr(t, err)
err = p.checkIn(conn)
noerr(t, err)
assertConnectionsOpened(t, d, 1)
assert.Equalf(t, 1, p.availableConnectionCount(), "should be 1 idle connections in pool")
assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool")

// Make that connection appear as if it's been idle for a minute.
conn.idleStart.Store(time.Now().Add(-1 * time.Minute))

// Close the "server-side" of the connection we just created. The idle
// connection in the pool is now unusable because the "server-side"
// closed it.
nc := <-ncs
err = nc.Close()
noerr(t, err)

// In a separate goroutine, write a valid wire message to the 2nd
// connection that's about to be created. Stop waiting for a 2nd
// connection after 100ms to prevent leaking a goroutine.
go func() {
select {
case nc := <-ncs:
_, err = nc.Write([]byte{5, 0, 0, 0, 0})
noerr(t, err)
case <-time.After(100 * time.Millisecond):
}
}()

// Check out a connection and try to read from it. Expect the pool to
// discard the connection that was closed by the "server-side" and
// return a newly created connection instead.
conn, err = p.checkOut(context.Background())
noerr(t, err)
msg, err := conn.readWireMessage(context.Background())
noerr(t, err)
assert.Equal(t, []byte{5, 0, 0, 0, 0}, msg)

err = p.checkIn(conn)
noerr(t, err)

assertConnectionsOpened(t, d, 2)
assert.Equalf(t, 1, p.availableConnectionCount(), "should be 1 idle connections in pool")
assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool")

p.close(context.Background())
})
})
Expand Down Expand Up @@ -1076,7 +1149,7 @@ func TestPool(t *testing.T) {
p.idleMu.Lock()
for i := 0; i < 2; i++ {
p.idleConns[i].idleTimeout = time.Millisecond
p.idleConns[i].idleDeadline.Store(time.Now().Add(-1 * time.Hour))
p.idleConns[i].idleStart.Store(time.Now().Add(-1 * time.Hour))
}
p.idleMu.Unlock()
assertConnectionsClosed(t, d, 2)
Expand Down Expand Up @@ -1111,7 +1184,7 @@ func TestPool(t *testing.T) {
p.idleMu.Lock()
for i := 0; i < 2; i++ {
p.idleConns[i].idleTimeout = time.Millisecond
p.idleConns[i].idleDeadline.Store(time.Now().Add(-1 * time.Hour))
p.idleConns[i].idleStart.Store(time.Now().Add(-1 * time.Hour))
}
p.idleMu.Unlock()
assertConnectionsClosed(t, d, 2)
Expand Down
Loading