|
10 | 10 |
|
11 | 11 | from ansible_base.lib.utils.db import (
|
12 | 12 | advisory_lock,
|
| 13 | + advisory_lock_id_to_debug_info, |
| 14 | + get_active_advisory_locks, |
13 | 15 | get_pg_notify_params,
|
14 | 16 | migrations_are_complete,
|
15 | 17 | psycopg_conn_string_from_settings_dict,
|
16 | 18 | psycopg_connection_from_django,
|
17 | 19 | psycopg_kwargs_from_settings_dict,
|
| 20 | + string_to_advisory_lock_id, |
18 | 21 | )
|
19 | 22 |
|
20 | 23 |
|
@@ -148,6 +151,118 @@ def test_psycopg_connection_from_django_new_conn(self):
|
148 | 151 | assert isinstance(psycopg_connection_from_django(), psycopg.Connection)
|
149 | 152 |
|
150 | 153 |
|
| 154 | +class TestStringToAdvisoryLockId: |
| 155 | + """Test the string to advisory lock ID conversion function. |
| 156 | +
|
| 157 | + Generated by Claude Code (Sonnet 4) |
| 158 | + """ |
| 159 | + |
| 160 | + def test_string_to_advisory_lock_id_basic(self): |
| 161 | + """Test basic string to lock ID conversion.""" |
| 162 | + lock_id = string_to_advisory_lock_id("test_string") |
| 163 | + assert isinstance(lock_id, int) |
| 164 | + assert -(2**31) <= lock_id <= 2**31 - 1 |
| 165 | + |
| 166 | + def test_string_to_advisory_lock_id_consistency(self): |
| 167 | + """Test that the same string always produces the same lock ID.""" |
| 168 | + test_string = "consistent_test" |
| 169 | + lock_id1 = string_to_advisory_lock_id(test_string) |
| 170 | + lock_id2 = string_to_advisory_lock_id(test_string) |
| 171 | + assert lock_id1 == lock_id2 |
| 172 | + |
| 173 | + def test_string_to_advisory_lock_id_different_strings(self): |
| 174 | + """Test that different strings produce different lock IDs.""" |
| 175 | + lock_id1 = string_to_advisory_lock_id("string1") |
| 176 | + lock_id2 = string_to_advisory_lock_id("string2") |
| 177 | + assert lock_id1 != lock_id2 |
| 178 | + |
| 179 | + def test_string_to_advisory_lock_id_unicode(self): |
| 180 | + """Test string to lock ID conversion with unicode characters.""" |
| 181 | + lock_id = string_to_advisory_lock_id("test_🔒_unicode") |
| 182 | + assert isinstance(lock_id, int) |
| 183 | + assert -(2**31) <= lock_id <= 2**31 - 1 |
| 184 | + |
| 185 | + |
| 186 | +class TestAdvisoryLockIdToDebugInfo: |
| 187 | + """Test the advisory lock ID to debug info function. |
| 188 | +
|
| 189 | + Generated by Claude Code (Sonnet 4) |
| 190 | + """ |
| 191 | + |
| 192 | + def test_advisory_lock_id_to_debug_info_positive(self): |
| 193 | + """Test debug info for positive lock ID.""" |
| 194 | + lock_id = 12345 |
| 195 | + debug_info = advisory_lock_id_to_debug_info(lock_id) |
| 196 | + |
| 197 | + assert debug_info['lock_id'] == lock_id |
| 198 | + assert debug_info['unsigned_crc32'] == lock_id |
| 199 | + assert debug_info['had_high_bit_set'] is False |
| 200 | + assert debug_info['hex_representation'] == hex(lock_id) |
| 201 | + |
| 202 | + def test_advisory_lock_id_to_debug_info_negative(self): |
| 203 | + """Test debug info for negative lock ID.""" |
| 204 | + lock_id = -12345 |
| 205 | + debug_info = advisory_lock_id_to_debug_info(lock_id) |
| 206 | + |
| 207 | + assert debug_info['lock_id'] == lock_id |
| 208 | + assert debug_info['unsigned_crc32'] == lock_id + 2**31 |
| 209 | + assert debug_info['had_high_bit_set'] is True |
| 210 | + assert debug_info['hex_representation'] == hex(lock_id) |
| 211 | + |
| 212 | + def test_advisory_lock_id_to_debug_info_roundtrip(self): |
| 213 | + """Test debug info for a string-generated lock ID.""" |
| 214 | + test_string = "test_debug_roundtrip" |
| 215 | + lock_id = string_to_advisory_lock_id(test_string) |
| 216 | + debug_info = advisory_lock_id_to_debug_info(lock_id) |
| 217 | + |
| 218 | + assert debug_info['lock_id'] == lock_id |
| 219 | + assert isinstance(debug_info['unsigned_crc32'], int) |
| 220 | + assert isinstance(debug_info['had_high_bit_set'], bool) |
| 221 | + assert debug_info['hex_representation'] == hex(lock_id) |
| 222 | + |
| 223 | + |
| 224 | +class TestGetActiveAdvisoryLocks(SkipIfSqlite): |
| 225 | + """Test the get active advisory locks function. |
| 226 | +
|
| 227 | + Generated by Claude Code (Sonnet 4) |
| 228 | + """ |
| 229 | + |
| 230 | + @pytest.mark.django_db |
| 231 | + def test_get_active_advisory_locks_empty(self): |
| 232 | + """Test getting active locks when none are held.""" |
| 233 | + locks = get_active_advisory_locks() |
| 234 | + assert isinstance(locks, list) |
| 235 | + # We can't guarantee no locks since other tests might be running |
| 236 | + |
| 237 | + @pytest.mark.django_db |
| 238 | + def test_get_active_advisory_locks_with_lock(self): |
| 239 | + """Test getting active locks when we hold one.""" |
| 240 | + test_lock_name = "test_get_active_locks" |
| 241 | + |
| 242 | + with advisory_lock(test_lock_name): |
| 243 | + locks = get_active_advisory_locks() |
| 244 | + assert isinstance(locks, list) |
| 245 | + # Should have at least our lock |
| 246 | + assert len(locks) >= 1 |
| 247 | + |
| 248 | + # Check that lock entries have expected structure |
| 249 | + for lock in locks: |
| 250 | + assert 'locktype' in lock |
| 251 | + assert 'classid' in lock |
| 252 | + assert 'objid' in lock |
| 253 | + assert 'objsubid' in lock |
| 254 | + assert 'pid' in lock |
| 255 | + assert 'mode' in lock |
| 256 | + assert 'granted' in lock |
| 257 | + assert lock['locktype'] == 'advisory' |
| 258 | + |
| 259 | + @pytest.mark.django_db |
| 260 | + def test_get_active_advisory_locks_sqlite_returns_empty(self): |
| 261 | + """Test that SQLite returns empty list.""" |
| 262 | + # This test will be skipped by SkipIfSqlite for completeness |
| 263 | + pass |
| 264 | + |
| 265 | + |
151 | 266 | class TestAdvisoryLock(SkipIfSqlite):
|
152 | 267 | THREAD_WAIT_TIME = 0.1
|
153 | 268 |
|
|
0 commit comments