Skip to content
43 changes: 0 additions & 43 deletions x/mongo/driver/topology/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"fmt"
"io"
"net"
"os"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -526,48 +525,6 @@ func (c *connection) closed() bool {
return atomic.LoadInt64(&c.state) == connDisconnected
}

// isAlive returns true if the connection is alive and ready to be used for an
// operation.
//
// Note that the liveness check can be slow (at least 1ms), so isAlive only
// checks the liveness of the connection if it's been idle for at least 10
// seconds. For frequently in-use connections, a network error during an
// operation will be the first indication of a dead connection.
func (c *connection) isAlive() bool {
if c.nc == nil {
return false
}

// If the connection has been idle for less than 10 seconds, skip the
// liveness check.
//
// The 10-seconds idle bypass is based on the liveness check implementation
// in the Python Driver. That implementation uses 1 second as the idle
// threshold, but we chose to be more conservative in the Go Driver because
// this is new behavior with unknown side-effects. See
// https://github.com/mongodb/mongo-python-driver/blob/e6b95f65953e01e435004af069a6976473eaf841/pymongo/synchronous/pool.py#L983-L985
idleStart, ok := c.idleStart.Load().(time.Time)
if !ok || idleStart.Add(10*time.Second).After(time.Now()) {
return true
}

// Set a 1ms read deadline and attempt to read 1 byte from the connection.
// Expect it to block for 1ms then return a deadline exceeded error. If it
// returns any other error, the connection is not usable, so return false.
// If it doesn't return an error and actually reads data, the connection is
// also not usable, so return false.
//
// Note that we don't need to un-set the read deadline because the "read"
// and "write" methods always reset the deadlines.
err := c.nc.SetReadDeadline(time.Now().Add(1 * time.Millisecond))
if err != nil {
return false
}
var b [1]byte
_, err = c.nc.Read(b[:])
return errors.Is(err, os.ErrDeadlineExceeded)
}

func (c *connection) idleTimeoutExpired() bool {
if c.idleTimeout == 0 {
return false
Expand Down
82 changes: 0 additions & 82 deletions x/mongo/driver/topology/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,85 +1161,3 @@ func TestConnectionError(t *testing.T) {
assert.ErrorContains(t, err, "client timed out waiting for server response")
})
}

func TestConnection_IsAlive(t *testing.T) {
t.Parallel()

t.Run("uninitialized", func(t *testing.T) {
t.Parallel()

conn := newConnection("")
assert.False(t,
conn.isAlive(),
"expected isAlive for an uninitialized connection to always return false")
})

t.Run("connection open", func(t *testing.T) {
t.Parallel()

cleanup := make(chan struct{})
defer close(cleanup)
addr := bootstrapConnections(t, 1, func(nc net.Conn) {
// Keep the connection open until the end of the test.
<-cleanup
_ = nc.Close()
})

conn := newConnection(address.Address(addr.String()))
err := conn.connect(context.Background())
require.NoError(t, err)

conn.idleStart.Store(time.Now().Add(-11 * time.Second))
assert.True(t,
conn.isAlive(),
"expected isAlive for an open connection to return true")
})

t.Run("connection closed", func(t *testing.T) {
t.Parallel()

conns := make(chan net.Conn)
addr := bootstrapConnections(t, 1, func(nc net.Conn) {
conns <- nc
})

conn := newConnection(address.Address(addr.String()))
err := conn.connect(context.Background())
require.NoError(t, err)

// Close the connection before calling isAlive.
nc := <-conns
err = nc.Close()
require.NoError(t, err)

conn.idleStart.Store(time.Now().Add(-11 * time.Second))
assert.False(t,
conn.isAlive(),
"expected isAlive for a closed connection to return false")
})

t.Run("connection reads data", func(t *testing.T) {
t.Parallel()

cleanup := make(chan struct{})
defer close(cleanup)
addr := bootstrapConnections(t, 1, func(nc net.Conn) {
// Write some data to the connection before calling isAlive.
_, err := nc.Write([]byte{5, 0, 0, 0, 0})
require.NoError(t, err)

// Keep the connection open until the end of the test.
<-cleanup
_ = nc.Close()
})

conn := newConnection(address.Address(addr.String()))
err := conn.connect(context.Background())
require.NoError(t, err)

conn.idleStart.Store(time.Now().Add(-11 * time.Second))
assert.False(t,
conn.isAlive(),
"expected isAlive for an open connection that reads data to return false")
})
}
6 changes: 2 additions & 4 deletions x/mongo/driver/topology/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,9 @@ type reason struct {
// connectionPerished checks if a given connection is perished and should be removed from the pool.
func connectionPerished(conn *connection) (reason, bool) {
switch {
case conn.closed() || !conn.isAlive():
case conn.closed():
// A connection would only be closed if it encountered a network error
// during an operation and closed itself. If a connection is not alive
// (e.g. the connection was closed by the server-side), it's also
// considered a network error.
// during an operation and closed itself.
return reason{
loggerConn: logger.ReasonConnClosedError,
event: event.ReasonError,
Expand Down
73 changes: 0 additions & 73 deletions x/mongo/driver/topology/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,79 +843,6 @@ func TestPool_checkOut(t *testing.T) {
assert.Containsf(t, err.Error(), "canceled", `expected error message to contain "canceled"`)
}

p.close(context.Background())
})
t.Run("discards connections closed by the server side", func(t *testing.T) {
t.Parallel()

cleanup := make(chan struct{})
defer close(cleanup)

ncs := make(chan net.Conn, 2)
addr := bootstrapConnections(t, 2, func(nc net.Conn) {
// Send all "server-side" connections to a channel so we can
// interact with them during the test.
ncs <- nc

<-cleanup
_ = nc.Close()
})

d := newdialer(&net.Dialer{})
p := newPool(poolConfig{
Address: address.Address(addr.String()),
}, WithDialer(func(Dialer) Dialer { return d }))
err := p.ready()
require.NoError(t, err)

// Add 1 idle connection to the pool by checking-out and checking-in
// a connection.
conn, err := p.checkOut(context.Background())
require.NoError(t, err)
err = p.checkIn(conn)
require.NoError(t, err)
assertConnectionsOpened(t, d, 1)
assert.Equalf(t, 1, p.availableConnectionCount(), "should be 1 idle connections in pool")
assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool")

// Make that connection appear as if it's been idle for a minute.
conn.idleStart.Store(time.Now().Add(-1 * time.Minute))

// Close the "server-side" of the connection we just created. The idle
// connection in the pool is now unusable because the "server-side"
// closed it.
nc := <-ncs
err = nc.Close()
require.NoError(t, err)

// In a separate goroutine, write a valid wire message to the 2nd
// connection that's about to be created. Stop waiting for a 2nd
// connection after 100ms to prevent leaking a goroutine.
go func() {
select {
case nc := <-ncs:
_, err := nc.Write([]byte{5, 0, 0, 0, 0})
require.NoError(t, err, "Write error")
case <-time.After(100 * time.Millisecond):
}
}()

// Check out a connection and try to read from it. Expect the pool to
// discard the connection that was closed by the "server-side" and
// return a newly created connection instead.
conn, err = p.checkOut(context.Background())
require.NoError(t, err)
msg, err := conn.readWireMessage(context.Background())
require.NoError(t, err)
assert.Equal(t, []byte{5, 0, 0, 0, 0}, msg)

err = p.checkIn(conn)
require.NoError(t, err)

assertConnectionsOpened(t, d, 2)
assert.Equalf(t, 1, p.availableConnectionCount(), "should be 1 idle connections in pool")
assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool")

p.close(context.Background())
})
}
Expand Down
Loading