|
46 | 46 | from datafusion.expr import Window |
47 | 47 | from pyarrow.csv import write_csv |
48 | 48 |
|
| 49 | +pa_cffi = pytest.importorskip("pyarrow.cffi") |
| 50 | + |
49 | 51 | MB = 1024 * 1024 |
50 | 52 |
|
51 | 53 |
|
@@ -1637,6 +1639,44 @@ def test_arrow_c_stream_reader(df): |
1637 | 1639 | assert table.equals(expected) |
1638 | 1640 |
|
1639 | 1641 |
|
| 1642 | +def test_arrow_c_stream_schema_selection(fail_collect): |
| 1643 | + ctx = SessionContext() |
| 1644 | + |
| 1645 | + batch = pa.RecordBatch.from_arrays( |
| 1646 | + [ |
| 1647 | + pa.array([1, 2]), |
| 1648 | + pa.array([3, 4]), |
| 1649 | + pa.array([5, 6]), |
| 1650 | + ], |
| 1651 | + names=["a", "b", "c"], |
| 1652 | + ) |
| 1653 | + df = ctx.create_dataframe([[batch]]) |
| 1654 | + |
| 1655 | + requested_schema = pa.schema([("c", pa.int64()), ("a", pa.int64())]) |
| 1656 | + |
| 1657 | + c_schema = pa_cffi.ffi.new("struct ArrowSchema*") |
| 1658 | + address = int(pa_cffi.ffi.cast("uintptr_t", c_schema)) |
| 1659 | + requested_schema._export_to_c(address) |
| 1660 | + capsule_new = ctypes.pythonapi.PyCapsule_New |
| 1661 | + capsule_new.restype = ctypes.py_object |
| 1662 | + capsule_new.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_void_p] |
| 1663 | + schema_capsule = capsule_new(ctypes.c_void_p(address), b"arrow_schema", None) |
| 1664 | + |
| 1665 | + reader = pa.RecordBatchReader._import_from_c_capsule( |
| 1666 | + df.__arrow_c_stream__(schema_capsule) |
| 1667 | + ) |
| 1668 | + |
| 1669 | + assert reader.schema == requested_schema |
| 1670 | + |
| 1671 | + batches = list(reader) |
| 1672 | + |
| 1673 | + assert len(batches) == 1 |
| 1674 | + expected_batch = pa.record_batch( |
| 1675 | + [pa.array([5, 6]), pa.array([1, 2])], names=["c", "a"] |
| 1676 | + ) |
| 1677 | + assert batches[0].equals(expected_batch) |
| 1678 | + |
| 1679 | + |
1640 | 1680 | def test_to_pylist(df): |
1641 | 1681 | # Convert datafusion dataframe to Python list |
1642 | 1682 | pylist = df.to_pylist() |
|
0 commit comments