@@ -9,13 +9,19 @@ package topology
99import (
1010 "context"
1111 "errors"
12+ "io"
1213 "net"
14+ "regexp"
15+ "sync"
1316 "testing"
1417 "time"
1518
1619 "go.mongodb.org/mongo-driver/v2/event"
1720 "go.mongodb.org/mongo-driver/v2/internal/assert"
21+ "go.mongodb.org/mongo-driver/v2/internal/csot"
22+ "go.mongodb.org/mongo-driver/v2/internal/driverutil"
1823 "go.mongodb.org/mongo-driver/v2/internal/require"
24+ "go.mongodb.org/mongo-driver/v2/mongo/address"
1925 "go.mongodb.org/mongo-driver/v2/x/mongo/driver/operation"
2026)
2127
@@ -263,6 +269,201 @@ func TestCMAPProse(t *testing.T) {
263269 })
264270 })
265271 })
272+
273+ // Need to test the case where we attempt a non-blocking read to determine if
274+ // we should refresh the remaining time. In the case of the Go Driver, we do
275+ // this by attempt to "pee" at 1 byte with a deadline of 1ns.
276+ t .Run ("connection attempts peek but fails" , func (t * testing.T ) {
277+ const requestID = int32 (- 1 )
278+ timeout := 10 * time .Millisecond
279+
280+ // Mock a TCP listener that will write a byte sequence > 5 (to avoid errors
281+ // due to size) to the TCP socket. Have the listener sleep for 2x the
282+ // timeout provided to the connection AFTER writing the byte sequence. This
283+ // wiill cause the connection to timeout while reading from the socket.
284+ addr := bootstrapConnections (t , 1 , func (nc net.Conn ) {
285+ defer func () {
286+ _ = nc .Close ()
287+ }()
288+
289+ _ , err := nc .Write ([]byte {12 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 1 })
290+ require .NoError (t , err )
291+ time .Sleep (timeout * 2 )
292+
293+ // Write nothing so that the 1 millisecond "non-blocking" peek fails.
294+ })
295+
296+ poolEventsByType := make (map [string ][]event.PoolEvent )
297+ poolEventsByTypeMu := & sync.Mutex {}
298+
299+ monitor := & event.PoolMonitor {
300+ Event : func (pe * event.PoolEvent ) {
301+ poolEventsByTypeMu .Lock ()
302+ poolEventsByType [pe .Type ] = append (poolEventsByType [pe .Type ], * pe )
303+ poolEventsByTypeMu .Unlock ()
304+ },
305+ }
306+
307+ p := newPool (
308+ poolConfig {
309+ Address : address .Address (addr .String ()),
310+ PoolMonitor : monitor ,
311+ },
312+ )
313+ defer p .close (context .Background ())
314+ err := p .ready ()
315+ require .NoError (t , err )
316+
317+ // Check out a connection and read from the socket, causing a timeout and
318+ // pinning the connection to a pending read state.
319+ conn , err := p .checkOut (context .Background ())
320+ require .NoError (t , err )
321+
322+ ctx , cancel := csot .WithTimeout (context .Background (), & timeout )
323+ defer cancel ()
324+
325+ ctx = driverutil .WithValueHasMaxTimeMS (ctx , true )
326+ ctx = driverutil .WithRequestID (ctx , requestID )
327+
328+ _ , err = conn .readWireMessage (ctx )
329+ regex := regexp .MustCompile (
330+ `^connection\(.*\[-\d+\]\) incomplete read of full message: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$` ,
331+ )
332+ assert .True (t , regex .MatchString (err .Error ()), "error %q does not match pattern %q" , err , regex )
333+
334+ // Check in the connection with a pending read state. The next time this
335+ // connection is checked out, it should attempt to read the pending
336+ // response.
337+ err = p .checkIn (conn )
338+ require .NoError (t , err )
339+
340+ // Wait 3s to make sure there is no remaining time on the pending read
341+ // state.
342+ time .Sleep (3 * time .Second )
343+
344+ // Check out the connection again. The remaining time should be exhausted
345+ // requiring us to "peek" at the connection to determine if we should
346+ _ , err = p .checkOut (context .Background ())
347+ assert .ErrorIs (t , err , io .EOF )
348+
349+ // There should be 1 ConnectionPendingResponseStarted event.
350+ started := poolEventsByType [event .ConnectionPendingResponseStarted ]
351+ require .Len (t , started , 1 )
352+
353+ assert .Equal (t , addr .String (), started [0 ].Address )
354+ assert .Equal (t , conn .driverConnectionID , started [0 ].ConnectionID )
355+ assert .Equal (t , requestID , started [0 ].RequestID )
356+
357+ // There should be 1 ConnectionPendingResponseFailed event.
358+ failed := poolEventsByType [event .ConnectionPendingResponseFailed ]
359+ require .Len (t , failed , 1 )
360+
361+ assert .Equal (t , addr .String (), failed [0 ].Address )
362+ assert .Equal (t , conn .driverConnectionID , failed [0 ].ConnectionID )
363+ assert .Equal (t , requestID , failed [0 ].RequestID )
364+ assert .Equal (t , io .EOF .Error (), failed [0 ].Reason )
365+ assert .ErrorIs (t , failed [0 ].Error , io .EOF )
366+ assert .Equal (t , time .Duration (0 ), failed [0 ].RemainingTime )
367+
368+ // There should be 0 ConnectionPendingResponseSucceeded event.
369+ require .Len (t , poolEventsByType [event .ConnectionPendingResponseSucceeded ], 0 )
370+ })
371+
372+ t .Run ("connection attempts peek and succeeds" , func (t * testing.T ) {
373+ const requestID = int32 (- 1 )
374+ timeout := 10 * time .Millisecond
375+
376+ // Mock a TCP listener that will write a byte sequence > 5 (to avoid errors
377+ // due to size) to the TCP socket. Have the listener sleep for 2x the
378+ // timeout provided to the connection AFTER writing the byte sequence. This
379+ // wiill cause the connection to timeout while reading from the socket.
380+ addr := bootstrapConnections (t , 1 , func (nc net.Conn ) {
381+ defer func () {
382+ _ = nc .Close ()
383+ }()
384+
385+ _ , err := nc .Write ([]byte {12 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 1 })
386+ require .NoError (t , err )
387+ time .Sleep (timeout * 2 )
388+
389+ // Write data that can be peeked at.
390+ _ , err = nc .Write ([]byte {12 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 1 })
391+ require .NoError (t , err )
392+
393+ })
394+
395+ poolEventsByType := make (map [string ][]event.PoolEvent )
396+ poolEventsByTypeMu := & sync.Mutex {}
397+
398+ monitor := & event.PoolMonitor {
399+ Event : func (pe * event.PoolEvent ) {
400+ poolEventsByTypeMu .Lock ()
401+ poolEventsByType [pe .Type ] = append (poolEventsByType [pe .Type ], * pe )
402+ poolEventsByTypeMu .Unlock ()
403+ },
404+ }
405+
406+ p := newPool (
407+ poolConfig {
408+ Address : address .Address (addr .String ()),
409+ PoolMonitor : monitor ,
410+ },
411+ )
412+ defer p .close (context .Background ())
413+ err := p .ready ()
414+ require .NoError (t , err )
415+
416+ // Check out a connection and read from the socket, causing a timeout and
417+ // pinning the connection to a pending read state.
418+ conn , err := p .checkOut (context .Background ())
419+ require .NoError (t , err )
420+
421+ ctx , cancel := csot .WithTimeout (context .Background (), & timeout )
422+ defer cancel ()
423+
424+ ctx = driverutil .WithValueHasMaxTimeMS (ctx , true )
425+ ctx = driverutil .WithRequestID (ctx , requestID )
426+
427+ _ , err = conn .readWireMessage (ctx )
428+ regex := regexp .MustCompile (
429+ `^connection\(.*\[-\d+\]\) incomplete read of full message: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$` ,
430+ )
431+ assert .True (t , regex .MatchString (err .Error ()), "error %q does not match pattern %q" , err , regex )
432+
433+ // Check in the connection with a pending read state. The next time this
434+ // connection is checked out, it should attempt to read the pending
435+ // response.
436+ err = p .checkIn (conn )
437+ require .NoError (t , err )
438+
439+ // Wait 3s to make sure there is no remaining time on the pending read
440+ // state.
441+ time .Sleep (3 * time .Second )
442+
443+ // Check out the connection again. The remaining time should be exhausted
444+ // requiring us to "peek" at the connection to determine if we should
445+ _ , err = p .checkOut (context .Background ())
446+ require .NoError (t , err )
447+
448+ // There should be 1 ConnectionPendingResponseStarted event.
449+ started := poolEventsByType [event .ConnectionPendingResponseStarted ]
450+ require .Len (t , started , 1 )
451+
452+ assert .Equal (t , addr .String (), started [0 ].Address )
453+ assert .Equal (t , conn .driverConnectionID , started [0 ].ConnectionID )
454+ assert .Equal (t , requestID , started [0 ].RequestID )
455+
456+ // There should be 0 ConnectionPendingResponseFailed event.
457+ require .Len (t , poolEventsByType [event .ConnectionPendingResponseFailed ], 0 )
458+
459+ // There should be 1 ConnectionPendingResponseSucceeded event.
460+ succeeded := poolEventsByType [event .ConnectionPendingResponseSucceeded ]
461+ require .Len (t , succeeded , 1 )
462+
463+ assert .Equal (t , addr .String (), succeeded [0 ].Address )
464+ assert .Equal (t , conn .driverConnectionID , succeeded [0 ].ConnectionID )
465+ assert .Greater (t , int (succeeded [0 ].Duration ), 0 )
466+ })
266467}
267468
268469func createTestPool (t * testing.T , cfg poolConfig , opts ... ConnectionOption ) * pool {
0 commit comments