diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b5d361..1cd25c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ### 1.5.0 +- fix: #139: Fix to replicas query when using default as cluster name - feat: #133: Fix simultaneous queries error when iteration is interrupted - feat: #130: Add `distributed_migrations` database setting to support distributed migration queries. - feat: #129: Add `toYearWeek` datetime functionality diff --git a/clickhouse_backend/patch/migrations.py b/clickhouse_backend/patch/migrations.py index e4e3f15..3ef3708 100644 --- a/clickhouse_backend/patch/migrations.py +++ b/clickhouse_backend/patch/migrations.py @@ -44,13 +44,20 @@ def _check_replicas(connection): return connection.has_replicas with connection.cursor() as cursor: - cursor.execute( - f"select replica_num from system.clusters where cluster={connection.migration_cluster}" - ) - (replica_count,) = cursor.fetchone() + replica_count = _get_replicas(connection.migration_cluster, cursor) return replica_count >= 1 +def _get_replicas(cluster_name, cursor): + cursor.execute( + "select replica_num from system.clusters where cluster=%s", [cluster_name] + ) + res = cursor.fetchone() + if not res: + return 0 + return res[0] + + def patch_migrations(): patch_migration_recorder() patch_migration() diff --git a/tests/migrations/test_loader.py b/tests/migrations/test_loader.py index 1110a0e..06d38e6 100644 --- a/tests/migrations/test_loader.py +++ b/tests/migrations/test_loader.py @@ -15,6 +15,7 @@ from clickhouse_backend import compat from clickhouse_backend.backend.base import DatabaseWrapper +from clickhouse_backend.patch.migrations import _check_replicas, _get_replicas from .test_base import MigrationTestBase @@ -201,6 +202,28 @@ def test_apply_unapply_distributed(self): conn = connections[db] self.assertMigrationExists(conn, "0432_ponies", "myapp", deleted=True) + def test_checking_cluster_replicas(self): + """ + Tests checking cluster replicas for migrations + """ + db = DatabaseWrapper(deepcopy(self.lb), alias="load_balancer") + connections["load_balancer"] = db + + for db in self.databases: + wrapper = connections[db] + conn = wrapper.client.connection + has_clusters = _check_replicas(conn) + self.assertEqual(has_clusters, True) + + with connections["default"].cursor() as cursor: + # testing "default" cluster name on query + replicas = _get_replicas("default", cursor) + self.assertFalse(replicas) + + # testing migration_cluster query + replicas = _get_replicas(self.lb["OPTIONS"]["migration_cluster"], cursor) + self.assertEqual(replicas, 1) + class LoaderTests(TestCase): """