|
3 | 3 | import os |
4 | 4 | from contextlib import asynccontextmanager |
5 | 5 |
|
| 6 | +import psycopg |
6 | 7 | import pytest |
7 | | -import pytest_pgsql |
| 8 | +from pytest_postgresql.janitor import DatabaseJanitor |
8 | 9 |
|
9 | 10 | from tipg.settings import CustomSQLSettings, DatabaseSettings, PostgresSettings |
10 | 11 |
|
|
17 | 18 | TEMPLATE_DIRECTORY = os.path.join(FIXTURES_DIR, "templates") |
18 | 19 | SQL_FUNCTIONS_DIRECTORY = os.path.join(FIXTURES_DIR, "functions") |
19 | 20 |
|
20 | | -test_db = pytest_pgsql.TransactedPostgreSQLTestDB.create_fixture( |
21 | | - "test_db", scope="session", use_restore_state=False |
22 | | -) |
23 | | - |
24 | 21 |
|
25 | 22 | @pytest.fixture(scope="session") |
26 | | -def database_url(test_db): |
27 | | - """ |
28 | | - Session scoped fixture to launch a postgresql database in a separate process. We use psycopg2 to ingest test data |
29 | | - because pytest-asyncio event loop is a function scoped fixture and cannot be called within the current scope. Yields |
30 | | - a database url which we pass to our application through a monkeypatched environment variable. |
31 | | - """ |
32 | | - assert test_db.install_extension("postgis") |
33 | | - |
34 | | - # make sure we have a `public` schema |
35 | | - test_db.create_schema("public", exists_ok=True) |
36 | | - |
37 | | - test_db.run_sql_file(os.path.join(FIXTURES_DIR, "landsat_wrs.sql")) |
38 | | - assert test_db.has_table("public.landsat_wrs") |
39 | | - |
40 | | - test_db.run_sql_file(os.path.join(FIXTURES_DIR, "my_data.sql")) |
41 | | - assert test_db.has_table("public.my_data") |
42 | | - |
43 | | - test_db.run_sql_file(os.path.join(FIXTURES_DIR, "nongeo_data.sql")) |
44 | | - assert test_db.has_table("public.nongeo_data") |
45 | | - |
46 | | - test_db.connection.execute( |
47 | | - "CREATE TABLE public.landsat AS SELECT geom, ST_Centroid(geom) as centroid, ogc_fid, id, pr, path, row from public.landsat_wrs;" |
48 | | - ) |
49 | | - test_db.connection.execute("ALTER TABLE public.landsat ADD PRIMARY KEY (ogc_fid);") |
50 | | - assert test_db.has_table("public.landsat") |
51 | | - |
52 | | - count_landsat = test_db.connection.execute( |
53 | | - "SELECT COUNT(*) FROM public.landsat_wrs" |
54 | | - ).scalar() |
55 | | - count_landsat_centroid = test_db.connection.execute( |
56 | | - "SELECT COUNT(*) FROM public.landsat" |
57 | | - ).scalar() |
58 | | - assert count_landsat == count_landsat_centroid |
59 | | - |
60 | | - # Add table with Huge geometries |
61 | | - test_db.run_sql_file(os.path.join(FIXTURES_DIR, "canada.sql")) |
62 | | - assert test_db.has_table("public.canada") |
63 | | - |
64 | | - # add table with geometries not in WGS84 |
65 | | - test_db.run_sql_file(os.path.join(FIXTURES_DIR, "minnesota.sql")) |
66 | | - assert test_db.has_table("public.minnesota") |
| 23 | +def database(postgresql_proc): |
| 24 | + """Create Database Fixture.""" |
| 25 | + with DatabaseJanitor( |
| 26 | + user=postgresql_proc.user, |
| 27 | + host=postgresql_proc.host, |
| 28 | + port=postgresql_proc.port, |
| 29 | + dbname="test_db", |
| 30 | + version=postgresql_proc.version, |
| 31 | + password="password", |
| 32 | + ) as jan: |
| 33 | + yield jan |
67 | 34 |
|
68 | | - # add a `myschema` schema |
69 | | - test_db.create_schema("myschema") |
70 | | - assert test_db.has_schema("myschema") |
71 | 35 |
|
72 | | - test_db.connection.execute( |
73 | | - "CREATE TABLE myschema.landsat AS SELECT * FROM public.landsat_wrs;" |
74 | | - ) |
75 | | - assert test_db.has_table("myschema.landsat") |
76 | | - count_landsat_schema = test_db.connection.execute( |
77 | | - "SELECT COUNT(*) FROM myschema.landsat" |
78 | | - ).scalar() |
79 | | - assert count_landsat == count_landsat_schema |
| 36 | +def _get_sql(source: str) -> str: |
| 37 | + with open(source, "r") as fd: |
| 38 | + to_run = fd.readlines() |
80 | 39 |
|
81 | | - # add a `userschema` schema |
82 | | - test_db.create_schema("userschema") |
83 | | - assert test_db.has_schema("userschema") |
| 40 | + return "\n".join(to_run) |
84 | 41 |
|
85 | | - test_db.connection.execute( |
86 | | - "CREATE OR REPLACE FUNCTION userschema.test_no_params() RETURNS TABLE(foo integer, location geometry) AS 'SELECT 1, ST_MakePoint(0,0);' LANGUAGE SQL;" |
87 | | - ) |
88 | 42 |
|
89 | | - return str(test_db.connection.engine.url) |
| 43 | +@pytest.fixture(scope="session") |
| 44 | +def database_url(database): |
| 45 | + """add data to the database fixture""" |
| 46 | + db_url = f"postgresql://{database.user}:{database.password}@{database.host}:{database.port}/{database.dbname}" |
| 47 | + with psycopg.connect(db_url, autocommit=True) as conn: |
| 48 | + with conn.cursor() as cur: |
| 49 | + cur.execute(f"ALTER DATABASE {database.dbname} SET TIMEZONE='UTC';") |
| 50 | + cur.execute("SET TIME ZONE 'UTC';") |
| 51 | + |
| 52 | + cur.execute("CREATE EXTENSION IF NOT EXISTS postgis;") |
| 53 | + # make sure postgis extension exists |
| 54 | + assert cur.execute( |
| 55 | + "SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname='postgis' LIMIT 1);" |
| 56 | + ).fetchone()[0] |
| 57 | + |
| 58 | + cur.execute("CREATE SCHEMA IF NOT EXISTS public;") |
| 59 | + |
| 60 | + # load table |
| 61 | + cur.execute(_get_sql(os.path.join(FIXTURES_DIR, "landsat_wrs.sql"))) |
| 62 | + cur.execute(_get_sql(os.path.join(FIXTURES_DIR, "my_data.sql"))) |
| 63 | + cur.execute(_get_sql(os.path.join(FIXTURES_DIR, "nongeo_data.sql"))) |
| 64 | + |
| 65 | + cur.execute( |
| 66 | + "CREATE TABLE public.landsat AS SELECT geom, ST_Centroid(geom) as centroid, ogc_fid, id, pr, path, row from public.landsat_wrs;" |
| 67 | + ) |
| 68 | + cur.execute("ALTER TABLE public.landsat ADD PRIMARY KEY (ogc_fid);") |
| 69 | + |
| 70 | + count_landsat = cur.execute( |
| 71 | + "SELECT COUNT(*) FROM public.landsat_wrs" |
| 72 | + ).fetchone()[0] |
| 73 | + count_landsat_centroid = cur.execute( |
| 74 | + "SELECT COUNT(*) FROM public.landsat" |
| 75 | + ).fetchone()[0] |
| 76 | + assert count_landsat == count_landsat_centroid |
| 77 | + |
| 78 | + # Add table with Huge geometries |
| 79 | + cur.execute(_get_sql(os.path.join(FIXTURES_DIR, "canada.sql"))) |
| 80 | + |
| 81 | + # add table with geometries not in WGS84 |
| 82 | + cur.execute(_get_sql(os.path.join(FIXTURES_DIR, "minnesota.sql"))) |
| 83 | + |
| 84 | + # add a `myschema` schema |
| 85 | + cur.execute("CREATE SCHEMA IF NOT EXISTS myschema;") |
| 86 | + cur.execute( |
| 87 | + "CREATE TABLE myschema.landsat AS SELECT * FROM public.landsat_wrs;" |
| 88 | + ) |
| 89 | + count_landsat_schema = cur.execute( |
| 90 | + "SELECT COUNT(*) FROM myschema.landsat" |
| 91 | + ).fetchone()[0] |
| 92 | + assert count_landsat == count_landsat_schema |
| 93 | + |
| 94 | + # add a `userschema` schema |
| 95 | + cur.execute("CREATE SCHEMA IF NOT EXISTS userschema;") |
| 96 | + |
| 97 | + cur.execute( |
| 98 | + "CREATE OR REPLACE FUNCTION userschema.test_no_params() RETURNS TABLE(foo integer, location geometry) AS 'SELECT 1, ST_MakePoint(0,0);' LANGUAGE SQL;" |
| 99 | + ) |
| 100 | + |
| 101 | + return db_url |
90 | 102 |
|
91 | 103 |
|
92 | 104 | def create_tipg_app( |
@@ -174,10 +186,6 @@ def app(database_url, monkeypatch): |
174 | 186 | db_settings.tables = None |
175 | 187 | db_settings.functions = None |
176 | 188 |
|
177 | | - # Remove middlewares https://github.com/encode/starlette/issues/472 |
178 | | - app.user_middleware = [] |
179 | | - app.middleware_stack = app.build_middleware_stack() |
180 | | - |
181 | 189 | with TestClient(app) as app: |
182 | 190 | yield app |
183 | 191 |
|
|
0 commit comments