diff --git a/doc/source/whatsnew/v2.3.0.rst b/doc/source/whatsnew/v2.3.0.rst index 8bdddb5b7f85d..e8630fd63b02b 100644 --- a/doc/source/whatsnew/v2.3.0.rst +++ b/doc/source/whatsnew/v2.3.0.rst @@ -39,6 +39,10 @@ Other enhancements - :meth:`~Series.to_hdf` and :meth:`~DataFrame.to_hdf` now round-trip with ``StringDtype`` (:issue:`60663`) - The :meth:`~Series.cumsum`, :meth:`~Series.cummin`, and :meth:`~Series.cummax` reductions are now implemented for ``StringDtype`` columns when backed by PyArrow (:issue:`60633`) - The :meth:`~Series.sum` reduction is now implemented for ``StringDtype`` columns (:issue:`59853`) +- The ``to_sql`` method now supports a new parameter ``temporary: bool = False``. + Setting this parameter to ``True`` enables creating, appending or replacing temporary + tables via ``to_sql`` using ``sqlalchemy``, ``adbc`` and ``sqlite`` connectors. + (:issue:`60422`) .. --------------------------------------------------------------------------- .. _whatsnew_230.notable_bug_fixes: diff --git a/pandas/core/generic.py b/pandas/core/generic.py index f376518d4d3b8..6befc7b2fe524 100644 --- a/pandas/core/generic.py +++ b/pandas/core/generic.py @@ -2788,6 +2788,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, + temporary: bool = False, ) -> int | None: """ Write records stored in a DataFrame to a SQL database. @@ -2844,6 +2845,8 @@ def to_sql( Details and a sample callable implementation can be found in the section :ref:`insert method `. + temporary : bool, default False + Indicates if the created, replaced or appended table is temporary. Returns ------- @@ -3017,6 +3020,7 @@ def to_sql( chunksize=chunksize, dtype=dtype, method=method, + temporary=temporary, ) @final diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 5652d7fab0c7c..c778f7997b653 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -744,6 +744,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, + temporary: bool = False, engine: str = "auto", **engine_kwargs, ) -> int | None: @@ -791,6 +792,8 @@ def to_sql( Details and a sample callable implementation can be found in the section :ref:`insert method `. + temporary : bool, default False + Indicates if the created, replaced or appended table is temporary. engine : {'auto', 'sqlalchemy'}, default 'auto' SQL engine library to use. If 'auto', then the option ``io.sql.engine`` is used. The default ``io.sql.engine`` @@ -839,6 +842,7 @@ def to_sql( chunksize=chunksize, dtype=dtype, method=method, + temporary=temporary, engine=engine, **engine_kwargs, ) @@ -932,6 +936,7 @@ def __init__( schema=None, keys=None, dtype: DtypeArg | None = None, + temporary: bool = False, ) -> None: self.name = name self.pd_sql = pandas_sql_engine @@ -942,6 +947,7 @@ def __init__( self.if_exists = if_exists self.keys = keys self.dtype = dtype + self.temporary = temporary if frame is not None: # We want to initialize based on a dataframe @@ -956,8 +962,41 @@ def __init__( if not len(self.name): raise ValueError("Empty table name specified") + def _drop_temporary_table(self): + """Drop a temporary table. Temporary tables are not in a database's meta data + and need to be dropped hard coded.""" + if self.schema is None: + query = f"DROP TABLE {self.name}" + else: + query = f"DROP TABLE {self.schema}.{self.name}" + self.pd_sql.execute(query) + + def _exists_temporary(self): + """Check if a temporary table exists. Temporary tables are not in a database's + meta data. The existence is duck tested by a SELECT statement.""" + from sqlalchemy import text + from sqlalchemy.exc import DatabaseError + + if self.schema is None: + query = f"SELECT * FROM {self.name} LIMIT 1" + else: + query = f"SELECT * FROM {self.schema}.{self.name} LIMIT 1" + try: + _ = self.pd_sql.con.execute(text(query)) + return True + except DatabaseError: + # Some DBMS (e.g. postgres) require a rollback after a caught exception + try: + self.pd_sql.con.execute(text("rollback")) + return False + except DatabaseError: + return False + def exists(self): - return self.pd_sql.has_table(self.name, self.schema) + if self.temporary: + return self._exists_temporary() + else: + return self.pd_sql.has_table(self.name, self.schema) def sql_schema(self) -> str: from sqlalchemy.schema import CreateTable @@ -966,7 +1005,9 @@ def sql_schema(self) -> str: def _execute_create(self) -> None: # Inserting table into database, add to MetaData object - self.table = self.table.to_metadata(self.pd_sql.meta) + if not self.temporary: + # only insert into meta data, if table is not temporary + self.table = self.table.to_metadata(self.pd_sql.meta) with self.pd_sql.run_transaction(): self.table.create(bind=self.pd_sql.con) @@ -975,7 +1016,10 @@ def create(self) -> None: if self.if_exists == "fail": raise ValueError(f"Table '{self.name}' already exists.") if self.if_exists == "replace": - self.pd_sql.drop_table(self.name, self.schema) + if self.temporary: + self._drop_temporary_table() + else: + self.pd_sql.drop_table(self.name, self.schema) self._execute_create() elif self.if_exists == "append": pass @@ -1266,10 +1310,16 @@ def _create_table_setup(self): schema = self.schema or self.pd_sql.meta.schema + # check if table is temporary + if self.temporary: + prefixes = ["TEMPORARY"] + else: + prefixes = None + # At this point, attach to new metadata, only attach to self.meta # once table is created. meta = MetaData() - return Table(self.name, meta, *columns, schema=schema) + return Table(self.name, meta, *columns, schema=schema, prefixes=prefixes) def _harmonize_columns( self, @@ -1487,6 +1537,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, + temporary: bool = False, engine: str = "auto", **engine_kwargs, ) -> int | None: @@ -1871,6 +1922,7 @@ def prep_table( index_label=None, schema=None, dtype: DtypeArg | None = None, + temporary: bool = False, ) -> SQLTable: """ Prepares table in the database for data insertion. Creates it if needed, etc. @@ -1906,6 +1958,7 @@ def prep_table( index_label=index_label, schema=schema, dtype=dtype, + temporary=temporary, ) table.create() return table @@ -1950,6 +2003,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, + temporary: bool = False, engine: str = "auto", **engine_kwargs, ) -> int | None: @@ -1991,6 +2045,8 @@ def to_sql( Details and a sample callable implementation can be found in the section :ref:`insert method `. + temporary : bool, default False + Indicates if the created, replaced or appended table is temporary. engine : {'auto', 'sqlalchemy'}, default 'auto' SQL engine library to use. If 'auto', then the option ``io.sql.engine`` is used. The default ``io.sql.engine`` @@ -2011,6 +2067,7 @@ def to_sql( index_label=index_label, schema=schema, dtype=dtype, + temporary=temporary, ) total_inserted = sql_engine.insert_records( @@ -2025,7 +2082,9 @@ def to_sql( **engine_kwargs, ) - self.check_case_sensitive(name=name, schema=schema) + # only check case sensitivity for non temporary tables + if not table.temporary: + self.check_case_sensitive(name=name, schema=schema) return total_inserted @property @@ -2303,6 +2362,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, + temporary: bool = False, engine: str = "auto", **engine_kwargs, ) -> int | None: @@ -2332,6 +2392,8 @@ def to_sql( Raises NotImplementedError method : {None', 'multi', callable}, default None Raises NotImplementedError + temporary : bool, default False + Indicates if the created, replaced or appended table is temporary. engine : {'auto', 'sqlalchemy'}, default 'auto' Raises NotImplementedError if not set to 'auto' """ @@ -2360,7 +2422,14 @@ def to_sql( # as applicable modes, so the semantics get blurred across # the libraries mode = "create" - if self.has_table(name, schema): + + # for temporary tables use duck testing for existence check + if temporary: + exists = self._has_table_temporary(name, schema) + else: + exists = self.has_table(name, schema) + + if exists: if if_exists == "fail": raise ValueError(f"Table '{table_name}' already exists.") elif if_exists == "replace": @@ -2378,12 +2447,41 @@ def to_sql( with self.con.cursor() as cur: total_inserted = cur.adbc_ingest( - table_name=name, data=tbl, mode=mode, db_schema_name=schema + table_name=name, + data=tbl, + mode=mode, + db_schema_name=schema, + temporary=temporary, ) self.con.commit() return total_inserted + def _has_table_temporary(self, name: str, schema: str | None = None) -> bool: + """Check if a temporary table exists. Temporary tables are not in a database's + meta data. The existence is duck tested by a SELECT statement.""" + from adbc_driver_manager import ProgrammingError + + # sqlite doesn't allow a rollback at this point + rollback = ( + True if not self.con.adbc_get_info()["vendor_name"] == "SQLite" else False + ) + + if schema is None: + query = f"SELECT * FROM {name} LIMIT 1" + else: + query = f"SELECT * FROM {schema}.{name} LIMIT 1" + try: + with self.con.cursor() as cur: + cur.execute(query) + return True + except ProgrammingError: + if rollback: + # Some DBMS (e.g. postgres) require a rollback after a caught exception + with self.con.cursor() as cur: + cur.execute("rollback") + return False + def has_table(self, name: str, schema: str | None = None) -> bool: meta = self.con.adbc_get_objects( db_schema_filter=schema, table_name_filter=name @@ -2758,6 +2856,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, + temporary: bool = False, engine: str = "auto", **engine_kwargs, ) -> int | None: @@ -2798,6 +2897,8 @@ def to_sql( Details and a sample callable implementation can be found in the section :ref:`insert method `. + temporary : bool, default False + Indicates if the created, replaced or appended table is temporary. """ if dtype: if not is_dict_like(dtype): @@ -2823,6 +2924,7 @@ def to_sql( if_exists=if_exists, index_label=index_label, dtype=dtype, + temporary=temporary, ) table.create() return table.insert(chunksize, method) diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index 7e1220ecee218..df9fac70bcfa1 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -39,6 +39,7 @@ to_timedelta, ) import pandas._testing as tm +from pandas.testing import assert_frame_equal from pandas.util.version import Version from pandas.io import sql @@ -617,6 +618,22 @@ def mysql_pymysql_engine(): engine.dispose() +@pytest.fixture +def mysql_pymysql_engine_default_pool(): + sqlalchemy = pytest.importorskip("sqlalchemy") + pymysql = pytest.importorskip("pymysql") + engine = sqlalchemy.create_engine( + "mysql+pymysql://root@localhost:3306/pandas", + connect_args={"client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS}, + ) + yield engine + for view in get_all_views(engine): + drop_view(view, engine) + for tbl in get_all_tables(engine): + drop_table(tbl, engine) + engine.dispose() + + @pytest.fixture def mysql_pymysql_engine_iris(mysql_pymysql_engine, iris_path): create_and_load_iris(mysql_pymysql_engine, iris_path) @@ -636,6 +653,12 @@ def mysql_pymysql_conn(mysql_pymysql_engine): yield conn +@pytest.fixture +def mysql_pymysql_conn_default_pool(mysql_pymysql_engine_default_pool): + with mysql_pymysql_engine_default_pool.connect() as conn: + yield conn + + @pytest.fixture def mysql_pymysql_conn_iris(mysql_pymysql_engine_iris): with mysql_pymysql_engine_iris.connect() as conn: @@ -664,6 +687,21 @@ def postgresql_psycopg2_engine(): engine.dispose() +@pytest.fixture +def postgresql_psycopg2_engine_default_pool(): + sqlalchemy = pytest.importorskip("sqlalchemy") + pytest.importorskip("psycopg2") + engine = sqlalchemy.create_engine( + "postgresql+psycopg2://postgres:postgres@localhost:5432/pandas", + ) + yield engine + for view in get_all_views(engine): + drop_view(view, engine) + for tbl in get_all_tables(engine): + drop_table(tbl, engine) + engine.dispose() + + @pytest.fixture def postgresql_psycopg2_engine_iris(postgresql_psycopg2_engine, iris_path): create_and_load_iris(postgresql_psycopg2_engine, iris_path) @@ -683,6 +721,12 @@ def postgresql_psycopg2_conn(postgresql_psycopg2_engine): yield conn +@pytest.fixture +def postgresql_psycopg2_conn_default_pool(postgresql_psycopg2_engine_default_pool): + with postgresql_psycopg2_engine_default_pool.connect() as conn: + yield conn + + @pytest.fixture def postgresql_adbc_conn(): pytest.importorskip("pyarrow") @@ -766,12 +810,30 @@ def sqlite_engine(sqlite_str): engine.dispose() +@pytest.fixture +def sqlite_engine_default_pool(sqlite_str): + sqlalchemy = pytest.importorskip("sqlalchemy") + engine = sqlalchemy.create_engine(sqlite_str) + yield engine + for view in get_all_views(engine): + drop_view(view, engine) + for tbl in get_all_tables(engine): + drop_table(tbl, engine) + engine.dispose() + + @pytest.fixture def sqlite_conn(sqlite_engine): with sqlite_engine.connect() as conn: yield conn +@pytest.fixture +def sqlite_conn_default_pool(sqlite_engine_default_pool): + with sqlite_engine_default_pool.connect() as conn: + yield conn + + @pytest.fixture def sqlite_str_iris(sqlite_str, iris_path): sqlalchemy = pytest.importorskip("sqlalchemy") @@ -899,6 +961,11 @@ def sqlite_buildin_types(sqlite_buildin, types_data): pytest.param("mysql_pymysql_conn", marks=pytest.mark.db), ] +mysql_connectable_default_pool = [ + pytest.param("mysql_pymysql_engine_default_pool", marks=pytest.mark.db), + pytest.param("mysql_pymysql_conn_default_pool", marks=pytest.mark.db), +] + mysql_connectable_iris = [ pytest.param("mysql_pymysql_engine_iris", marks=pytest.mark.db), pytest.param("mysql_pymysql_conn_iris", marks=pytest.mark.db), @@ -914,6 +981,11 @@ def sqlite_buildin_types(sqlite_buildin, types_data): pytest.param("postgresql_psycopg2_conn", marks=pytest.mark.db), ] +postgresql_connectable_default_pool = [ + pytest.param("postgresql_psycopg2_engine_default_pool", marks=pytest.mark.db), + pytest.param("postgresql_psycopg2_conn_default_pool", marks=pytest.mark.db), +] + postgresql_connectable_iris = [ pytest.param("postgresql_psycopg2_engine_iris", marks=pytest.mark.db), pytest.param("postgresql_psycopg2_conn_iris", marks=pytest.mark.db), @@ -930,6 +1002,11 @@ def sqlite_buildin_types(sqlite_buildin, types_data): "sqlite_str", ] +sqlite_connectable_default_pool = [ + "sqlite_engine_default_pool", + "sqlite_conn_default_pool", +] + sqlite_connectable_iris = [ "sqlite_engine_iris", "sqlite_conn_iris", @@ -944,6 +1021,12 @@ def sqlite_buildin_types(sqlite_buildin, types_data): sqlalchemy_connectable = mysql_connectable + postgresql_connectable + sqlite_connectable +sqlalchemy_connectable_default_pool = ( + mysql_connectable_default_pool + + postgresql_connectable_default_pool + + sqlite_connectable_default_pool +) + sqlalchemy_connectable_iris = ( mysql_connectable_iris + postgresql_connectable_iris + sqlite_connectable_iris ) @@ -978,6 +1061,8 @@ def sqlite_buildin_types(sqlite_buildin, types_data): sqlalchemy_connectable_types + ["sqlite_buildin_types"] + adbc_connectable_types ) +temporary_connectable = sqlalchemy_connectable_default_pool + adbc_connectable + @pytest.mark.parametrize("conn", all_connectable) def test_dataframe_to_sql(conn, test_frame1, request): @@ -4358,3 +4443,98 @@ def test_xsqlite_if_exists(sqlite_buildin): (5, "E"), ] drop_table(table_name, sqlite_buildin) + + +@pytest.mark.parametrize("conn", sqlalchemy_connectable_default_pool) +def test_exists_temporary_table(conn, test_frame1, request): + conn = request.getfixturevalue(conn) + + pandas_sql = pandasSQL_builder(conn, schema=None, need_transaction=True) + table = sql.SQLTable( + name="test_frame1", + pandas_sql_engine=pandas_sql, + frame=test_frame1, + index=False, + if_exists="fail", + temporary=True, + ) + + table.create() + + assert True if table.exists() else False + + +@pytest.mark.parametrize("conn", temporary_connectable) +def test_to_sql_temporary_table_replace(conn, test_frame1, request): + conn = request.getfixturevalue(conn) + + test_frame1.to_sql( + name="test_frame1", + con=conn, + if_exists="fail", + index=False, + temporary=True, + ) + + test_frame1.to_sql( + name="test_frame1", + con=conn, + if_exists="replace", + index=False, + temporary=True, + ) + + df_test = pd.read_sql("SELECT * FROM test_frame1", conn) + + assert_frame_equal(test_frame1, df_test) + + +@pytest.mark.parametrize("conn", temporary_connectable) +def test_to_sql_temporary_table_fail(conn, test_frame1, request): + conn = request.getfixturevalue(conn) + + test_frame1.to_sql( + name="test_frame1", + con=conn, + if_exists="fail", + index=False, + temporary=True, + ) + + with pytest.raises(ValueError, match=r"Table 'test_frame1' already exists."): + test_frame1.to_sql( + name="test_frame1", + con=conn, + if_exists="fail", + index=False, + temporary=True, + ) + + +@pytest.mark.parametrize("conn", temporary_connectable) +def test_to_sql_temporary_table_append(conn, test_frame1, request): + conn = request.getfixturevalue(conn) + + test_frame1.to_sql( + name="test_frame1", + con=conn, + if_exists="fail", + index=False, + temporary=True, + ) + + test_frame1.to_sql( + name="test_frame1", + con=conn, + if_exists="append", + index=False, + temporary=True, + ) + + df_test = pd.read_sql("SELECT * FROM test_frame1", conn) + + df_true = concat([test_frame1, test_frame1], axis=0, ignore_index=True).reset_index( + drop=True + ) + + assert_frame_equal(df_true, df_test)