Skip to content

Commit f819ff0

Browse files
authored
feat: add Arrow support to SQLite and MySQL adapters (#158)
`sqlite`, `aiosqlite`, and `asyncmy` tests for `select_to_arrow`
1 parent d699b11 commit f819ff0

File tree

3 files changed

+704
-0
lines changed

3 files changed

+704
-0
lines changed
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
"""Integration tests for aiosqlite Arrow query support."""
2+
3+
from collections.abc import AsyncGenerator
4+
5+
import pytest
6+
7+
from sqlspec.adapters.aiosqlite import AiosqliteConfig
8+
9+
# Assign every test in this module to the pytest-xdist group "sqlite"
# (with ``--dist loadgroup``, tests sharing a group run on the same worker).
pytestmark = pytest.mark.xdist_group("sqlite")
10+
11+
12+
@pytest.fixture
async def aiosqlite_arrow_config() -> AsyncGenerator[AiosqliteConfig, None]:
    """Yield a default-constructed ``AiosqliteConfig`` for the Arrow tests.

    The config's pool is closed once the consuming test has finished with it.
    """
    arrow_config = AiosqliteConfig()
    try:
        yield arrow_config
    finally:
        # Teardown: release the pool after the test is done with the config.
        await arrow_config.close_pool()
20+
21+
22+
async def test_select_to_arrow_basic(aiosqlite_arrow_config: AiosqliteConfig) -> None:
    """Test basic select_to_arrow functionality.

    Seeds a two-row table, runs ``select_to_arrow`` and checks the Arrow
    payload type, the reported row count and the pandas conversion.

    Pool teardown is left to the ``aiosqlite_arrow_config`` fixture, which
    already awaits ``close_pool()`` in its ``finally`` clause.
    """
    import pyarrow as pa

    async with aiosqlite_arrow_config.provide_session() as session:
        # Create test table with unique name
        await session.execute("DROP TABLE IF EXISTS arrow_users")
        await session.execute("CREATE TABLE arrow_users (id INTEGER, name TEXT, age INTEGER)")
        await session.execute("INSERT INTO arrow_users VALUES (1, 'Alice', 30), (2, 'Bob', 25)")

        # Test Arrow query
        result = await session.select_to_arrow("SELECT * FROM arrow_users ORDER BY id")

        assert result is not None
        assert isinstance(result.data, (pa.Table, pa.RecordBatch))
        assert result.rows_affected == 2

        # Convert to pandas and verify
        df = result.to_pandas()
        assert len(df) == 2
        assert list(df["name"]) == ["Alice", "Bob"]
46+
47+
48+
async def test_select_to_arrow_table_format(aiosqlite_arrow_config: AiosqliteConfig) -> None:
    """Test select_to_arrow with table return format (default).

    Passes ``return_format="table"`` explicitly and expects ``result.data``
    to be a ``pyarrow.Table`` with all three seeded rows counted.

    Pool teardown is left to the ``aiosqlite_arrow_config`` fixture, which
    already awaits ``close_pool()`` in its ``finally`` clause.
    """
    import pyarrow as pa

    async with aiosqlite_arrow_config.provide_session() as session:
        await session.execute("DROP TABLE IF EXISTS arrow_table_test")
        await session.execute("CREATE TABLE arrow_table_test (id INTEGER, value TEXT)")
        await session.execute("INSERT INTO arrow_table_test VALUES (1, 'a'), (2, 'b'), (3, 'c')")

        result = await session.select_to_arrow("SELECT * FROM arrow_table_test ORDER BY id", return_format="table")

        assert isinstance(result.data, pa.Table)
        assert result.rows_affected == 3
64+
65+
66+
async def test_select_to_arrow_batch_format(aiosqlite_arrow_config: AiosqliteConfig) -> None:
    """Test select_to_arrow with batch return format.

    Passes ``return_format="batches"`` and expects ``result.data`` to be a
    ``pyarrow.RecordBatch`` with both seeded rows counted.

    Pool teardown is left to the ``aiosqlite_arrow_config`` fixture, which
    already awaits ``close_pool()`` in its ``finally`` clause.
    """
    import pyarrow as pa

    async with aiosqlite_arrow_config.provide_session() as session:
        await session.execute("DROP TABLE IF EXISTS arrow_batch_test")
        await session.execute("CREATE TABLE arrow_batch_test (id INTEGER, value TEXT)")
        await session.execute("INSERT INTO arrow_batch_test VALUES (1, 'a'), (2, 'b')")

        result = await session.select_to_arrow(
            "SELECT * FROM arrow_batch_test ORDER BY id", return_format="batches"
        )

        assert isinstance(result.data, pa.RecordBatch)
        assert result.rows_affected == 2
84+
85+
86+
async def test_select_to_arrow_with_parameters(aiosqlite_arrow_config: AiosqliteConfig) -> None:
    """Test select_to_arrow with query parameters.

    Binds a single positional ``?`` parameter and checks that only the rows
    whose ``value`` exceeds it come back.

    Pool teardown is left to the ``aiosqlite_arrow_config`` fixture, which
    already awaits ``close_pool()`` in its ``finally`` clause.
    """
    async with aiosqlite_arrow_config.provide_session() as session:
        await session.execute("DROP TABLE IF EXISTS arrow_params_test")
        await session.execute("CREATE TABLE arrow_params_test (id INTEGER, value INTEGER)")
        await session.execute("INSERT INTO arrow_params_test VALUES (1, 100), (2, 200), (3, 300)")

        # Test with parameterized query - SQLite uses ? style
        result = await session.select_to_arrow(
            "SELECT * FROM arrow_params_test WHERE value > ? ORDER BY id", (150,)
        )

        assert result.rows_affected == 2
        df = result.to_pandas()
        assert list(df["value"]) == [200, 300]
104+
105+
106+
async def test_select_to_arrow_empty_result(aiosqlite_arrow_config: AiosqliteConfig) -> None:
    """Test select_to_arrow with empty result set.

    Queries a freshly created table with no rows and expects a zero row
    count and an empty pandas DataFrame.

    Pool teardown is left to the ``aiosqlite_arrow_config`` fixture, which
    already awaits ``close_pool()`` in its ``finally`` clause.
    """
    async with aiosqlite_arrow_config.provide_session() as session:
        await session.execute("DROP TABLE IF EXISTS arrow_empty_test")
        await session.execute("CREATE TABLE arrow_empty_test (id INTEGER)")

        result = await session.select_to_arrow("SELECT * FROM arrow_empty_test")

        assert result.rows_affected == 0
        assert len(result.to_pandas()) == 0
119+
120+
121+
async def test_select_to_arrow_null_handling(aiosqlite_arrow_config: AiosqliteConfig) -> None:
    """Test select_to_arrow with NULL values.

    Inserts a NULL in the middle row and checks that it comes back through
    pandas as either ``None`` or a missing-value marker.

    Pool teardown is left to the ``aiosqlite_arrow_config`` fixture, which
    already awaits ``close_pool()`` in its ``finally`` clause.
    """
    async with aiosqlite_arrow_config.provide_session() as session:
        await session.execute("DROP TABLE IF EXISTS arrow_null_test")
        await session.execute("CREATE TABLE arrow_null_test (id INTEGER, value TEXT)")
        await session.execute("INSERT INTO arrow_null_test VALUES (1, 'a'), (2, NULL), (3, 'c')")

        result = await session.select_to_arrow("SELECT * FROM arrow_null_test ORDER BY id")

        df = result.to_pandas()
        assert len(df) == 3
        # Accept either representation of the missing value (None or NaN/NA).
        assert df.iloc[1]["value"] is None or df.isna().iloc[1]["value"]
136+
137+
138+
async def test_select_to_arrow_to_polars(aiosqlite_arrow_config: AiosqliteConfig) -> None:
    """Test select_to_arrow conversion to Polars DataFrame.

    Skipped when ``polars`` cannot be imported.

    Pool teardown is left to the ``aiosqlite_arrow_config`` fixture, which
    already awaits ``close_pool()`` in its ``finally`` clause.
    """
    pytest.importorskip("polars")

    async with aiosqlite_arrow_config.provide_session() as session:
        await session.execute("DROP TABLE IF EXISTS arrow_polars_test")
        await session.execute("CREATE TABLE arrow_polars_test (id INTEGER, value TEXT)")
        await session.execute("INSERT INTO arrow_polars_test VALUES (1, 'a'), (2, 'b')")

        result = await session.select_to_arrow("SELECT * FROM arrow_polars_test ORDER BY id")
        df = result.to_polars()

        assert len(df) == 2
        assert df["value"].to_list() == ["a", "b"]
155+
156+
157+
async def test_select_to_arrow_large_dataset(aiosqlite_arrow_config: AiosqliteConfig) -> None:
    """Test select_to_arrow with larger dataset.

    Inserts 1000 rows ``(i, i * 10)`` and checks the row count and the sum
    of the ``value`` column after converting to pandas.

    Pool teardown is left to the ``aiosqlite_arrow_config`` fixture, which
    already awaits ``close_pool()`` in its ``finally`` clause.
    """
    async with aiosqlite_arrow_config.provide_session() as session:
        await session.execute("DROP TABLE IF EXISTS arrow_large_test")
        await session.execute("CREATE TABLE arrow_large_test (id INTEGER, value INTEGER)")

        # Insert 1000 rows
        for i in range(1, 1001):
            await session.execute("INSERT INTO arrow_large_test VALUES (?, ?)", (i, i * 10))

        result = await session.select_to_arrow("SELECT * FROM arrow_large_test ORDER BY id")

        assert result.rows_affected == 1000
        df = result.to_pandas()
        assert len(df) == 1000
        assert df["value"].sum() == sum(i * 10 for i in range(1, 1001))
176+
177+
178+
async def test_select_to_arrow_type_preservation(aiosqlite_arrow_config: AiosqliteConfig) -> None:
    """Test that SQLite types are properly converted to Arrow types.

    Only the ``name`` and ``is_active`` columns are asserted on: ``name``
    must surface in pandas with ``object`` dtype and ``is_active`` with an
    integer dtype.

    Pool teardown is left to the ``aiosqlite_arrow_config`` fixture, which
    already awaits ``close_pool()`` in its ``finally`` clause.
    """
    async with aiosqlite_arrow_config.provide_session() as session:
        await session.execute("DROP TABLE IF EXISTS arrow_types_test")
        await session.execute(
            """
            CREATE TABLE arrow_types_test (
                id INTEGER,
                name TEXT,
                price REAL,
                created_at TEXT,
                is_active INTEGER
            )
            """
        )
        await session.execute(
            """
            INSERT INTO arrow_types_test VALUES
                (1, 'Item 1', 19.99, '2025-01-01 10:00:00', 1),
                (2, 'Item 2', 29.99, '2025-01-02 15:30:00', 0)
            """
        )

        result = await session.select_to_arrow("SELECT * FROM arrow_types_test ORDER BY id")

        df = result.to_pandas()
        assert len(df) == 2
        assert df["name"].dtype == object
        # SQLite INTEGER (for booleans) comes through as int64
        assert df["is_active"].dtype in (int, "int64", "Int64")
211+
212+
213+
async def test_select_to_arrow_json_handling(aiosqlite_arrow_config: AiosqliteConfig) -> None:
    """Test SQLite JSON type handling in Arrow results.

    JSON documents are stored in a ``TEXT`` column; the test checks that
    they come back through pandas as plain ``str`` values with their
    content intact.

    Pool teardown is left to the ``aiosqlite_arrow_config`` fixture, which
    already awaits ``close_pool()`` in its ``finally`` clause.
    """
    async with aiosqlite_arrow_config.provide_session() as session:
        await session.execute("DROP TABLE IF EXISTS arrow_json_test")
        await session.execute("CREATE TABLE arrow_json_test (id INTEGER, data TEXT)")
        await session.execute(
            """
            INSERT INTO arrow_json_test VALUES
                (1, '{"name": "Alice", "age": 30}'),
                (2, '{"name": "Bob", "age": 25}')
            """
        )

        result = await session.select_to_arrow("SELECT * FROM arrow_json_test ORDER BY id")

        # SQLite JSON is stored as TEXT, Arrow converts to string
        df = result.to_pandas()
        assert len(df) == 2
        assert isinstance(df["data"].iloc[0], str)
        assert "Alice" in df["data"].iloc[0]

0 commit comments

Comments
 (0)