17
17
import os
18
18
from multiprocessing import Pipe
19
19
from test import IntegrationTest , client_context
20
+ from test .utils import ExceptionCatchingThread , rs_or_single_client
20
21
from unittest import skipIf
21
22
22
23
from bson .objectid import ObjectId
@@ -51,7 +52,11 @@ def exit_cond():
51
52
# In the parent, it'll return here.
52
53
# In the child, it'll end with the calling thread.
53
54
if lock_pid == 0 :
54
- os ._exit (exit_cond ())
55
+ code = - 1
56
+ try :
57
+ code = exit_cond ()
58
+ finally :
59
+ os ._exit (code )
55
60
else :
56
61
self .assertEqual (0 , os .waitpid (lock_pid , 0 )[1 ])
57
62
@@ -65,7 +70,11 @@ def test_lock_object_id(self):
65
70
lock_pid : int = os .fork ()
66
71
67
72
if lock_pid == 0 :
68
- os ._exit (int (ObjectId ._inc_lock .locked ()))
73
+ code = - 1
74
+ try :
75
+ code = int (ObjectId ._inc_lock .locked ())
76
+ finally :
77
+ os ._exit (code )
69
78
else :
70
79
self .assertEqual (0 , os .waitpid (lock_pid , 0 )[1 ])
71
80
@@ -98,3 +107,54 @@ def test_topology_reset(self):
98
107
self .assertNotEqual (child_id , init_id )
99
108
passed , msg = parent_conn .recv ()
100
109
self .assertTrue (passed , msg )
110
+
111
+ def test_many_threaded (self ):
112
+ # Fork randomly while doing operations.
113
+
114
+ clients = []
115
+ for _ in range (10 ):
116
+ c = rs_or_single_client ()
117
+ clients .append (c )
118
+ self .addCleanup (c .close )
119
+
120
+ class ForkThread (ExceptionCatchingThread ):
121
+ def __init__ (self , runner , clients ):
122
+ self .runner = runner
123
+ self .clients = clients
124
+ self .fork = False
125
+
126
+ super ().__init__ (target = self .fork_behavior )
127
+
128
+ def fork_behavior (self ) -> None :
129
+ def action (client ):
130
+ client .admin .command ("ping" )
131
+ return 0
132
+
133
+ for i in range (200 ):
134
+ # Pick a random client.
135
+ rc = self .clients [i % len (self .clients )]
136
+ if i % 50 == 0 and self .fork :
137
+ # Fork
138
+ pid = os .fork ()
139
+ if pid == 0 :
140
+ code = - 1
141
+ try :
142
+ for c in self .clients :
143
+ action (c )
144
+ code = 0
145
+ finally :
146
+ os ._exit (code )
147
+ else :
148
+ self .runner .assertEqual (0 , os .waitpid (pid , 0 )[1 ])
149
+ action (rc )
150
+
151
+ threads = [ForkThread (self , clients ) for _ in range (10 )]
152
+ threads [- 1 ].fork = True
153
+ for t in threads :
154
+ t .start ()
155
+
156
+ for t in threads :
157
+ t .join ()
158
+
159
+ for c in clients :
160
+ c .close ()
0 commit comments