-
-
Notifications
You must be signed in to change notification settings - Fork 19.1k
ENH: Add table prefixes to to_sql method #60409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 36 commits
03c8183
f09fd54
03b8642
ccb9eac
d792f27
2582834
b138532
f696257
33b69d9
0bc6504
8a94c2b
ec32a70
be305ff
35a6394
145d18c
eddf687
3190142
154c208
98a153f
8a0611d
eb6e9f0
522f842
ca0551d
b595021
3c8a12e
1ad55bb
25ecbc3
775a05a
efad6b4
0543666
90d1f1f
4da49dd
b0db943
c52c71a
cd2fff4
d1233ce
cd51d7c
5d306c2
8cafc44
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -744,6 +744,7 @@ def to_sql( | |
chunksize: int | None = None, | ||
dtype: DtypeArg | None = None, | ||
method: Literal["multi"] | Callable | None = None, | ||
temporary: bool = False, | ||
engine: str = "auto", | ||
**engine_kwargs, | ||
) -> int | None: | ||
|
@@ -791,6 +792,8 @@ def to_sql( | |
|
||
Details and a sample callable implementation can be found in the | ||
section :ref:`insert method <io.sql.method>`. | ||
temporary : bool, default False | ||
Indicates if the created, replaced or appended table is temporary. | ||
engine : {'auto', 'sqlalchemy'}, default 'auto' | ||
SQL engine library to use. If 'auto', then the option | ||
``io.sql.engine`` is used. The default ``io.sql.engine`` | ||
|
@@ -839,6 +842,7 @@ def to_sql( | |
chunksize=chunksize, | ||
dtype=dtype, | ||
method=method, | ||
temporary=temporary, | ||
engine=engine, | ||
**engine_kwargs, | ||
) | ||
|
@@ -932,6 +936,7 @@ def __init__( | |
schema=None, | ||
keys=None, | ||
dtype: DtypeArg | None = None, | ||
temporary: bool = False, | ||
) -> None: | ||
self.name = name | ||
self.pd_sql = pandas_sql_engine | ||
|
@@ -942,6 +947,7 @@ def __init__( | |
self.if_exists = if_exists | ||
self.keys = keys | ||
self.dtype = dtype | ||
self.temporary = temporary | ||
|
||
if frame is not None: | ||
# We want to initialize based on a dataframe | ||
|
@@ -956,8 +962,42 @@ def __init__( | |
if not len(self.name): | ||
raise ValueError("Empty table name specified") | ||
|
||
def _drop_temporary_table(self): | ||
"""Drop a temporary table. Temporary tables are not in a database's meta data | ||
and need to be dropped hard coded.""" | ||
if self.schema is None: | ||
query = f"DROP TABLE {self.name}" | ||
else: | ||
query = f"DROP TABLE {self.schema}.{self.name}" | ||
self.pd_sql.execute(query) | ||
|
||
def _exists_temporary(self): | ||
"""Check if a temporary table exists. Temporary tables are not in a database's | ||
meta data. The existence is duck tested by a SELECT statement.""" | ||
from sqlalchemy.exc import ( | ||
OperationalError, | ||
ProgrammingError, | ||
) | ||
|
||
if self.schema is None: | ||
query = f"SELECT * FROM {self.name} LIMIT 1" | ||
else: | ||
query = f"SELECT * FROM {self.schema}.{self.name} LIMIT 1" | ||
try: | ||
_ = self.pd_sql.read_query(query) | ||
return True | ||
except ProgrammingError: | ||
# Some DBMS (e.g. postgres) require a rollback after a caught exception | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The DBMS-specific features are something we want to avoid in pandas, as maintaining compatibility on those is not a core specialty of our team. Does sqlalchemy not handle this natively? |
||
self.pd_sql.execute("rollback") | ||
return False | ||
except OperationalError: | ||
return False | ||
|
||
def exists(self): | ||
return self.pd_sql.has_table(self.name, self.schema) | ||
if self.temporary: | ||
return self._exists_temporary() | ||
else: | ||
return self.pd_sql.has_table(self.name, self.schema) | ||
|
||
def sql_schema(self) -> str: | ||
from sqlalchemy.schema import CreateTable | ||
|
@@ -966,7 +1006,9 @@ def sql_schema(self) -> str: | |
|
||
def _execute_create(self) -> None: | ||
# Inserting table into database, add to MetaData object | ||
self.table = self.table.to_metadata(self.pd_sql.meta) | ||
if not self.temporary: | ||
# only insert into meta data, if table is not temporary | ||
self.table = self.table.to_metadata(self.pd_sql.meta) | ||
with self.pd_sql.run_transaction(): | ||
self.table.create(bind=self.pd_sql.con) | ||
|
||
|
@@ -975,7 +1017,10 @@ def create(self) -> None: | |
if self.if_exists == "fail": | ||
raise ValueError(f"Table '{self.name}' already exists.") | ||
if self.if_exists == "replace": | ||
self.pd_sql.drop_table(self.name, self.schema) | ||
if self.temporary: | ||
self._drop_temporary_table() | ||
else: | ||
self.pd_sql.drop_table(self.name, self.schema) | ||
self._execute_create() | ||
elif self.if_exists == "append": | ||
pass | ||
|
@@ -1266,10 +1311,16 @@ def _create_table_setup(self): | |
|
||
schema = self.schema or self.pd_sql.meta.schema | ||
|
||
# check if table is temporary | ||
if self.temporary: | ||
prefixes = ["TEMPORARY"] | ||
else: | ||
prefixes = None | ||
|
||
# At this point, attach to new metadata, only attach to self.meta | ||
# once table is created. | ||
meta = MetaData() | ||
return Table(self.name, meta, *columns, schema=schema) | ||
return Table(self.name, meta, *columns, schema=schema, prefixes=prefixes) | ||
|
||
def _harmonize_columns( | ||
self, | ||
|
@@ -1487,6 +1538,7 @@ def to_sql( | |
chunksize: int | None = None, | ||
dtype: DtypeArg | None = None, | ||
method: Literal["multi"] | Callable | None = None, | ||
temporary: bool = False, | ||
engine: str = "auto", | ||
**engine_kwargs, | ||
) -> int | None: | ||
|
@@ -1871,6 +1923,7 @@ def prep_table( | |
index_label=None, | ||
schema=None, | ||
dtype: DtypeArg | None = None, | ||
temporary: bool = False, | ||
) -> SQLTable: | ||
""" | ||
Prepares table in the database for data insertion. Creates it if needed, etc. | ||
|
@@ -1906,6 +1959,7 @@ def prep_table( | |
index_label=index_label, | ||
schema=schema, | ||
dtype=dtype, | ||
temporary=temporary, | ||
) | ||
table.create() | ||
return table | ||
|
@@ -1950,6 +2004,7 @@ def to_sql( | |
chunksize: int | None = None, | ||
dtype: DtypeArg | None = None, | ||
method: Literal["multi"] | Callable | None = None, | ||
temporary: bool = False, | ||
engine: str = "auto", | ||
**engine_kwargs, | ||
) -> int | None: | ||
|
@@ -1991,6 +2046,8 @@ def to_sql( | |
|
||
Details and a sample callable implementation can be found in the | ||
section :ref:`insert method <io.sql.method>`. | ||
temporary : bool, default False | ||
Indicates if the created, replaced or appended table is temporary. | ||
engine : {'auto', 'sqlalchemy'}, default 'auto' | ||
SQL engine library to use. If 'auto', then the option | ||
``io.sql.engine`` is used. The default ``io.sql.engine`` | ||
|
@@ -2011,6 +2068,7 @@ def to_sql( | |
index_label=index_label, | ||
schema=schema, | ||
dtype=dtype, | ||
temporary=temporary, | ||
) | ||
|
||
total_inserted = sql_engine.insert_records( | ||
|
@@ -2025,7 +2083,9 @@ def to_sql( | |
**engine_kwargs, | ||
) | ||
|
||
self.check_case_sensitive(name=name, schema=schema) | ||
# only check case sensitivity for non temporary tables | ||
if not table.temporary: | ||
self.check_case_sensitive(name=name, schema=schema) | ||
return total_inserted | ||
|
||
@property | ||
|
@@ -2303,6 +2363,7 @@ def to_sql( | |
chunksize: int | None = None, | ||
dtype: DtypeArg | None = None, | ||
method: Literal["multi"] | Callable | None = None, | ||
temporary: bool = False, | ||
engine: str = "auto", | ||
**engine_kwargs, | ||
) -> int | None: | ||
|
@@ -2332,6 +2393,8 @@ def to_sql( | |
Raises NotImplementedError | ||
method : {None', 'multi', callable}, default None | ||
Raises NotImplementedError | ||
temporary : bool, default False | ||
Indicates if the created, replaced or appended table is temporary. | ||
engine : {'auto', 'sqlalchemy'}, default 'auto' | ||
Raises NotImplementedError if not set to 'auto' | ||
""" | ||
|
@@ -2360,7 +2423,14 @@ def to_sql( | |
# as applicable modes, so the semantics get blurred across | ||
# the libraries | ||
mode = "create" | ||
if self.has_table(name, schema): | ||
|
||
# for temporary tables use duck testing for existence check | ||
if temporary: | ||
exists = self._has_table_temporary(name, schema) | ||
else: | ||
exists = self.has_table(name, schema) | ||
|
||
if exists: | ||
if if_exists == "fail": | ||
raise ValueError(f"Table '{table_name}' already exists.") | ||
elif if_exists == "replace": | ||
|
@@ -2378,12 +2448,41 @@ def to_sql( | |
|
||
with self.con.cursor() as cur: | ||
total_inserted = cur.adbc_ingest( | ||
table_name=name, data=tbl, mode=mode, db_schema_name=schema | ||
table_name=name, | ||
data=tbl, | ||
mode=mode, | ||
db_schema_name=schema, | ||
temporary=temporary, | ||
) | ||
|
||
self.con.commit() | ||
return total_inserted | ||
|
||
def _has_table_temporary(self, name: str, schema: str | None = None) -> bool: | ||
"""Check if a temporary table exists. Temporary tables are not in a database's | ||
meta data. The existence is duck tested by a SELECT statement.""" | ||
from adbc_driver_manager import ProgrammingError | ||
|
||
# sqlite doesn't allow a rollback at this point | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Similar comment as before - we really want to avoid putting DBMS-specific logic into our implementation |
||
rollback = ( | ||
True if not self.con.adbc_get_info()["vendor_name"] == "SQLite" else False | ||
) | ||
|
||
if schema is None: | ||
query = f"SELECT * FROM {name} LIMIT 1" | ||
else: | ||
query = f"SELECT * FROM {schema}.{name} LIMIT 1" | ||
try: | ||
with self.con.cursor() as cur: | ||
cur.execute(query) | ||
return True | ||
except ProgrammingError: | ||
if rollback: | ||
# Some DBMS (e.g. postgres) require a rollback after a caught exception | ||
with self.con.cursor() as cur: | ||
cur.execute("rollback") | ||
return False | ||
|
||
def has_table(self, name: str, schema: str | None = None) -> bool: | ||
meta = self.con.adbc_get_objects( | ||
db_schema_filter=schema, table_name_filter=name | ||
|
@@ -2758,6 +2857,7 @@ def to_sql( | |
chunksize: int | None = None, | ||
dtype: DtypeArg | None = None, | ||
method: Literal["multi"] | Callable | None = None, | ||
temporary: bool = False, | ||
engine: str = "auto", | ||
**engine_kwargs, | ||
) -> int | None: | ||
|
@@ -2798,6 +2898,8 @@ def to_sql( | |
|
||
Details and a sample callable implementation can be found in the | ||
section :ref:`insert method <io.sql.method>`. | ||
temporary : bool, default False | ||
Indicates if the created, replaced or appended table is temporary. | ||
""" | ||
if dtype: | ||
if not is_dict_like(dtype): | ||
|
@@ -2823,6 +2925,7 @@ def to_sql( | |
if_exists=if_exists, | ||
index_label=index_label, | ||
dtype=dtype, | ||
temporary=temporary, | ||
) | ||
table.create() | ||
return table.insert(chunksize, method) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does sqlalchemy not provide a higher level abstraction than ProgrammingError or OperationalError? I'm not sure I understand the distinction between those, and I am not sure if they would catch all possible errors thrown by sqlalchemy either
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a higher-level abstraction available: DatabaseError.