16
16
17
17
import os
18
18
from multiprocessing import Pipe
19
- from test import IntegrationTest , client_context
19
+ from test import IntegrationTest
20
20
from test .utils import (
21
21
ExceptionCatchingThread ,
22
22
is_greenthread_patched ,
27
27
from bson .objectid import ObjectId
28
28
29
29
30
- @client_context .require_connection
31
- def setUpModule ():
32
- pass
33
-
34
-
35
- # Not available for versions of Python without "register_at_fork"
36
30
@skipIf (
37
31
not hasattr (os , "register_at_fork" ), "register_at_fork not available in this version of Python"
38
32
)
@@ -42,83 +36,52 @@ def setUpModule():
42
36
)
43
37
class TestFork (IntegrationTest ):
44
38
def test_lock_client (self ):
45
- """
46
- Forks the client with some items locked.
47
- Parent => All locks should be as before the fork.
48
- Child => All locks should be reset.
49
- """
50
-
51
- def exit_cond ():
52
- self .client .admin .command ("ping" )
53
- return 0
54
-
39
+ # Forks the client with some items locked.
40
+ # Parent => All locks should be as before the fork.
41
+ # Child => All locks should be reset.
55
42
with self .client ._MongoClient__lock :
56
- # Call _get_topology, will launch a thread to fork upon __enter__ing
57
- # the with region.
58
- lock_pid = os .fork ()
59
- # The POSIX standard states only the forking thread is cloned.
60
- # In the parent, it'll return here.
61
- # In the child, it'll end with the calling thread.
62
- if lock_pid == 0 :
63
- code = - 1
64
- try :
65
- code = exit_cond ()
66
- finally :
67
- os ._exit (code )
68
- else :
69
- self .assertEqual (0 , os .waitpid (lock_pid , 0 )[1 ])
43
+ with self .fork () as pid :
44
+ if pid == 0 : # Child
45
+ self .client .admin .command ("ping" )
46
+ self .client .admin .command ("ping" )
70
47
71
48
def test_lock_object_id (self ):
72
- """
73
- Forks the client with ObjectId's _inc_lock locked.
74
- Parent => _inc_lock should remain locked.
75
- Child => _inc_lock should be unlocked.
76
- """
49
+ # Forks the client with ObjectId's _inc_lock locked.
50
+ # Parent => _inc_lock should remain locked.
51
+ # Child => _inc_lock should be unlocked.
77
52
with ObjectId ._inc_lock :
78
- lock_pid : int = os .fork ()
79
-
80
- if lock_pid == 0 :
81
- code = - 1
82
- try :
83
- code = int (ObjectId ._inc_lock .locked ())
84
- finally :
85
- os ._exit (code )
86
- else :
87
- self .assertEqual (0 , os .waitpid (lock_pid , 0 )[1 ])
53
+ with self .fork () as pid :
54
+ if pid == 0 : # Child
55
+ self .assertFalse (ObjectId ._inc_lock .locked ())
56
+ self .assertTrue (ObjectId ())
88
57
89
58
def test_topology_reset (self ):
90
- """
91
- Tests that topologies are different from each other.
92
- Cannot use ID because virtual memory addresses may be the same.
93
- Cannot reinstantiate ObjectId in the topology settings.
94
- Relies on difference in PID when opened again.
95
- """
59
+ # Tests that topologies are different from each other.
60
+ # Cannot use ID because virtual memory addresses may be the same.
61
+ # Cannot reinstantiate ObjectId in the topology settings.
62
+ # Relies on difference in PID when opened again.
96
63
parent_conn , child_conn = Pipe ()
97
64
init_id = self .client ._topology ._pid
98
65
parent_cursor_exc = self .client ._kill_cursors_executor
99
- lock_pid : int = os .fork ()
100
-
101
- if lock_pid == 0 : # Child
102
- self .client .admin . command ( "ping" )
103
- child_conn .send (self . client . _topology . _pid )
104
- child_conn . send (
105
- (
106
- parent_cursor_exc != self . client ._kill_cursors_executor ,
107
- "client._kill_cursors_executor was not reinitialized" ,
66
+ with self .fork () as pid :
67
+ if pid == 0 : # Child
68
+ self . client . admin . command ( "ping" )
69
+ child_conn . send ( self .client ._topology . _pid )
70
+ child_conn .send (
71
+ (
72
+ parent_cursor_exc != self . client . _kill_cursors_executor ,
73
+ " client._kill_cursors_executor was not reinitialized" ,
74
+ )
108
75
)
109
- )
110
- os ._exit (0 )
111
- else : # Parent
112
- self .assertEqual (0 , os .waitpid (lock_pid , 0 )[1 ])
113
- self .assertEqual (self .client ._topology ._pid , init_id )
114
- child_id = parent_conn .recv ()
115
- self .assertNotEqual (child_id , init_id )
116
- passed , msg = parent_conn .recv ()
117
- self .assertTrue (passed , msg )
76
+ else : # Parent
77
+ self .assertEqual (self .client ._topology ._pid , init_id )
78
+ child_id = parent_conn .recv ()
79
+ self .assertNotEqual (child_id , init_id )
80
+ passed , msg = parent_conn .recv ()
81
+ self .assertTrue (passed , msg )
118
82
119
83
def test_many_threaded (self ):
120
84
# Fork randomly while doing operations.
121
-
122
85
clients = []
123
86
for _ in range (10 ):
124
87
c = rs_or_single_client ()
@@ -143,17 +106,10 @@ def action(client):
143
106
rc = self .clients [i % len (self .clients )]
144
107
if i % 50 == 0 and self .fork :
145
108
# Fork
146
- pid = os .fork ()
147
- if pid == 0 :
148
- code = - 1
149
- try :
109
+ with self .runner .fork () as pid :
110
+ if pid == 0 : # Child
150
111
for c in self .clients :
151
112
action (c )
152
- code = 0
153
- finally :
154
- os ._exit (code )
155
- else :
156
- self .runner .assertEqual (0 , os .waitpid (pid , 0 )[1 ])
157
113
action (rc )
158
114
159
115
threads = [ForkThread (self , clients ) for _ in range (10 )]
0 commit comments