@@ -9,6 +9,7 @@ package topology
99import (
1010 "context"
1111 "errors"
12+ "io"
1213 "net"
1314 "regexp"
1415 "sync"
@@ -1566,61 +1567,156 @@ func TestAwaitPendingRead(t *testing.T) {
15661567
15671568 assert .EqualError (t , pendingReadError , "error discarding 3 byte message: EOF" )
15681569 })
1569- //t.Run("no remaining time with no response", func(t *testing.T) {
1570- // timeout := 10 * time.Millisecond
1571-
1572- // addr := bootstrapConnections(t, 1, func(nc net.Conn) {
1573- // defer func() {
1574- // _ = nc.Close()
1575- // }()
1576-
1577- // // Write nothing so that the 10 microseconds "non-blocking"
1578- // })
1579-
1580- // var pendingReadError error
1581- // monitor := &event.PoolMonitor{
1582- // Event: func(pe *event.PoolEvent) {
1583- // if pe.Type == event.ConnectionPendingReadFailed {
1584- // pendingReadError = pe.Error
1585- // }
1586- // },
1587- // }
1588-
1589- // p := newPool(
1590- // poolConfig{
1591- // Address: address.Address(addr.String()),
1592- // PoolMonitor: monitor,
1593- // },
1594- // )
1595- // defer p.close(context.Background())
1596- // err := p.ready()
1597- // require.NoError(t, err)
1598-
1599- // conn, err := p.checkOut(context.Background())
1600- // require.NoError(t, err)
1601-
1602- // ctx, cancel := csot.WithTimeout(context.Background(), &timeout)
1603- // defer cancel()
1604-
1605- // ctx = driverutil.WithValueHasMaxTimeMS(ctx, true)
1606- // ctx = driverutil.WithRequestID(ctx, -1)
1607-
1608- // _, err = conn.readWireMessage(ctx)
1609- // regex := regexp.MustCompile(
1610- // `^connection\(.*\[-\d+\]\) incomplete read of message header: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`,
1611- // )
1612- // assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex)
1613- // err = p.checkIn(conn)
1614- // require.NoError(t, err)
1615-
1616- // // Wait 400ms to ensure there is no time left on the pending read.
1617- // time.Sleep(400 * time.Millisecond)
1618-
1619- // _, err = p.checkOut(context.Background())
1620- // require.NoError(t, err)
1621-
1622- // require.NoError(t, pendingReadError)
1623- //})
1570+
1571+ // Need to test the case where we attempt a non-blocking read to determine if
1572+ // we should refresh the remaining time. In the case of the Go Driver, we do
1573+ // this by attempt to "pee" at 1 byte with a deadline of 1ns.
1574+ t .Run ("connection attempts peek but fails" , func (t * testing.T ) {
1575+ timeout := 10 * time .Millisecond
1576+
1577+ // Mock a TCP listener that will write a byte sequence > 5 (to avoid errors
1578+ // due to size) to the TCP socket. Have the listener sleep for 2x the
1579+ // timeout provided to the connection AFTER writing the byte sequence. This
1580+ // wiill cause the connection to timeout while reading from the socket.
1581+ addr := bootstrapConnections (t , 1 , func (nc net.Conn ) {
1582+ defer func () {
1583+ _ = nc .Close ()
1584+ }()
1585+
1586+ _ , err := nc .Write ([]byte {12 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 1 })
1587+ require .NoError (t , err )
1588+ time .Sleep (timeout * 2 )
1589+
1590+ // Write nothing so that the 1 millisecond "non-blocking" peek fails.
1591+ })
1592+
1593+ var pendingReadError error
1594+ monitor := & event.PoolMonitor {
1595+ Event : func (pe * event.PoolEvent ) {
1596+ if pe .Type == event .ConnectionPendingReadFailed {
1597+ pendingReadError = pe .Error
1598+ }
1599+ },
1600+ }
1601+
1602+ p := newPool (
1603+ poolConfig {
1604+ Address : address .Address (addr .String ()),
1605+ PoolMonitor : monitor ,
1606+ },
1607+ )
1608+ defer p .close (context .Background ())
1609+ err := p .ready ()
1610+ require .NoError (t , err )
1611+
1612+ // Check out a connection and read from the socket, causing a timeout and
1613+ // pinning the connection to a pending read state.
1614+ conn , err := p .checkOut (context .Background ())
1615+ require .NoError (t , err )
1616+
1617+ ctx , cancel := csot .WithTimeout (context .Background (), & timeout )
1618+ defer cancel ()
1619+
1620+ ctx = driverutil .WithValueHasMaxTimeMS (ctx , true )
1621+ ctx = driverutil .WithRequestID (ctx , - 1 )
1622+
1623+ _ , err = conn .readWireMessage (ctx )
1624+ regex := regexp .MustCompile (
1625+ `^connection\(.*\[-\d+\]\) incomplete read of full message: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$` ,
1626+ )
1627+ assert .True (t , regex .MatchString (err .Error ()), "error %q does not match pattern %q" , err , regex )
1628+
1629+ // Check in the connection with a pending read state. The next time this
1630+ // connection is checked out, it should attempt to read the pending
1631+ // response.
1632+ err = p .checkIn (conn )
1633+ require .NoError (t , err )
1634+
1635+ // Wait 3s to make sure there is no remaining time on the pending read
1636+ // state.
1637+ time .Sleep (3 * time .Second )
1638+
1639+ // Check out the connection again. The remaining time should be exhausted
1640+ // requiring us to "peek" at the connection to determine if we should
1641+ _ , err = p .checkOut (context .Background ())
1642+ assert .ErrorIs (t , err , io .EOF )
1643+ assert .ErrorIs (t , pendingReadError , io .EOF )
1644+ })
1645+
1646+ t .Run ("connection attempts peek and succeeds" , func (t * testing.T ) {
1647+ timeout := 10 * time .Millisecond
1648+
1649+ // Mock a TCP listener that will write a byte sequence > 5 (to avoid errors
1650+ // due to size) to the TCP socket. Have the listener sleep for 2x the
1651+ // timeout provided to the connection AFTER writing the byte sequence. This
1652+ // wiill cause the connection to timeout while reading from the socket.
1653+ addr := bootstrapConnections (t , 1 , func (nc net.Conn ) {
1654+ defer func () {
1655+ _ = nc .Close ()
1656+ }()
1657+
1658+ _ , err := nc .Write ([]byte {12 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 1 })
1659+ require .NoError (t , err )
1660+ time .Sleep (timeout * 2 )
1661+
1662+ // Write data that can be peeked at.
1663+ _ , err = nc .Write ([]byte {12 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 1 })
1664+ require .NoError (t , err )
1665+
1666+ })
1667+
1668+ var pendingReadError error
1669+ monitor := & event.PoolMonitor {
1670+ Event : func (pe * event.PoolEvent ) {
1671+ if pe .Type == event .ConnectionPendingReadFailed {
1672+ pendingReadError = pe .Error
1673+ }
1674+ },
1675+ }
1676+
1677+ p := newPool (
1678+ poolConfig {
1679+ Address : address .Address (addr .String ()),
1680+ PoolMonitor : monitor ,
1681+ },
1682+ )
1683+ defer p .close (context .Background ())
1684+ err := p .ready ()
1685+ require .NoError (t , err )
1686+
1687+ // Check out a connection and read from the socket, causing a timeout and
1688+ // pinning the connection to a pending read state.
1689+ conn , err := p .checkOut (context .Background ())
1690+ require .NoError (t , err )
1691+
1692+ ctx , cancel := csot .WithTimeout (context .Background (), & timeout )
1693+ defer cancel ()
1694+
1695+ ctx = driverutil .WithValueHasMaxTimeMS (ctx , true )
1696+ ctx = driverutil .WithRequestID (ctx , - 1 )
1697+
1698+ _ , err = conn .readWireMessage (ctx )
1699+ regex := regexp .MustCompile (
1700+ `^connection\(.*\[-\d+\]\) incomplete read of full message: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$` ,
1701+ )
1702+ assert .True (t , regex .MatchString (err .Error ()), "error %q does not match pattern %q" , err , regex )
1703+
1704+ // Check in the connection with a pending read state. The next time this
1705+ // connection is checked out, it should attempt to read the pending
1706+ // response.
1707+ err = p .checkIn (conn )
1708+ require .NoError (t , err )
1709+
1710+ // Wait 3s to make sure there is no remaining time on the pending read
1711+ // state.
1712+ time .Sleep (3 * time .Second )
1713+
1714+ // Check out the connection again. The remaining time should be exhausted
1715+ // requiring us to "peek" at the connection to determine if we should
1716+ _ , err = p .checkOut (context .Background ())
1717+ require .NoError (t , err )
1718+ require .NoError (t , pendingReadError )
1719+ })
16241720}
16251721
16261722func assertConnectionsClosed (t * testing.T , dialer * dialer , count int ) {
0 commit comments