Skip to content

Commit 0f2415e

Browse files
committed
refactor: deprecate usage of cursor.execute statements in favor of the in class implementation of execute.
1 parent 42bf375 commit 0f2415e

File tree

2 files changed

+46
-26
lines changed

2 files changed

+46
-26
lines changed

pandas/io/sql.py

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
)
1212
from contextlib import (
1313
ExitStack,
14+
closing,
1415
contextmanager,
1516
)
1617
from datetime import (
@@ -1651,10 +1652,18 @@ def run_transaction(self):
16511652

16521653
def execute(self, sql: str | Select | TextClause, params=None):
16531654
"""Simple passthrough to SQLAlchemy connectable"""
1655+
from sqlalchemy.exc import DBAPIError as SQLAlchemyDatabaseError
1656+
16541657
args = [] if params is None else [params]
16551658
if isinstance(sql, str):
1656-
return self.con.exec_driver_sql(sql, *args)
1657-
return self.con.execute(sql, *args)
1659+
execute_function = self.con.exec_driver_sql
1660+
else:
1661+
execute_function = self.con.execute
1662+
1663+
try:
1664+
return execute_function(sql, *args)
1665+
except SQLAlchemyDatabaseError as exc:
1666+
raise DatabaseError(f"Execution failed on sql '{sql}': {exc}") from exc
16581667

16591668
def read_table(
16601669
self,
@@ -2108,17 +2117,19 @@ def run_transaction(self):
21082117
self.con.commit()
21092118

21102119
def execute(self, sql: str | Select | TextClause, params=None):
2120+
from adbc_driver_manager import DatabaseError as ADBCDatabaseError
2121+
21112122
if not isinstance(sql, str):
21122123
raise TypeError("Query must be a string unless using sqlalchemy.")
21132124
args = [] if params is None else [params]
21142125
cur = self.con.cursor()
21152126
try:
21162127
cur.execute(sql, *args)
21172128
return cur
2118-
except Exception as exc:
2129+
except ADBCDatabaseError as exc:
21192130
try:
21202131
self.con.rollback()
2121-
except Exception as inner_exc: # pragma: no cover
2132+
except ADBCDatabaseError as inner_exc: # pragma: no cover
21222133
ex = DatabaseError(
21232134
f"Execution failed on sql: {sql}\n{exc}\nunable to rollback"
21242135
)
@@ -2207,8 +2218,7 @@ def read_table(
22072218
else:
22082219
stmt = f"SELECT {select_list} FROM {table_name}"
22092220

2210-
with self.con.cursor() as cur:
2211-
cur.execute(stmt)
2221+
with closing(self.execute(stmt)) as cur:
22122222
pa_table = cur.fetch_arrow_table()
22132223
df = arrow_table_to_pandas(pa_table, dtype_backend=dtype_backend)
22142224

@@ -2278,8 +2288,7 @@ def read_query(
22782288
if chunksize:
22792289
raise NotImplementedError("'chunksize' is not implemented for ADBC drivers")
22802290

2281-
with self.con.cursor() as cur:
2282-
cur.execute(sql)
2291+
with closing(self.execute(sql)) as cur:
22832292
pa_table = cur.fetch_arrow_table()
22842293
df = arrow_table_to_pandas(pa_table, dtype_backend=dtype_backend)
22852294

@@ -2335,6 +2344,9 @@ def to_sql(
23352344
engine : {'auto', 'sqlalchemy'}, default 'auto'
23362345
Raises NotImplementedError if not set to 'auto'
23372346
"""
2347+
from adbc_driver_manager import DatabaseError as ADBCDatabaseError
2348+
import pyarrow as pa
2349+
23382350
if index_label:
23392351
raise NotImplementedError(
23402352
"'index_label' is not implemented for ADBC drivers"
@@ -2364,22 +2376,23 @@ def to_sql(
23642376
if if_exists == "fail":
23652377
raise ValueError(f"Table '{table_name}' already exists.")
23662378
elif if_exists == "replace":
2367-
with self.con.cursor() as cur:
2368-
cur.execute(f"DROP TABLE {table_name}")
2379+
sql_statement = f"DROP TABLE {table_name}"
2380+
self.execute(sql_statement).close()
23692381
elif if_exists == "append":
23702382
mode = "append"
23712383

2372-
import pyarrow as pa
2373-
23742384
try:
23752385
tbl = pa.Table.from_pandas(frame, preserve_index=index)
23762386
except pa.ArrowNotImplementedError as exc:
23772387
raise ValueError("datatypes not supported") from exc
23782388

23792389
with self.con.cursor() as cur:
2380-
total_inserted = cur.adbc_ingest(
2381-
table_name=name, data=tbl, mode=mode, db_schema_name=schema
2382-
)
2390+
try:
2391+
total_inserted = cur.adbc_ingest(
2392+
table_name=name, data=tbl, mode=mode, db_schema_name=schema
2393+
)
2394+
except ADBCDatabaseError as exc:
2395+
raise DatabaseError("Execution failed") from exc
23832396

23842397
self.con.commit()
23852398
return total_inserted
@@ -2496,9 +2509,9 @@ def sql_schema(self) -> str:
24962509
return str(";\n".join(self.table))
24972510

24982511
def _execute_create(self) -> None:
2499-
with self.pd_sql.run_transaction() as conn:
2512+
with self.pd_sql.run_transaction():
25002513
for stmt in self.table:
2501-
conn.execute(stmt)
2514+
self.pd_sql.execute(stmt).close()
25022515

25032516
def insert_statement(self, *, num_rows: int) -> str:
25042517
names = list(map(str, self.frame.columns))
@@ -2520,8 +2533,13 @@ def insert_statement(self, *, num_rows: int) -> str:
25202533
return insert_statement
25212534

25222535
def _execute_insert(self, conn, keys, data_iter) -> int:
2536+
from sqlite3 import DatabaseError as SQLiteDatabaseError
2537+
25232538
data_list = list(data_iter)
2524-
conn.executemany(self.insert_statement(num_rows=1), data_list)
2539+
try:
2540+
conn.executemany(self.insert_statement(num_rows=1), data_list)
2541+
except SQLiteDatabaseError as exc:
2542+
raise DatabaseError("Execution failed") from exc
25252543
return conn.rowcount
25262544

25272545
def _execute_insert_multi(self, conn, keys, data_iter) -> int:
@@ -2643,17 +2661,19 @@ def run_transaction(self):
26432661
cur.close()
26442662

26452663
def execute(self, sql: str | Select | TextClause, params=None):
2664+
from sqlite3 import DatabaseError as SQLiteDatabaseError
2665+
26462666
if not isinstance(sql, str):
26472667
raise TypeError("Query must be a string unless using sqlalchemy.")
26482668
args = [] if params is None else [params]
26492669
cur = self.con.cursor()
26502670
try:
26512671
cur.execute(sql, *args)
26522672
return cur
2653-
except Exception as exc:
2673+
except SQLiteDatabaseError as exc:
26542674
try:
26552675
self.con.rollback()
2656-
except Exception as inner_exc: # pragma: no cover
2676+
except SQLiteDatabaseError as inner_exc: # pragma: no cover
26572677
ex = DatabaseError(
26582678
f"Execution failed on sql: {sql}\n{exc}\nunable to rollback"
26592679
)

pandas/tests/io/test_sql.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ def mysql_pymysql_engine():
605605
sqlalchemy = pytest.importorskip("sqlalchemy")
606606
pymysql = pytest.importorskip("pymysql")
607607
engine = sqlalchemy.create_engine(
608-
"mysql+pymysql://root@localhost:3306/pandas",
608+
"mysql+pymysql://root@mysql:3306/pandas",
609609
connect_args={"client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS},
610610
poolclass=sqlalchemy.pool.NullPool,
611611
)
@@ -653,7 +653,7 @@ def postgresql_psycopg2_engine():
653653
sqlalchemy = pytest.importorskip("sqlalchemy")
654654
pytest.importorskip("psycopg2")
655655
engine = sqlalchemy.create_engine(
656-
"postgresql+psycopg2://postgres:postgres@localhost:5432/pandas",
656+
"postgresql+psycopg2://postgres:postgres@postgres:5432/pandas",
657657
poolclass=sqlalchemy.pool.NullPool,
658658
)
659659
yield engine
@@ -689,7 +689,7 @@ def postgresql_adbc_conn():
689689
pytest.importorskip("adbc_driver_postgresql")
690690
from adbc_driver_postgresql import dbapi
691691

692-
uri = "postgresql://postgres:postgres@localhost:5432/pandas"
692+
uri = "postgresql://postgres:postgres@postgres:5432/pandas"
693693
with dbapi.connect(uri) as conn:
694694
yield conn
695695
for view in get_all_views(conn):
@@ -2593,7 +2593,7 @@ def test_sql_open_close(test_frame3):
25932593

25942594
@td.skip_if_installed("sqlalchemy")
25952595
def test_con_string_import_error():
2596-
conn = "mysql://root@localhost/pandas"
2596+
conn = "mysql://root@mysql/pandas"
25972597
msg = "Using URI string without sqlalchemy installed"
25982598
with pytest.raises(ImportError, match=msg):
25992599
sql.read_sql("SELECT * FROM iris", conn)
@@ -3409,8 +3409,8 @@ def test_to_sql_with_negative_npinf(conn, request, input):
34093409
mark = pytest.mark.xfail(reason="GH 36465")
34103410
request.applymarker(mark)
34113411

3412-
msg = "inf cannot be used with MySQL"
3413-
with pytest.raises(ValueError, match=msg):
3412+
msg = "Execution failed on sql"
3413+
with pytest.raises(pd.errors.DatabaseError, match=msg):
34143414
df.to_sql(name="foobar", con=conn, index=False)
34153415
else:
34163416
assert df.to_sql(name="foobar", con=conn, index=False) == 1

0 commit comments

Comments
 (0)