|
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | 14 | # from datetime import datetime, timedelta |
| 15 | +import concurrent.futures |
15 | 16 | import datetime |
16 | 17 | import tempfile |
| 18 | +import threading |
17 | 19 | import unittest |
18 | 20 | import unittest.mock as mock |
19 | 21 | import warnings |
@@ -266,18 +268,22 @@ def inner(i): |
266 | 268 | raw_data["nested"] = [inner(i) for i in range(3)] |
267 | 269 | return pd.DataFrame(data=raw_data).astype(schema) |
268 | 270 |
|
269 | | - def test_auto_schema(self): |
| 271 | + def _check_auto_schema(self, coll): |
| 272 | + coll.drop() |
270 | 273 | data = self._create_nested_data() |
271 | | - self.coll.drop() |
272 | | - res = write(self.coll, data) |
| 274 | + res = write(coll, data) |
273 | 275 | self.assertEqual(len(data), res.raw_result["insertedCount"]) |
274 | 276 | for func in [find_pandas_all, aggregate_pandas_all]: |
275 | | - out = func(self.coll, {} if func == find_pandas_all else []).drop(columns=["_id"]) |
| 277 | + out = func(coll, {} if func == find_pandas_all else []).drop(columns=["_id"]) |
276 | 278 | for name in data.columns: |
277 | 279 | val = out[name] |
278 | 280 | if str(val.dtype) == "object": |
279 | 281 | val = val.astype(data[name].dtype) |
280 | 282 | pd.testing.assert_series_equal(data[name], val) |
| 283 | + coll.drop() |
| 284 | + |
| 285 | + def test_auto_schema(self): |
| 286 | + self._check_auto_schema(self.coll) |
281 | 287 |
|
282 | 288 | def test_auto_schema_heterogeneous(self): |
283 | 289 | vals = [1, "2", True, 4] |
@@ -345,6 +351,26 @@ def test_exclude_none(self): |
345 | 351 | col_data = list(self.coll.find({})) |
346 | 352 | assert "b" not in col_data[3] |
347 | 353 |
|
| 354 | + def test_threading(self): |
| 355 | + def run_test(): |
| 356 | + client = client_context.get_client( |
| 357 | + event_listeners=[self.getmore_listener, self.cmd_listener] |
| 358 | + ) |
| 359 | + name = f"test-{threading.current_thread().name}" |
| 360 | + coll = client.pymongoarrow_test.get_collection( |
| 361 | + name, write_concern=WriteConcern(w="majority") |
| 362 | + ) |
| 363 | + self._check_auto_schema(coll) |
| 364 | + client.close() |
| 365 | + |
| 366 | + with concurrent.futures.ThreadPoolExecutor() as executor: |
| 367 | + futures = [] |
| 368 | + for i in range(5): |
| 369 | + futures.append(executor.submit(run_test)) |
| 370 | + concurrent.futures.wait(futures) |
| 371 | + for future in futures: |
| 372 | + future.result() |
| 373 | + |
348 | 374 |
|
349 | 375 | class TestBSONTypes(PandasTestBase): |
350 | 376 | @classmethod |
|
0 commit comments