Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
### 1.5.0
- fix: #139: Fix to replicas query when using default as cluster name
- feat: #133: Fix simultaneous queries error when iteration is interrupted
- feat: #130: Add `distributed_migrations` database setting to support distributed migration queries.
- feat: #129: Add `toYearWeek` datetime functionality
Expand Down
15 changes: 11 additions & 4 deletions clickhouse_backend/patch/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,20 @@ def _check_replicas(connection):
return connection.has_replicas

with connection.cursor() as cursor:
cursor.execute(
f"select replica_num from system.clusters where cluster={connection.migration_cluster}"
)
(replica_count,) = cursor.fetchone()
replica_count = _get_replicas(connection.migration_cluster, cursor)
return replica_count >= 1


def _get_replicas(cluster_name, cursor):
cursor.execute(
"select replica_num from system.clusters where cluster=%s", [cluster_name]
)
res = cursor.fetchone()
if not res:
return 0
return res[0]


def patch_migrations():
patch_migration_recorder()
patch_migration()
Expand Down
23 changes: 23 additions & 0 deletions tests/migrations/test_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from clickhouse_backend import compat
from clickhouse_backend.backend.base import DatabaseWrapper
from clickhouse_backend.patch.migrations import _check_replicas, _get_replicas

from .test_base import MigrationTestBase

Expand Down Expand Up @@ -201,6 +202,28 @@ def test_apply_unapply_distributed(self):
conn = connections[db]
self.assertMigrationExists(conn, "0432_ponies", "myapp", deleted=True)

def test_checking_cluster_replicas(self):
"""
Tests checking cluster replicas for migrations
"""
db = DatabaseWrapper(deepcopy(self.lb), alias="load_balancer")
connections["load_balancer"] = db

for db in self.databases:
wrapper = connections[db]
conn = wrapper.client.connection
has_clusters = _check_replicas(conn)
self.assertEqual(has_clusters, True)

with connections["default"].cursor() as cursor:
# testing "default" cluster name on query
replicas = _get_replicas("default", cursor)
self.assertFalse(replicas)

# testing migration_cluster query
replicas = _get_replicas(self.lb["OPTIONS"]["migration_cluster"], cursor)
self.assertEqual(replicas, 1)


class LoaderTests(TestCase):
"""
Expand Down