Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/dist-python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,17 @@ jobs:
- [ubuntu-24.04, manylinux_aarch64]
- [macos-14, macosx_*]
- [windows-2019, win_amd64]
python: ["cp39", "cp310", "cp311", "cp312", "cp313"]
python: ["cp39", "cp310", "cp311", "cp312", "cp313", "cp313t"]
exclude:
- buildplat: [macos-14, macosx_*]
python: "cp39"
- buildplat: [windows-2019, win_amd64]
python: "cp313t"
include:
- buildplat: [macos-13, macosx_*]
python: "cp39"


steps:
- name: Checkout pymongoarrow
uses: actions/checkout@v4
Expand Down Expand Up @@ -85,13 +88,15 @@ jobs:
env:
MACOS_TEST_SKIP: "*arm64"
CIBW_BUILD: cp39-macosx_*
CIBW_ENABLE: cpython-freethreading
MACOSX_DEPLOYMENT_TARGET: "10.14"
run: python -m cibuildwheel --output-dir wheelhouse

- name: Build wheels
if: ${{ matrix.buildplat[0] != 'macos-11' }}
env:
CIBW_BUILD: ${{ matrix.python }}-${{ matrix.buildplat[1] }}
CIBW_ENABLE: cpython-freethreading
MACOSX_DEPLOYMENT_TARGET: "12.0"
run: python -m cibuildwheel --output-dir wheelhouse

Expand Down
5 changes: 4 additions & 1 deletion .github/workflows/test-python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ jobs:
strategy:
matrix:
os: ["ubuntu-latest", "macos-latest", "windows-latest"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.13t"]
exclude:
- os: "windows-latest"
python: "3.13t"
fail-fast: false
name: CPython ${{ matrix.python-version }}-${{ matrix.os }}
steps:
Expand Down
1 change: 1 addition & 0 deletions bindings/python/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# Changes in Version 1.8.0 (2025/MM/YY)

- Add support for PyArrow 20.0.
- Add support for free-threaded python on Linux and MacOS.

# Changes in Version 1.7.2 (2025/04/23)

Expand Down
1 change: 1 addition & 0 deletions bindings/python/pymongoarrow/lib.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# Cython compiler directives
# distutils: language=c++
# cython: language_level=3
# cython: freethreading_compatible = True

# Stdlib imports
import sys
Expand Down
1 change: 1 addition & 0 deletions bindings/python/pymongoarrow/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# Cython compiler directives
# cython: language_level=3
# distutils: language=c++
# cython: freethreading_compatible = True
from libcpp.vector cimport vector
from libc.stdint cimport int32_t, uint8_t
from pyarrow.lib cimport *
Expand Down
1 change: 1 addition & 0 deletions bindings/python/pymongoarrow/libbson.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# Cython compiler directives
# cython: language_level=3
# distutils: language=c
# cython: freethreading_compatible = True
from libc.stdint cimport int32_t, int64_t, uint8_t, uint32_t, uint64_t


Expand Down
29 changes: 27 additions & 2 deletions bindings/python/test/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import concurrent.futures
import io
import json
import tempfile
import threading
import unittest
import unittest.mock as mock
from datetime import date, datetime
Expand Down Expand Up @@ -244,8 +246,8 @@ def test_aggregate_omits_id_if_not_in_schema(self):
def round_trip(self, data, schema, coll=None):
if coll is None:
coll = self.coll
self.coll.drop()
res = write(self.coll, data)
coll.drop()
res = write(coll, data)
self.assertEqual(len(data), res.raw_result["insertedCount"])
self.assertEqual(data, find_arrow_all(coll, {}, schema=schema))
return res
Expand Down Expand Up @@ -1052,6 +1054,29 @@ def test_empty_embedded_array(self):
assert pmapatable2.to_pylist()[0] == doc2
write_table(pmapatable2, io.BytesIO())

def test_threading(self):
schema, data = self._create_data()

def run_test():
client = client_context.get_client(
event_listeners=[self.getmore_listener, self.cmd_listener]
)
name = f"test-{threading.current_thread().name}"
coll = client.pymongoarrow_test.get_collection(
name, write_concern=WriteConcern(w="majority")
)
coll.drop()
self.round_trip(data, Schema(schema), coll=coll)
client.close()

with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for i in range(5):
futures.append(executor.submit(run_test))
concurrent.futures.wait(futures)
for future in futures:
future.result()


class TestArrowExplicitApi(ArrowApiTestMixin, unittest.TestCase):
def run_find(self, *args, **kwargs):
Expand Down
34 changes: 30 additions & 4 deletions bindings/python/test/test_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# from datetime import datetime, timedelta
import concurrent.futures
import datetime
import tempfile
import threading
import unittest
import unittest.mock as mock
import warnings
Expand Down Expand Up @@ -266,18 +268,22 @@ def inner(i):
raw_data["nested"] = [inner(i) for i in range(3)]
return pd.DataFrame(data=raw_data).astype(schema)

def test_auto_schema(self):
def _check_auto_schema(self, coll):
coll.drop()
data = self._create_nested_data()
self.coll.drop()
res = write(self.coll, data)
res = write(coll, data)
self.assertEqual(len(data), res.raw_result["insertedCount"])
for func in [find_pandas_all, aggregate_pandas_all]:
out = func(self.coll, {} if func == find_pandas_all else []).drop(columns=["_id"])
out = func(coll, {} if func == find_pandas_all else []).drop(columns=["_id"])
for name in data.columns:
val = out[name]
if str(val.dtype) == "object":
val = val.astype(data[name].dtype)
pd.testing.assert_series_equal(data[name], val)
coll.drop()

def test_auto_schema(self):
self._check_auto_schema(self.coll)

def test_auto_schema_heterogeneous(self):
vals = [1, "2", True, 4]
Expand Down Expand Up @@ -345,6 +351,26 @@ def test_exclude_none(self):
col_data = list(self.coll.find({}))
assert "b" not in col_data[3]

def test_threading(self):
def run_test():
client = client_context.get_client(
event_listeners=[self.getmore_listener, self.cmd_listener]
)
name = f"test-{threading.current_thread().name}"
coll = client.pymongoarrow_test.get_collection(
name, write_concern=WriteConcern(w="majority")
)
self._check_auto_schema(coll)
client.close()

with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for i in range(5):
futures.append(executor.submit(run_test))
concurrent.futures.wait(futures)
for future in futures:
future.result()


class TestBSONTypes(PandasTestBase):
@classmethod
Expand Down
46 changes: 36 additions & 10 deletions bindings/python/test/test_polars.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import concurrent.futures
import threading
import unittest
import unittest.mock as mock
import uuid
Expand Down Expand Up @@ -65,14 +67,13 @@ def setUpClass(cls):
def setUp(self):
"""Insert simple use case data."""
self.coll.drop()
self.coll.insert_many(
[
{"_id": 1, "data": 10},
{"_id": 2, "data": 20},
{"_id": 3, "data": 30},
{"_id": 4},
]
)
self.data = [
{"_id": 1, "data": 10},
{"_id": 2, "data": 20},
{"_id": 3, "data": 30},
{"_id": 4},
]
self.coll.insert_many(self.data)
self.cmd_listener.reset()
self.getmore_listener.reset()

Expand Down Expand Up @@ -115,15 +116,17 @@ def test_find_simple(self):
self.assertEqual(find_cmd.command_name, "find")
self.assertEqual(find_cmd.command["projection"], {"_id": True, "data": True})

def test_aggregate_simple(self):
def _check_aggregation_simple(self, coll):
expected = pl.DataFrame(
data={
"_id": pl.Series(values=[1, 2, 3, 4], dtype=pl.Int32),
"data": pl.Series(values=[20, 40, 60, None], dtype=pl.Int64),
}
)
coll.drop()
coll.insert_many(self.data)
projection = {"_id": True, "data": {"$multiply": [2, "$data"]}}
table = aggregate_polars_all(self.coll, [{"$project": projection}], schema=self.schema)
table = aggregate_polars_all(coll, [{"$project": projection}], schema=self.schema)
self.assertTrue(table.equals(expected))

agg_cmd = self.cmd_listener.results["started"][-1]
Expand All @@ -132,6 +135,9 @@ def test_aggregate_simple(self):
self.assertEqual(agg_cmd.command["pipeline"][0]["$project"], projection)
self.assertEqual(agg_cmd.command["pipeline"][1]["$project"], {"_id": True, "data": True})

def test_aggregate_simple(self):
self._check_aggregation_simple(self.coll)

@mock.patch.object(Collection, "insert_many", side_effect=Collection.insert_many, autospec=True)
def test_write_batching(self, mock):
data = pl.DataFrame(data={"_id": pl.Series(values=range(100040), dtype=pl.Int64)})
Expand Down Expand Up @@ -395,3 +401,23 @@ def test_bson_types(self):
assert dfpl["value"].dtype == data_type["ptype"]
except pl.exceptions.ComputeError:
assert isinstance(table["value"].type, pa.ExtensionType)

def test_threading(self):
def run_test():
client = client_context.get_client(
event_listeners=[self.getmore_listener, self.cmd_listener]
)
name = f"test-{threading.current_thread().name}"
coll = client.pymongoarrow_test.get_collection(
name, write_concern=WriteConcern(w="majority")
)
self._check_aggregation_simple(coll)
client.close()

with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for i in range(5):
futures.append(executor.submit(run_test))
concurrent.futures.wait(futures)
for future in futures:
future.result()
Loading
Loading