Skip to content

Commit 0ccbc43

Browse files
authored
add static method to get schema migration details (#164)
1 parent 69a7a74 commit 0ccbc43

File tree

2 files changed

+145
-0
lines changed

2 files changed

+145
-0
lines changed

pum/schema_migrations.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,75 @@ def schemas_with_migrations(connection: psycopg.Connection) -> list[str]:
9999
cursor = SqlContent(query).execute(connection, parameters={})
100100
return [row[0] for row in (cursor._pum_results or [])]
101101

102+
@staticmethod
103+
def schemas_with_migration_details(
104+
connection: psycopg.Connection,
105+
) -> list[dict]:
106+
"""Return detailed migration info for each schema that has a pum_migrations table.
107+
108+
For each schema, returns the module name, current version, first installation
109+
date and latest upgrade date (if different from the installation date).
110+
111+
Args:
112+
connection: The database connection.
113+
114+
Returns:
115+
list[dict]: A list of dicts with keys: schema, module, version,
116+
installed_date, upgrade_date (None if never upgraded).
117+
"""
118+
schemas = SchemaMigrations.schemas_with_migrations(connection)
119+
details = []
120+
for schema in schemas:
121+
table_id = psycopg.sql.SQL(".").join(
122+
[psycopg.sql.Identifier(schema), psycopg.sql.Identifier(MIGRATION_TABLE_NAME)]
123+
)
124+
query = psycopg.sql.SQL(
125+
"""
126+
SELECT
127+
module,
128+
version,
129+
installed_date,
130+
CASE WHEN upgrade_date > installed_date THEN upgrade_date END AS upgrade_date,
131+
beta_testing
132+
FROM (
133+
SELECT
134+
module,
135+
(SELECT version FROM {table} ORDER BY version DESC, date_installed DESC LIMIT 1) AS version,
136+
MIN(date_installed) AS installed_date,
137+
MAX(date_installed) AS upgrade_date,
138+
(SELECT beta_testing FROM {table} ORDER BY version DESC, date_installed DESC LIMIT 1) AS beta_testing
139+
FROM {table}
140+
GROUP BY module
141+
) sub
142+
"""
143+
)
144+
try:
145+
cursor = SqlContent(query).execute(connection, parameters={"table": table_id})
146+
for row in cursor._pum_results or []:
147+
details.append(
148+
{
149+
"schema": schema,
150+
"module": row[0],
151+
"version": row[1],
152+
"installed_date": row[2],
153+
"upgrade_date": row[3],
154+
"beta_testing": row[4],
155+
}
156+
)
157+
except Exception:
158+
logger.warning(f"Could not read migration details from schema {schema}")
159+
details.append(
160+
{
161+
"schema": schema,
162+
"module": None,
163+
"version": None,
164+
"installed_date": None,
165+
"upgrade_date": None,
166+
"beta_testing": None,
167+
}
168+
)
169+
return details
170+
102171
def exists_in_other_schemas(self, connection: psycopg.Connection) -> list[str]:
103172
"""Check if the schema_migrations information table exists in other schemas.
104173

test/test_schema_migrations.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,79 @@ def test_schemas_with_migrations(self) -> None:
172172
# Test exists_in_other_schemas for sm2
173173
other_schemas = sm2.exists_in_other_schemas(conn)
174174
self.assertEqual(other_schemas, ["public"])
175+
176+
def test_schemas_with_migration_details_empty(self) -> None:
177+
"""Test schemas_with_migration_details when no migrations exist."""
178+
with psycopg.connect(f"service={self.pg_service}") as conn:
179+
details = SchemaMigrations.schemas_with_migration_details(conn)
180+
self.assertEqual(details, [])
181+
182+
def test_schemas_with_migration_details_single_schema(self) -> None:
183+
"""Test schemas_with_migration_details with a single installed module."""
184+
test_dir = Path("test") / "data" / "single_changelog"
185+
cfg = PumConfig(test_dir, pum={"module": "test_module"})
186+
sm = SchemaMigrations(cfg)
187+
188+
with psycopg.connect(f"service={self.pg_service}") as conn:
189+
sm.create(connection=conn)
190+
sm.set_baseline(connection=conn, version="1.2.3")
191+
192+
details = SchemaMigrations.schemas_with_migration_details(conn)
193+
self.assertEqual(len(details), 1)
194+
detail = details[0]
195+
self.assertEqual(detail["schema"], "public")
196+
self.assertEqual(detail["module"], "test_module")
197+
self.assertEqual(detail["version"], "1.2.3")
198+
self.assertIsNotNone(detail["installed_date"])
199+
self.assertIsNone(detail["upgrade_date"]) # No upgrade yet
200+
self.assertFalse(detail["beta_testing"])
201+
202+
def test_schemas_with_migration_details_after_upgrade(self) -> None:
203+
"""Test schemas_with_migration_details shows upgrade_date after a second baseline."""
204+
test_dir = Path("test") / "data" / "single_changelog"
205+
cfg = PumConfig(test_dir, pum={"module": "test_module"})
206+
sm = SchemaMigrations(cfg)
207+
208+
with psycopg.connect(f"service={self.pg_service}") as conn:
209+
sm.create(connection=conn)
210+
sm.set_baseline(connection=conn, version="1.2.3", commit=True)
211+
212+
# Use a separate connection/transaction so now() differs
213+
with psycopg.connect(f"service={self.pg_service}") as conn:
214+
sm.set_baseline(connection=conn, version="1.2.4", commit=True)
215+
216+
details = SchemaMigrations.schemas_with_migration_details(conn)
217+
self.assertEqual(len(details), 1)
218+
detail = details[0]
219+
self.assertEqual(detail["version"], "1.2.4")
220+
self.assertIsNotNone(detail["installed_date"])
221+
self.assertIsNotNone(detail["upgrade_date"]) # Upgrade happened
222+
223+
def test_schemas_with_migration_details_multiple_schemas(self) -> None:
224+
"""Test schemas_with_migration_details with modules in different schemas."""
225+
test_dir = Path("test") / "data" / "single_changelog"
226+
cfg1 = PumConfig(test_dir, pum={"module": "module_a", "migration_table_schema": "public"})
227+
cfg2 = PumConfig(
228+
test_dir,
229+
pum={"module": "module_b", "migration_table_schema": "pum_custom_migrations_schema"},
230+
)
231+
sm1 = SchemaMigrations(cfg1)
232+
sm2 = SchemaMigrations(cfg2)
233+
234+
with psycopg.connect(f"service={self.pg_service}") as conn:
235+
sm1.create(connection=conn, allow_multiple_modules=True)
236+
sm1.set_baseline(connection=conn, version="1.0.0")
237+
238+
sm2.create(connection=conn, allow_multiple_modules=True)
239+
sm2.set_baseline(connection=conn, version="2.0.0")
240+
241+
details = SchemaMigrations.schemas_with_migration_details(conn)
242+
self.assertEqual(len(details), 2)
243+
244+
by_schema = {d["schema"]: d for d in details}
245+
self.assertIn("public", by_schema)
246+
self.assertIn("pum_custom_migrations_schema", by_schema)
247+
self.assertEqual(by_schema["public"]["module"], "module_a")
248+
self.assertEqual(by_schema["public"]["version"], "1.0.0")
249+
self.assertEqual(by_schema["pum_custom_migrations_schema"]["module"], "module_b")
250+
self.assertEqual(by_schema["pum_custom_migrations_schema"]["version"], "2.0.0")

0 commit comments

Comments
 (0)