@@ -197,6 +197,101 @@ def test_check_constraints(self):
197197 constraints_check = next ((r for r in report .check_results if r .key == "constraints" ), None )
198198 self .assertTrue (constraints_check .passed )
199199
200+ def test_check_constraint_definition_changes (self ):
201+ """Test that constraint definition changes are detected.
202+
203+ This test addresses the issue from https://github.com/qwat/qwat-data-model/pull/366
204+ where a constraint definition was changed from 'year > 1800' to 'year >= 1800'.
205+ The checker should detect such changes in constraint definitions, not just
206+ added/removed constraints.
207+ """
208+ # Clean up any previous test data
209+ for service in (self .pg_service1 , self .pg_service2 ):
210+ with psycopg .connect (f"service={ service } " ) as conn :
211+ cur = conn .cursor ()
212+ cur .execute ("DROP SCHEMA IF EXISTS pum_test_constraint_def CASCADE;" )
213+ cur .execute ("DROP TABLE IF EXISTS public.pum_migrations;" )
214+
215+ # Use the constraint definition change test data
216+ test_dir = Path ("test" ) / "data" / "constraint_definition_change"
217+ cfg = PumConfig (test_dir )
218+
219+ # Install version 1.0.0 on both databases with old constraint definition (year > 1800)
220+ with psycopg .connect (f"service={ self .pg_service1 } " ) as conn :
221+ upgrader = Upgrader (config = cfg )
222+ upgrader .install (connection = conn , max_version = "1.0.0" )
223+
224+ with psycopg .connect (f"service={ self .pg_service2 } " ) as conn :
225+ upgrader = Upgrader (config = cfg )
226+ upgrader .install (connection = conn , max_version = "1.0.0" )
227+
228+ # At this point, both databases should be identical
229+ checker = Checker (self .pg_service1 , self .pg_service2 , exclude_schema = ["public" ])
230+ report = checker .run_checks ()
231+ checker .conn1 .close ()
232+ checker .conn2 .close ()
233+
234+ constraints_check = next ((r for r in report .check_results if r .key == "constraints" ), None )
235+ self .assertIsNotNone (constraints_check )
236+ self .assertTrue (
237+ constraints_check .passed ,
238+ "Databases should be identical after both installed with 1.0.0" ,
239+ )
240+
241+ # Now upgrade DB1 to 1.1.0 which changes the constraint definition
242+ # from 'year > 1800' to 'year >= 1800'
243+ with psycopg .connect (f"service={ self .pg_service1 } " ) as conn :
244+ upgrader = Upgrader (config = cfg )
245+ upgrader .upgrade (connection = conn )
246+
247+ # Check should now detect the constraint definition difference
248+ checker = Checker (self .pg_service1 , self .pg_service2 , exclude_schema = ["public" ])
249+ report = checker .run_checks ()
250+ checker .conn1 .close ()
251+ checker .conn2 .close ()
252+
253+ constraints_check = next ((r for r in report .check_results if r .key == "constraints" ), None )
254+ self .assertIsNotNone (constraints_check )
255+ self .assertFalse (
256+ constraints_check .passed , "Checker should detect constraint definition changes"
257+ )
258+
259+ # Verify we have differences reported
260+ self .assertGreater (
261+ len (constraints_check .differences ),
262+ 0 ,
263+ "Should report differences in constraint definitions" ,
264+ )
265+
266+ # Verify the differences mention the constraint
267+ differences_str = str (constraints_check .differences )
268+ self .assertIn (
269+ "pipe_year_check" , differences_str , "Should report the pipe_year_check constraint"
270+ )
271+
272+ # Now upgrade DB2 to match DB1
273+ with psycopg .connect (f"service={ self .pg_service2 } " ) as conn :
274+ upgrader = Upgrader (config = cfg )
275+ upgrader .upgrade (connection = conn )
276+
277+ # Should now be identical again
278+ checker = Checker (self .pg_service1 , self .pg_service2 , exclude_schema = ["public" ])
279+ report = checker .run_checks ()
280+ checker .conn1 .close ()
281+ checker .conn2 .close ()
282+
283+ constraints_check = next ((r for r in report .check_results if r .key == "constraints" ), None )
284+ self .assertTrue (
285+ constraints_check .passed , "Databases should be identical after both upgraded to 1.1.0"
286+ )
287+
288+ # Cleanup
289+ for service in (self .pg_service1 , self .pg_service2 ):
290+ with psycopg .connect (f"service={ service } " ) as conn :
291+ cur = conn .cursor ()
292+ cur .execute ("DROP SCHEMA IF EXISTS pum_test_constraint_def CASCADE;" )
293+ cur .execute ("DROP TABLE IF EXISTS public.pum_migrations;" )
294+
200295 def test_check_views (self ):
201296 """Test view comparison between databases."""
202297 # Install same version on both databases first
0 commit comments