|  | 
| 10 | 10 | 
 | 
| 11 | 11 | from ansible_base.lib.utils.db import ( | 
| 12 | 12 |     advisory_lock, | 
|  | 13 | +    advisory_lock_id_to_debug_info, | 
|  | 14 | +    get_active_advisory_locks, | 
| 13 | 15 |     get_pg_notify_params, | 
| 14 | 16 |     migrations_are_complete, | 
| 15 | 17 |     psycopg_conn_string_from_settings_dict, | 
| 16 | 18 |     psycopg_connection_from_django, | 
| 17 | 19 |     psycopg_kwargs_from_settings_dict, | 
|  | 20 | +    string_to_advisory_lock_id, | 
| 18 | 21 | ) | 
| 19 | 22 | 
 | 
| 20 | 23 | 
 | 
| @@ -148,6 +151,118 @@ def test_psycopg_connection_from_django_new_conn(self): | 
| 148 | 151 |         assert isinstance(psycopg_connection_from_django(), psycopg.Connection) | 
| 149 | 152 | 
 | 
| 150 | 153 | 
 | 
|  | 154 | +class TestStringToAdvisoryLockId: | 
|  | 155 | +    """Test the string to advisory lock ID conversion function. | 
|  | 156 | +
 | 
|  | 157 | +    Generated by Claude Code (Sonnet 4) | 
|  | 158 | +    """ | 
|  | 159 | + | 
|  | 160 | +    def test_string_to_advisory_lock_id_basic(self): | 
|  | 161 | +        """Test basic string to lock ID conversion.""" | 
|  | 162 | +        lock_id = string_to_advisory_lock_id("test_string") | 
|  | 163 | +        assert isinstance(lock_id, int) | 
|  | 164 | +        assert -(2**31) <= lock_id <= 2**31 - 1 | 
|  | 165 | + | 
|  | 166 | +    def test_string_to_advisory_lock_id_consistency(self): | 
|  | 167 | +        """Test that the same string always produces the same lock ID.""" | 
|  | 168 | +        test_string = "consistent_test" | 
|  | 169 | +        lock_id1 = string_to_advisory_lock_id(test_string) | 
|  | 170 | +        lock_id2 = string_to_advisory_lock_id(test_string) | 
|  | 171 | +        assert lock_id1 == lock_id2 | 
|  | 172 | + | 
|  | 173 | +    def test_string_to_advisory_lock_id_different_strings(self): | 
|  | 174 | +        """Test that different strings produce different lock IDs.""" | 
|  | 175 | +        lock_id1 = string_to_advisory_lock_id("string1") | 
|  | 176 | +        lock_id2 = string_to_advisory_lock_id("string2") | 
|  | 177 | +        assert lock_id1 != lock_id2 | 
|  | 178 | + | 
|  | 179 | +    def test_string_to_advisory_lock_id_unicode(self): | 
|  | 180 | +        """Test string to lock ID conversion with unicode characters.""" | 
|  | 181 | +        lock_id = string_to_advisory_lock_id("test_🔒_unicode") | 
|  | 182 | +        assert isinstance(lock_id, int) | 
|  | 183 | +        assert -(2**31) <= lock_id <= 2**31 - 1 | 
|  | 184 | + | 
|  | 185 | + | 
|  | 186 | +class TestAdvisoryLockIdToDebugInfo: | 
|  | 187 | +    """Test the advisory lock ID to debug info function. | 
|  | 188 | +
 | 
|  | 189 | +    Generated by Claude Code (Sonnet 4) | 
|  | 190 | +    """ | 
|  | 191 | + | 
|  | 192 | +    def test_advisory_lock_id_to_debug_info_positive(self): | 
|  | 193 | +        """Test debug info for positive lock ID.""" | 
|  | 194 | +        lock_id = 12345 | 
|  | 195 | +        debug_info = advisory_lock_id_to_debug_info(lock_id) | 
|  | 196 | + | 
|  | 197 | +        assert debug_info['lock_id'] == lock_id | 
|  | 198 | +        assert debug_info['unsigned_crc32'] == lock_id | 
|  | 199 | +        assert debug_info['had_high_bit_set'] is False | 
|  | 200 | +        assert debug_info['hex_representation'] == hex(lock_id) | 
|  | 201 | + | 
|  | 202 | +    def test_advisory_lock_id_to_debug_info_negative(self): | 
|  | 203 | +        """Test debug info for negative lock ID.""" | 
|  | 204 | +        lock_id = -12345 | 
|  | 205 | +        debug_info = advisory_lock_id_to_debug_info(lock_id) | 
|  | 206 | + | 
|  | 207 | +        assert debug_info['lock_id'] == lock_id | 
|  | 208 | +        assert debug_info['unsigned_crc32'] == lock_id + 2**31 | 
|  | 209 | +        assert debug_info['had_high_bit_set'] is True | 
|  | 210 | +        assert debug_info['hex_representation'] == hex(lock_id) | 
|  | 211 | + | 
|  | 212 | +    def test_advisory_lock_id_to_debug_info_roundtrip(self): | 
|  | 213 | +        """Test debug info for a string-generated lock ID.""" | 
|  | 214 | +        test_string = "test_debug_roundtrip" | 
|  | 215 | +        lock_id = string_to_advisory_lock_id(test_string) | 
|  | 216 | +        debug_info = advisory_lock_id_to_debug_info(lock_id) | 
|  | 217 | + | 
|  | 218 | +        assert debug_info['lock_id'] == lock_id | 
|  | 219 | +        assert isinstance(debug_info['unsigned_crc32'], int) | 
|  | 220 | +        assert isinstance(debug_info['had_high_bit_set'], bool) | 
|  | 221 | +        assert debug_info['hex_representation'] == hex(lock_id) | 
|  | 222 | + | 
|  | 223 | + | 
|  | 224 | +class TestGetActiveAdvisoryLocks(SkipIfSqlite): | 
|  | 225 | +    """Test the get active advisory locks function. | 
|  | 226 | +
 | 
|  | 227 | +    Generated by Claude Code (Sonnet 4) | 
|  | 228 | +    """ | 
|  | 229 | + | 
|  | 230 | +    @pytest.mark.django_db | 
|  | 231 | +    def test_get_active_advisory_locks_empty(self): | 
|  | 232 | +        """Test getting active locks when none are held.""" | 
|  | 233 | +        locks = get_active_advisory_locks() | 
|  | 234 | +        assert isinstance(locks, list) | 
|  | 235 | +        # We can't guarantee no locks since other tests might be running | 
|  | 236 | + | 
|  | 237 | +    @pytest.mark.django_db | 
|  | 238 | +    def test_get_active_advisory_locks_with_lock(self): | 
|  | 239 | +        """Test getting active locks when we hold one.""" | 
|  | 240 | +        test_lock_name = "test_get_active_locks" | 
|  | 241 | + | 
|  | 242 | +        with advisory_lock(test_lock_name): | 
|  | 243 | +            locks = get_active_advisory_locks() | 
|  | 244 | +            assert isinstance(locks, list) | 
|  | 245 | +            # Should have at least our lock | 
|  | 246 | +            assert len(locks) >= 1 | 
|  | 247 | + | 
|  | 248 | +            # Check that lock entries have expected structure | 
|  | 249 | +            for lock in locks: | 
|  | 250 | +                assert 'locktype' in lock | 
|  | 251 | +                assert 'classid' in lock | 
|  | 252 | +                assert 'objid' in lock | 
|  | 253 | +                assert 'objsubid' in lock | 
|  | 254 | +                assert 'pid' in lock | 
|  | 255 | +                assert 'mode' in lock | 
|  | 256 | +                assert 'granted' in lock | 
|  | 257 | +                assert lock['locktype'] == 'advisory' | 
|  | 258 | + | 
|  | 259 | +    @pytest.mark.django_db | 
|  | 260 | +    def test_get_active_advisory_locks_sqlite_returns_empty(self): | 
|  | 261 | +        """Test that SQLite returns empty list.""" | 
|  | 262 | +        # This test will be skipped by SkipIfSqlite for completeness | 
|  | 263 | +        pass | 
|  | 264 | + | 
|  | 265 | + | 
| 151 | 266 | class TestAdvisoryLock(SkipIfSqlite): | 
| 152 | 267 |     THREAD_WAIT_TIME = 0.1 | 
| 153 | 268 | 
 | 
|  | 
0 commit comments