@@ -898,21 +898,30 @@ def sqlite_buildin_types(sqlite_buildin, types_data):
898898@pytest .fixture (scope = "session" )
899899def worker_name (request ):
900900 """
901- Creates a unique schema name for Postgres to use , in order to
901+ Returns a unique name per worker , in order to
902902 isolate tests for parallelization.
903- :return: Name to use for creating an isolated schema
903+ :return: Name to use for creating/accessing an isolated SQL database
904904 :rtype: str
905905 """
906906 return xdist .get_xdist_worker_id (request )
907907
908908
909909@pytest .fixture (scope = "session" )
910910def create_engines ():
911- # Indirectly import dependencies. To avoid being picked up by depdency scanning software.
911+ """
912+ Fixture factory. Returns a list of lambda functions.
913+ :return: create_engine_commands, a list of lambda functions that build an SQLAlchemy engine
914+ :rtype: list[function, function]
915+
916+ :mockup:
917+ create_engine_commands = [
918+ MySQL,
919+ Postgres,
920+ ]
921+ """
922+ # Indirectly import dependencies. To avoid being picked up by dependency scanning software.
912923 sqlalchemy = pytest .importorskip ("sqlalchemy" )
913924 pymysql = pytest .importorskip ("pymysql" )
914-
915- # Round robin creation of DB connections.
916925 create_engine_commands = [
917926 lambda : sqlalchemy .create_engine ("mysql+pymysql://root@localhost:3306/pandas" , connect_args = {"client_flag" : pymysql .constants .CLIENT .MULTI_STATEMENTS }, poolclass = sqlalchemy .pool .NullPool ),
918927 lambda : sqlalchemy .create_engine ("postgresql+psycopg2://postgres:postgres@localhost:5432/pandas" , poolclass = sqlalchemy .pool .NullPool , isolation_level = "AUTOCOMMIT" )
@@ -921,12 +930,78 @@ def create_engines():
921930
922931
923932@pytest .fixture (scope = "session" )
924- def round_robin_ordering (worker_number ):
925- round_robin_order = [(worker_number + i )% len (create_engine_commands ) for i in range (len (create_engine_commands ))]
933+ def build_db_string (worker_name ):
934+ """
935+ Returns a list of queries used per SQL offering (Postgres, MySQL) to create per-worker DBs.
936+ :return: build_db_string_query
937+ :rtype: list[str, str]
938+
939+
940+ :mockup:
941+ build_db_string_query = [
942+ MySQL,
943+ Postgres,
944+ ]
945+ """
946+ build_db_string_query = [
947+ f"""CREATE DATABASE IF NOT EXISTS pandas{ worker_name } """ ,
948+ f"""CREATE DATABASE pandas{ worker_name } """ ,
949+ ]
950+ return build_db_string_query
951+
952+
953+ @pytest .fixture (scope = "session" )
954+ def teardown_db_string (worker_name ):
955+ """
956+ Returns a list of queries used per SQL offering (Postgres, MySQL) to teardown per-worker DBs.
957+ :return: teardown_db_string_query
958+ :rtype: list[str, str]
959+
960+
961+ :mockup:
962+ teardown_db_string_query = [
963+ MySQL,
964+ Postgres,
965+ ]
966+ """
967+ teardown_db_string_query = [
968+ f"""DROP DATABASE pandas{ worker_name } """ ,
969+ f"""DROP DATABASE pandas{ worker_name } """ ,
970+ ]
971+ return teardown_db_string_query
972+
973+
974+ @pytest .fixture (scope = "session" )
975+ def number_of_connections (create_engines , build_db_string , teardown_db_string ):
976+ """
977+ Asserts that there's parity between the number of strings and functions needed to create DBs.
978+ Used for round-robin scheduling of DB initialization and teardown.
979+ :return: len(build_db_string)
980+ :rtype: int
981+ """
982+ assert len (create_engines ) == len (build_db_string ) == len (teardown_db_string )
983+ return len (build_db_string )
984+
985+
986+ @pytest .fixture (scope = "session" )
987+ def round_robin_order (worker_number , number_of_connections ):
988+ """
989+ Round-robin ordering of threads to initialize their own DB, equalizing connectivitiy burden between each SQL engine.
990+ :return: rr_order, a modular ring, e.g. with 2 DBs, w1 gets [1,0], w2 gets [0,1], w3 gets [1,0], etc.
991+ :rtype: list[int]*number_of_connections
992+ """
993+ rr_order = [(worker_number + i ) % number_of_connections for i in range (number_of_connections )]
994+ return rr_order
926995
927996
928997@pytest .fixture (scope = "session" )
929998def worker_number (worker_name ):
999+ """
1000+ Casts worker_name to an integer, making sure that with only one thread, or without xdist, DB connections are
1001+ still made correctly.
1002+ :return: worker_number, integer portion of worker_name, `1` if master.
1003+ :rtype: int
1004+ """
9301005 if worker_name == 'master' :
9311006 worker_number = 1
9321007 else :
@@ -935,87 +1010,48 @@ def worker_number(worker_name):
9351010
9361011
9371012@pytest .fixture (scope = "session" )
938- def create_db_string ():
939- return [
940- f"""CREATE DATABASE IF NOT EXISTS pandas{ worker_name } """ ,
941- f"""CREATE DATABASE pandas{ worker_name } """
942- ]
1013+ def orphan_db_wrapper (request ):
1014+ def take_care_of_orphan_dbs (create_engines , round_robin_order , teardown_db_string ):
1015+ """
1016+ Gets each thread's round-robin order, and connects to the appropriate SQL engine with the
1017+ appropriate teardown query string.
1018+ :return: None
1019+ """
1020+ sqlalchemy = pytest .importorskip ("sqlalchemy" )
1021+ for rr_order in round_robin_order :
1022+ engine = create_engines [rr_order ]()
1023+ with engine .connect () as conn :
1024+ conn .execute (sqlalchemy .text (teardown_db_string [rr_order ]))
1025+ engine .dispose ()
1026+ request .add_finalizer (take_care_of_orphan_dbs )
1027+
9431028
9441029
9451030@pytest .fixture (scope = "session" )
946- def execute_db_command ():
947- for i in range (len (create_engine_commands )):
948- engine = create_engines ()[round_robin_order ()[i ]]()
949- connection = engine .connect ()
950- connection .execute (sqlalchemy .text (create_db_string ()))
1031+ def build_and_teardown_dbs (create_engines , round_robin_order , build_db_string , teardown_db_string ):
1032+ """
1033+ Gets each thread's round-robin order, and connects to the appropriate SQL engine with the
1034+ appropriate build db query string.
1035+ :return: None
1036+ """
1037+ sqlalchemy = pytest .importorskip ("sqlalchemy" )
1038+ for rr_order in round_robin_order :
1039+ engine = create_engines [rr_order ]()
1040+ with engine .connect () as conn :
1041+ conn .execute (sqlalchemy .text (build_db_string [rr_order ]))
1042+ engine .dispose ()
1043+ yield
1044+ # Teardown DBs
1045+ for rr_order in round_robin_order :
1046+ engine = create_engines [rr_order ]()
1047+ with engine .connect () as conn :
1048+ conn .execute (sqlalchemy .text (teardown_db_string [rr_order ]))
1049+ engine .dispose ()
9511050
9521051
9531052@pytest .fixture (scope = "session" , autouse = True )
954- def prepare_db_setup (request , worker_name ):
955- worker_number = worker_number
956- create_engine_commands = create_engines ()
957- create_db_command = create_db_string ()
958- assert len (create_engine_commands ) == len (create_db_command )
959-
960- round_robin_order = round_robin_ordering ()
961-
962- for i in range (len (create_engine_commands )):
963- engine = create_engine_commands [round_robin_order [i ]]()
964- connection = engine .connect ()
965- connection .execute (sqlalchemy .text (create_db_string [round_robin_order [i ]]))
966- engine .dispose ()
967- yield
968- teardown_db_string = [
969- f"""DROP DATABASE IF EXISTS pandas{ worker_name } """ ,
970- f"""DROP DATABASE IF EXISTS pandas{ worker_name } """
971- ]
972-
973- for i in range (len (create_engine_commands )):
974- engine = create_engine_commands [round_robin_order [i ]]()
975- connection = engine .connect ()
976- connection .execute (sqlalchemy .text (teardown_db_string [round_robin_order [i ]]))
977- engine .dispose ()
978-
979-
980-
981-
982- # @pytest.fixture(scope="session")
983- # def parallelize_mysql():
984- # sqlalchemy = pytest.importorskip("sqlalchemy")
985- # pymysql = pytest.importorskip("pymysql")
986- #
987- # engine = sqlalchemy.create_engine(
988- # connection_string,
989- # connect_args={"client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS},
990- # poolclass=sqlalchemy.pool.NullPool,
991- # )
992- # with engine.connect() as connection:
993- # connection.execute(sqlalchemy.text(
994- # f"""
995- # CREATE DATABASE IF NOT EXISTS pandas{worker_name};
996- # """
997- # ))
998- # # connection.commit()
999- # # connection.close()
1000- # yield
1001- # engine.dispose()
1002- #
1003- # pass
1004-
1005-
1006-
1007-
1008- # @pytest.fixture(scope="session", autouse=True)
1009- # def set_up_dbs(parallelize_mysql_dbs, request):
1010- # if hasattr(request.config, "workerinput"):
1011- # # The tests are multi-threaded
1012- # worker_name = xdist.get_xdist_worker_id(request)
1013- # worker_count = request.config.workerinput["workercount"]
1014- # print(worker_name, worker_count)
1015- # parallelize_mysql_dbs(request, worker_name, worker_count)
1016- # else:
1017- # quit(1)
1018- # parallelize_mysql_dbs
1053+ def execution_point (build_and_teardown_dbs ):
1054+ yield
10191055
10201056
10211057
@@ -1104,11 +1140,8 @@ def prepare_db_setup(request, worker_name):
11041140 sqlalchemy_connectable_types + ["sqlite_buildin_types" ] + adbc_connectable_types
11051141)
11061142
1107- #TODO fix
1108- @pytest .mark .parametrize ("conn" , [
1109- #pytest.param("mysql_pymysql_engine", marks=pytest.mark.db),
1110- pytest .param ("mysql_pymysql_conn" , marks = pytest .mark .db ),
1111- ])
1143+
1144+ @pytest .mark .parametrize ("conn" , all_connectable )
11121145def test_dataframe_to_sql (conn , test_frame1 , request ):
11131146 # GH 51086 if conn is sqlite_engine
11141147 conn = request .getfixturevalue (conn )
0 commit comments