@@ -1944,33 +1944,43 @@ def test_exhaust(self):
1944
1944
self .db .test .insert_many ([{'i' : i } for i in range (150 )])
1945
1945
1946
1946
client = rs_or_single_client (maxPoolSize = 1 )
1947
- socks = get_pool (client ).sockets
1947
+ self .addCleanup (client .close )
1948
+ pool = get_pool (client )
1948
1949
1949
1950
# Make sure the socket is returned after exhaustion.
1950
1951
cur = client [self .db .name ].test .find (cursor_type = CursorType .EXHAUST )
1951
1952
next (cur )
1952
- self .assertEqual (0 , len (socks ))
1953
+ self .assertEqual (0 , len (pool . sockets ))
1953
1954
for _ in cur :
1954
1955
pass
1955
- self .assertEqual (1 , len (socks ))
1956
+ self .assertEqual (1 , len (pool . sockets ))
1956
1957
1957
1958
# Same as previous but don't call next()
1958
1959
for _ in client [self .db .name ].test .find (cursor_type = CursorType .EXHAUST ):
1959
1960
pass
1960
- self .assertEqual (1 , len (socks ))
1961
-
1962
- # If the Cursor instance is discarded before being
1963
- # completely iterated we have to close and
1964
- # discard the socket.
1965
- cur = client [self .db .name ].test .find (cursor_type = CursorType .EXHAUST )
1966
- next (cur )
1967
- self .assertEqual (0 , len (socks ))
1961
+ self .assertEqual (1 , len (pool .sockets ))
1962
+
1963
+ # If the Cursor instance is discarded before being completely iterated
1964
+ # and the socket has pending data (more_to_come=True) we have to close
1965
+ # and discard the socket.
1966
+ cur = client [self .db .name ].test .find (cursor_type = CursorType .EXHAUST ,
1967
+ batch_size = 2 )
1968
+ if client_context .version .at_least (4 , 2 ):
1969
+ # On 4.2+ we use OP_MSG which only sets more_to_come=True after the
1970
+ # first getMore.
1971
+ for _ in range (3 ):
1972
+ next (cur )
1973
+ else :
1974
+ next (cur )
1975
+ self .assertEqual (0 , len (pool .sockets ))
1968
1976
if sys .platform .startswith ('java' ) or 'PyPy' in sys .version :
1969
1977
# Don't wait for GC or use gc.collect(), it's unreliable.
1970
1978
cur .close ()
1971
1979
cur = None
1980
+ # Wait until the background thread returns the socket.
1981
+ wait_until (lambda : pool .active_sockets == 0 , 'return socket' )
1972
1982
# The socket should be discarded.
1973
- self .assertEqual (0 , len (socks ))
1983
+ self .assertEqual (0 , len (pool . sockets ))
1974
1984
1975
1985
def test_distinct (self ):
1976
1986
self .db .drop_collection ("test" )
0 commit comments