From 5ce88348046b63fe787e4a02d2a65cbf1f944dc1 Mon Sep 17 00:00:00 2001 From: Ronaldo Campos Date: Wed, 13 Aug 2025 17:33:02 +0100 Subject: [PATCH 01/10] Handle distributed migrations --- clickhouse_backend/backend/base.py | 3 + clickhouse_backend/patch/migrations.py | 133 +++++++++++++++++++++---- compose.yaml | 20 ++++ proxy-config/haproxy.cfg | 26 +++++ tests/migrations/test_loader.py | 104 +++++++++++++++++++ 5 files changed, 268 insertions(+), 18 deletions(-) create mode 100644 proxy-config/haproxy.cfg diff --git a/clickhouse_backend/backend/base.py b/clickhouse_backend/backend/base.py index 3e382fa..152191b 100644 --- a/clickhouse_backend/backend/base.py +++ b/clickhouse_backend/backend/base.py @@ -163,6 +163,9 @@ def __init__(self, settings_dict, alias=DEFAULT_DB_ALIAS): self.migration_cluster = self.settings_dict["OPTIONS"].pop( "migration_cluster", None ) + self.distributed_migrations = self.settings_dict["OPTIONS"].pop( + "distributed_migrations", None + ) # https://clickhouse-driver.readthedocs.io/en/latest/quickstart.html#streaming-results self.max_block_size = self.settings_dict["OPTIONS"].pop("max_block_size", 65409) if not self.settings_dict["NAME"]: diff --git a/clickhouse_backend/patch/migrations.py b/clickhouse_backend/patch/migrations.py index bdf2bd9..2203491 100644 --- a/clickhouse_backend/patch/migrations.py +++ b/clickhouse_backend/patch/migrations.py @@ -1,7 +1,8 @@ from django.apps.registry import Apps +from django.db import DatabaseError from django.db import models as django_models from django.db.migrations import Migration -from django.db.migrations.exceptions import IrreversibleError +from django.db.migrations.exceptions import IrreversibleError, MigrationSchemaMissing from django.db.migrations.operations.fields import FieldOperation from django.db.migrations.operations.models import ( DeleteModel, @@ -15,6 +16,26 @@ __all__ = ["patch_migrations", "patch_migration_recorder", "patch_migration"] +def _should_distribute_migrations(connection): + """ + Check if the connection is configured for distributed migrations. + """ + return getattr(connection, "distributed_migrations", False) and getattr( + connection, "migration_cluster", None + ) + + +def _get_model_table_name(connection): + """ + Return the name of the table that will be used by the MigrationRecorder. + If distributed migrations are enabled, return the distributed table name. + Otherwise, return the regular django_migrations table name. + """ + if _should_distribute_migrations(connection): + return "distributed_django_migrations" + return "django_migrations" + + def patch_migrations(): patch_migration_recorder() patch_migration() @@ -29,22 +50,63 @@ def Migration(self): if self._migration_class is None: if self.connection.vendor == "clickhouse": from clickhouse_backend import models + from clickhouse_backend.models import currentDatabase - class Migration(models.ClickhouseModel): - app = models.StringField(max_length=255) - name = models.StringField(max_length=255) - applied = models.DateTime64Field(default=now) - deleted = models.BoolField(default=False) + # Only create a distributed migration model if the connection + # has distributed migrations enabled and a migration cluster is set. + # otherwise, create a regular merge tree. + if _should_distribute_migrations(self.connection): - class Meta: - apps = Apps() - app_label = "migrations" - db_table = "django_migrations" - engine = models.MergeTree(order_by=("app", "name")) - cluster = getattr(self.connection, "migration_cluster", None) + class _Migration(models.ClickhouseModel): + app = models.StringField(max_length=255) + name = models.StringField(max_length=255) + applied = models.DateTime64Field(default=now) + deleted = models.BoolField(default=False) - def __str__(self): - return "Migration %s for %s" % (self.name, self.app) + class Meta: + apps = Apps() + app_label = "migrations" + db_table = "django_migrations" + engine = models.MergeTree(order_by=("app", "name")) + cluster = getattr(self.connection, "migration_cluster") + + def __str__(self): + return "Migration %s for %s" % (self.name, self.app) + + class Migration(models.ClickhouseModel): + app = models.StringField(max_length=255) + name = models.StringField(max_length=255) + applied = models.DateTime64Field(default=now) + deleted = models.BoolField(default=False) + + class Meta: + apps = Apps() + app_label = "migrations" + db_table = _get_model_table_name(self.connection) + engine = models.Distributed( + getattr(self.connection, "migration_cluster"), + currentDatabase(), + _Migration._meta.db_table, + models.Rand(), + ) + cluster = getattr(self.connection, "migration_cluster") + + Migration._meta.local_model_class = _Migration + + else: + + class Migration(models.ClickhouseModel): + app = models.StringField(max_length=255) + name = models.StringField(max_length=255) + applied = models.DateTime64Field(default=now) + deleted = models.BoolField(default=False) + + class Meta: + apps = Apps() + app_label = "migrations" + db_table = _get_model_table_name(self.connection) + engine = models.MergeTree(order_by=("app", "name")) + cluster = getattr(self.connection, "migration_cluster") else: @@ -69,15 +131,45 @@ def has_table(self): # Assert migration table won't be deleted once created. if not getattr(self, "_has_table", False): with self.connection.cursor() as cursor: + table = self.Migration._meta.db_table tables = self.connection.introspection.table_names(cursor) - self._has_table = self.Migration._meta.db_table in tables + self._has_table = table in tables if self._has_table and self.connection.vendor == "clickhouse": # fix https://github.com/jayvynl/django-clickhouse-backend/issues/51 cursor.execute( - "ALTER table django_migrations ADD COLUMN IF NOT EXISTS deleted Bool" + f"ALTER table {table} ADD COLUMN IF NOT EXISTS deleted Bool" ) return self._has_table + def ensure_schema(self): + """Ensure the table exists and has the correct schema.""" + # If the table's there, that's fine - we've never changed its schema + # in the codebase. + if self.has_table(): + return + + # In case of distributed migrations, we need to ensure the local model exists first and + # then create the distributed model. + try: + with self.connection.schema_editor() as editor: + if ( + editor.connection.vendor == "clickhouse" + and _should_distribute_migrations(editor.connection) + ): + with editor.connection.cursor() as cursor: + tables = editor.connection.introspection.table_names(cursor) + local_model_class = self.Migration._meta.local_model_class + local_table = local_model_class._meta.db_table + if local_table not in tables: + # Create the local model first + editor.create_model(self.Migration._meta.local_model_class) + + editor.create_model(self.Migration) + except DatabaseError as exc: + raise MigrationSchemaMissing( + "Unable to create the django_migrations table (%s)" % exc + ) + def migration_qs(self): if self.connection.vendor == "clickhouse": return self.Migration.objects.using(self.connection.alias).filter( @@ -118,6 +210,7 @@ def flush(self): MigrationRecorder.Migration = property(Migration) MigrationRecorder.has_table = has_table + MigrationRecorder.ensure_schema = ensure_schema MigrationRecorder.migration_qs = property(migration_qs) MigrationRecorder.record_applied = record_applied MigrationRecorder.record_unapplied = record_unapplied @@ -136,13 +229,15 @@ def apply(self, project_state, schema_editor, collect_sql=False): """ applied_on_remote = False if getattr(schema_editor.connection, "migration_cluster", None): + _table = _get_model_table_name(schema_editor.connection) + with schema_editor.connection.cursor() as cursor: cursor.execute( "select EXISTS(select 1 from clusterAllReplicas(%s, currentDatabase(), %s)" " where app=%s and name=%s and deleted=false)", [ schema_editor.connection.migration_cluster, - "django_migrations", + _table, self.app_label, self.name, ], @@ -203,13 +298,15 @@ def unapply(self, project_state, schema_editor, collect_sql=False): """ unapplied_on_remote = False if getattr(schema_editor.connection, "migration_cluster", None): + _table = _get_model_table_name(schema_editor.connection) + with schema_editor.connection.cursor() as cursor: cursor.execute( "select EXISTS(select 1 from clusterAllReplicas(%s, currentDatabase(), %s)" " where app=%s and name=%s and deleted=true)", [ schema_editor.connection.migration_cluster, - "django_migrations", + _table, self.app_label, self.name, ], diff --git a/compose.yaml b/compose.yaml index 9e3c6d4..c2daa51 100644 --- a/compose.yaml +++ b/compose.yaml @@ -21,6 +21,7 @@ services: - "node1:/var/lib/clickhouse/" - "./clickhouse-config/node1/:/etc/clickhouse-server/config.d/" ports: + - "127.0.0.1:8123:8123" - "127.0.0.1:9000:9000" node2: <<: *base-service @@ -29,6 +30,7 @@ services: - "node2:/var/lib/clickhouse/" - "./clickhouse-config/node2/:/etc/clickhouse-server/config.d/" ports: + - "127.0.0.1:8124:8123" - "127.0.0.1:9001:9000" node3: <<: *base-service @@ -37,6 +39,7 @@ services: - "node3:/var/lib/clickhouse/" - "./clickhouse-config/node3/:/etc/clickhouse-server/config.d/" ports: + - "127.0.0.1:8125:8123" - "127.0.0.1:9002:9000" node4: <<: *base-service @@ -45,8 +48,25 @@ services: - "node4:/var/lib/clickhouse/" - "./clickhouse-config/node4/:/etc/clickhouse-server/config.d/" ports: + - "127.0.0.1:8126:8123" - "127.0.0.1:9003:9000" + haproxy: + image: haproxy:latest + container_name: clickhouse-haproxy + command: sh -c "haproxy -f /usr/local/etc/haproxy/haproxy.cfg" + restart: always + volumes: + - "./proxy-config/haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg" + ports: + - "127.0.0.1:8127:8123" + - "127.0.0.1:9004:9000" + depends_on: + node1: + condition: service_healthy + node3: + condition: service_healthy + volumes: node1: name: clickhouse-node1 diff --git a/proxy-config/haproxy.cfg b/proxy-config/haproxy.cfg new file mode 100644 index 0000000..983aef8 --- /dev/null +++ b/proxy-config/haproxy.cfg @@ -0,0 +1,26 @@ +# TCP frontend for native ClickHouse protocol +frontend clickhouse_tcp + bind *:9000 + mode tcp + default_backend clickhouse_tcp_nodes + +backend clickhouse_tcp_nodes + mode tcp + balance roundrobin + option tcp-check + tcp-check connect + server ch1 clickhouse-node1:9000 check + server ch2 clickhouse-node3:9000 check + +# HTTP frontend for ClickHouse's HTTP interface +frontend clickhouse_http + bind *:8123 + mode http + default_backend clickhouse_http_nodes + +backend clickhouse_http_nodes + mode http + balance roundrobin + option httpchk GET /ping + server ch1 clickhouse-node1:8123 check + server ch2 clickhouse-node3:8123 check diff --git a/tests/migrations/test_loader.py b/tests/migrations/test_loader.py index dd448e7..6dfbfe6 100644 --- a/tests/migrations/test_loader.py +++ b/tests/migrations/test_loader.py @@ -1,5 +1,6 @@ import compileall import os +from copy import deepcopy from importlib import import_module from django.db import connection, connections @@ -13,6 +14,7 @@ from django.test import TestCase, modify_settings, override_settings from clickhouse_backend import compat +from clickhouse_backend.backend.base import DatabaseWrapper from .test_base import MigrationTestBase @@ -51,6 +53,108 @@ def test_apply(self): ) +class DistributedMigrationTests(MigrationTestBase): + databases = ("default", "s2r1", "s1r2") + + lb = { + "ENGINE": "clickhouse_backend.backend", + "HOST": "localhost", + "USER": "default", + "PASSWORD": "clickhouse_password", + "PORT": 9004, + "NAME": "test_default", + "OPTIONS": { + "distributed_migrations": True, + "migration_cluster": "cluster", + "connections_min": 1, + "settings": { + "mutations_sync": 2, + "insert_distributed_sync": 1, + "insert_quorum": 2, + "alter_sync": 2, + "allow_suspicious_low_cardinality_types": 1, + "allow_experimental_object_type": 1, + }, + }, + "TEST": {"cluster": "cluster", "managed": False}, + "TIME_ZONE": None, + "AUTOCOMMIT": True, + "CONN_MAX_AGE": 0, + "CONN_HEALTH_CHECKS": True, + } + + def tearDown(self): + if "load_balancer" in connections: + # Remove the load balancer connection to avoid conflicts with other tests + connections["load_balancer"].close() + del connections["load_balancer"] + + def assertMigrationTablesExists(self, conn): + # simulate multiple attempts to read the migration tables + i = 5 + while i: + with conn.cursor() as cursor: + tables = conn.introspection.table_names(cursor) + self.assertIn("django_migrations", tables) + self.assertIn("distributed_django_migrations", tables) + i -= 1 + + def test_distributed_migration_schema_with_existing_migrations(self): + """ + Tests that migration tables are created in all nodes even if the django_migrations table already exists + """ + db = DatabaseWrapper(deepcopy(self.lb), alias="load_balancer") + connections["load_balancer"] = db + recorder = MigrationRecorder(db) + + recorder.ensure_schema() + + self.assertEqual(recorder.migration_qs.db, "load_balancer") + self.assertEqual( + recorder.migration_qs.model._meta.db_table, "distributed_django_migrations" + ) + + self.assertMigrationTablesExists(recorder.connection) + + def test_distributed_migration_schema_without_migrations(self): + """ + Tests that migration tables are created in all nodes when migration tables do not exist + """ + + for conn in self.databases: + recorder = MigrationRecorder(connections[conn]) + self.assertEqual(recorder.migration_qs.db, conn) + self.assertEqual( + recorder.migration_qs.model._meta.db_table, "django_migrations" + ) + with recorder.connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS django_migrations") + cursor.execute("DROP TABLE IF EXISTS distributed_django_migrations") + tables = recorder.connection.introspection.table_names(cursor) + self.assertNotIn("django_migrations", tables) + self.assertNotIn("distributed_django_migrations", tables) + + # node4 can throw error when trying to create django_migrations table, even if other nodes are ok + db = deepcopy(self.lb) + db["PORT"] = 9003 + del db["OPTIONS"]["distributed_migrations"] + recorder = MigrationRecorder(DatabaseWrapper(db, alias="node4")) + with recorder.connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS django_migrations") + cursor.execute("DROP TABLE IF EXISTS distributed_django_migrations") + tables = recorder.connection.introspection.table_names(cursor) + self.assertNotIn("django_migrations", tables) + self.assertNotIn("distributed_django_migrations", tables) + + db = DatabaseWrapper(deepcopy(self.lb), alias="load_balancer") + connections["load_balancer"] = db + recorder = MigrationRecorder(db) + + recorder.ensure_schema() + + self.assertMigrationTablesExists(recorder.connection) + + class LoaderTests(TestCase): """ Tests the disk and database loader, and running through migrations From f63869ec1757bf01f8a49c0b44671156d4268f88 Mon Sep 17 00:00:00 2001 From: Ronaldo Campos Date: Mon, 25 Aug 2025 16:51:20 +0100 Subject: [PATCH 02/10] add extra test and update README.md --- README.md | 28 +++++++++++++++++ tests/migrations/test_loader.py | 54 +++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/README.md b/README.md index 679cce3..20a88e2 100644 --- a/README.md +++ b/README.md @@ -525,6 +525,34 @@ Extra settings explanation: Do not hardcode database name when you define replicated table or distributed table. Because test database name is different from deployed database name. +#### Clickhouse cluster behind a load balancer + +If your clickhouse cluster is running behind a load balancer, you can optionally set `distributed_migrations` to `True` under database OPTIONS. +Then a distributed migration table will be created on all nodes of the cluster, and all migration operations will be performed on this +distributed migrations table instead of a local migrations table. Otherwise, sequentially running migrations will have no effect on other nodes. + +Configuration example: + +```python +DATABASES = { + "default": { + "HOST": "clickhouse-load-balancer", + "PORT": 9000, + "ENGINE": "clickhouse_backend.backend", + "OPTIONS": { + "migration_cluster": "cluster", + "distributed_migrations": True, + "settings": { + "mutations_sync": 2, + "insert_distributed_sync": 1, + "insert_quorum": 2, + "alter_sync": 2, + }, + }, + } +} +``` + ### Model `cluster` in `Meta` class will make models being created on cluster. diff --git a/tests/migrations/test_loader.py b/tests/migrations/test_loader.py index 6dfbfe6..2de5301 100644 --- a/tests/migrations/test_loader.py +++ b/tests/migrations/test_loader.py @@ -2,6 +2,7 @@ import os from copy import deepcopy from importlib import import_module +from time import sleep from django.db import connection, connections from django.db.migrations.exceptions import ( @@ -99,6 +100,28 @@ def assertMigrationTablesExists(self, conn): self.assertIn("distributed_django_migrations", tables) i -= 1 + def assertMigrationExists(self, conn, name, app, deleted=False, tries=5): + # simulate multiple attempts to read a migration from the distributed migration table + while tries: + with conn.cursor() as cursor: + cursor.execute(f"SELECT * FROM distributed_django_migrations where name = '{name}'") + res = cursor.fetchall() + + try: + self.assertEqual(len(res), 1) + except AssertionError: + # handle replication lag + if tries >= 1: + sleep(1) + tries -= 1 + continue + raise ValueError(f"Migration {name} for app {app} not found in distributed_django_migrations table") + + self.assertEqual(res[0][1], app) + self.assertEqual(res[0][2], name) + self.assertEqual(res[0][-1], deleted) + tries -= 1 + def test_distributed_migration_schema_with_existing_migrations(self): """ Tests that migration tables are created in all nodes even if the django_migrations table already exists @@ -154,6 +177,37 @@ def test_distributed_migration_schema_without_migrations(self): self.assertMigrationTablesExists(recorder.connection) + def test_apply_unapply_distributed(self): + """ + Tests marking migrations as applied/unapplied in a distributed setup. + """ + databases = [x for x in self.databases if x != "s1r2"] + + for db in databases: + conn = connections[db] + recorder = MigrationRecorder(conn) + recorder.flush() + + lb = DatabaseWrapper(deepcopy(self.lb), alias="load_balancer") + connections["load_balancer"] = lb + recorder = MigrationRecorder(lb) + + recorder.record_applied("myapp", "0432_ponies") + + for db in databases: + conn = connections[db] + self.assertMigrationExists(conn, "0432_ponies", "myapp", deleted=False) + + self.assertMigrationExists(connections['load_balancer'], "myapp", "0432_ponies") + + recorder.record_unapplied("myapp", "0432_ponies") + + for db in databases: + conn = connections[db] + self.assertMigrationExists(conn, "0432_ponies", "myapp", deleted=True) + + self.assertMigrationExists(connections['load_balancer'], "myapp", "0432_ponies", deleted=True) + class LoaderTests(TestCase): """ From b14830f7d89e68780371354498599f2533797374 Mon Sep 17 00:00:00 2001 From: Ronaldo Campos Date: Mon, 25 Aug 2025 16:57:58 +0100 Subject: [PATCH 03/10] fix --- tests/migrations/test_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/migrations/test_loader.py b/tests/migrations/test_loader.py index 2de5301..3098959 100644 --- a/tests/migrations/test_loader.py +++ b/tests/migrations/test_loader.py @@ -206,7 +206,7 @@ def test_apply_unapply_distributed(self): conn = connections[db] self.assertMigrationExists(conn, "0432_ponies", "myapp", deleted=True) - self.assertMigrationExists(connections['load_balancer'], "myapp", "0432_ponies", deleted=True) + self.assertMigrationExists(connections['load_balancer'], "0432_ponies", "myapp", deleted=True) class LoaderTests(TestCase): From 8cfd5e9b90d52bdac30e80c879ac6e9e94751326 Mon Sep 17 00:00:00 2001 From: Ronaldo Campos Date: Thu, 28 Aug 2025 17:23:13 +0100 Subject: [PATCH 04/10] fix --- CHANGELOG.md | 1 + compose.yaml | 2 - proxy-config/haproxy.cfg | 4 +- tests/migrations/test_loader.py | 66 ++++++++++++++------------------- 4 files changed, 30 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c103702..a6bbfe8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ### 1.5.0 +- feat: #130: Add `distributed_migrations` database setting to support distributed migration queries. - feat: #129: Add `toYearWeek` datetime functionality ### 1.4.0 diff --git a/compose.yaml b/compose.yaml index c2daa51..977097d 100644 --- a/compose.yaml +++ b/compose.yaml @@ -62,8 +62,6 @@ services: - "127.0.0.1:8127:8123" - "127.0.0.1:9004:9000" depends_on: - node1: - condition: service_healthy node3: condition: service_healthy diff --git a/proxy-config/haproxy.cfg b/proxy-config/haproxy.cfg index 983aef8..8ada635 100644 --- a/proxy-config/haproxy.cfg +++ b/proxy-config/haproxy.cfg @@ -9,8 +9,7 @@ backend clickhouse_tcp_nodes balance roundrobin option tcp-check tcp-check connect - server ch1 clickhouse-node1:9000 check - server ch2 clickhouse-node3:9000 check + server ch1 clickhouse-node3:9000 check # HTTP frontend for ClickHouse's HTTP interface frontend clickhouse_http @@ -22,5 +21,4 @@ backend clickhouse_http_nodes mode http balance roundrobin option httpchk GET /ping - server ch1 clickhouse-node1:8123 check server ch2 clickhouse-node3:8123 check diff --git a/tests/migrations/test_loader.py b/tests/migrations/test_loader.py index 3098959..7c0d824 100644 --- a/tests/migrations/test_loader.py +++ b/tests/migrations/test_loader.py @@ -90,37 +90,29 @@ def tearDown(self): connections["load_balancer"].close() del connections["load_balancer"] - def assertMigrationTablesExists(self, conn): - # simulate multiple attempts to read the migration tables - i = 5 - while i: - with conn.cursor() as cursor: - tables = conn.introspection.table_names(cursor) - self.assertIn("django_migrations", tables) - self.assertIn("distributed_django_migrations", tables) - i -= 1 - - def assertMigrationExists(self, conn, name, app, deleted=False, tries=5): - # simulate multiple attempts to read a migration from the distributed migration table - while tries: - with conn.cursor() as cursor: - cursor.execute(f"SELECT * FROM distributed_django_migrations where name = '{name}'") - res = cursor.fetchall() - - try: - self.assertEqual(len(res), 1) - except AssertionError: - # handle replication lag - if tries >= 1: - sleep(1) - tries -= 1 - continue - raise ValueError(f"Migration {name} for app {app} not found in distributed_django_migrations table") - - self.assertEqual(res[0][1], app) - self.assertEqual(res[0][2], name) - self.assertEqual(res[0][-1], deleted) - tries -= 1 + def assertMigrationTablesExists(self): + for db in self.databases: + conn = connections[db] + tables = conn.introspection.table_names() + self.assertIn("django_migrations", tables, f"django_migrations table not found in {conn.alias}") + self.assertIn("distributed_django_migrations", tables, f"distributed_django_migrations table not found in {conn.alias}") + + def assertMigrationExists(self, conn, name, app, deleted=False, tries=10): + with conn.cursor() as cursor: + cursor.execute(f"SELECT * FROM distributed_django_migrations where name = '{name}'") + res = cursor.fetchall() + + if not res and tries > 1: + # sometimes the migration is not immediately visible, wait a bit and try again + print(f"Migration {name} not found in {conn.alias}, retrying...") + sleep(1) + return self.assertMigrationExists(conn, name, app, deleted, tries - 1) + + self.assertEqual(len(res), 1, f"Migration {name} not found in {conn.alias}") + + self.assertEqual(res[0][1], app) + self.assertEqual(res[0][2], name) + self.assertEqual(res[0][-1], deleted) def test_distributed_migration_schema_with_existing_migrations(self): """ @@ -137,7 +129,7 @@ def test_distributed_migration_schema_with_existing_migrations(self): recorder.migration_qs.model._meta.db_table, "distributed_django_migrations" ) - self.assertMigrationTablesExists(recorder.connection) + self.assertMigrationTablesExists() def test_distributed_migration_schema_without_migrations(self): """ @@ -175,13 +167,13 @@ def test_distributed_migration_schema_without_migrations(self): recorder.ensure_schema() - self.assertMigrationTablesExists(recorder.connection) + self.assertMigrationTablesExists() def test_apply_unapply_distributed(self): """ Tests marking migrations as applied/unapplied in a distributed setup. """ - databases = [x for x in self.databases if x != "s1r2"] + databases = [x for x in self.databases] for db in databases: conn = connections[db] @@ -190,6 +182,8 @@ def test_apply_unapply_distributed(self): lb = DatabaseWrapper(deepcopy(self.lb), alias="load_balancer") connections["load_balancer"] = lb + databases.append("load_balancer") + recorder = MigrationRecorder(lb) recorder.record_applied("myapp", "0432_ponies") @@ -198,16 +192,12 @@ def test_apply_unapply_distributed(self): conn = connections[db] self.assertMigrationExists(conn, "0432_ponies", "myapp", deleted=False) - self.assertMigrationExists(connections['load_balancer'], "myapp", "0432_ponies") - recorder.record_unapplied("myapp", "0432_ponies") for db in databases: conn = connections[db] self.assertMigrationExists(conn, "0432_ponies", "myapp", deleted=True) - self.assertMigrationExists(connections['load_balancer'], "0432_ponies", "myapp", deleted=True) - class LoaderTests(TestCase): """ From a386db62e9979129e09dc4b4b77c6cfbfe3b35c8 Mon Sep 17 00:00:00 2001 From: Ronaldo Campos Date: Thu, 28 Aug 2025 18:49:12 +0100 Subject: [PATCH 05/10] fix --- clickhouse_backend/patch/migrations.py | 2 +- compose.yaml | 2 + proxy-config/haproxy.cfg | 4 +- tests/migrations/test_loader.py | 60 ++++++++++++-------------- 4 files changed, 33 insertions(+), 35 deletions(-) diff --git a/clickhouse_backend/patch/migrations.py b/clickhouse_backend/patch/migrations.py index 2203491..4126d00 100644 --- a/clickhouse_backend/patch/migrations.py +++ b/clickhouse_backend/patch/migrations.py @@ -67,7 +67,7 @@ class Meta: apps = Apps() app_label = "migrations" db_table = "django_migrations" - engine = models.MergeTree(order_by=("app", "name")) + engine = models.ReplicatedMergeTree(order_by=("app", "name")) cluster = getattr(self.connection, "migration_cluster") def __str__(self): diff --git a/compose.yaml b/compose.yaml index 977097d..c2daa51 100644 --- a/compose.yaml +++ b/compose.yaml @@ -62,6 +62,8 @@ services: - "127.0.0.1:8127:8123" - "127.0.0.1:9004:9000" depends_on: + node1: + condition: service_healthy node3: condition: service_healthy diff --git a/proxy-config/haproxy.cfg b/proxy-config/haproxy.cfg index 8ada635..983aef8 100644 --- a/proxy-config/haproxy.cfg +++ b/proxy-config/haproxy.cfg @@ -9,7 +9,8 @@ backend clickhouse_tcp_nodes balance roundrobin option tcp-check tcp-check connect - server ch1 clickhouse-node3:9000 check + server ch1 clickhouse-node1:9000 check + server ch2 clickhouse-node3:9000 check # HTTP frontend for ClickHouse's HTTP interface frontend clickhouse_http @@ -21,4 +22,5 @@ backend clickhouse_http_nodes mode http balance roundrobin option httpchk GET /ping + server ch1 clickhouse-node1:8123 check server ch2 clickhouse-node3:8123 check diff --git a/tests/migrations/test_loader.py b/tests/migrations/test_loader.py index 7c0d824..33fd61f 100644 --- a/tests/migrations/test_loader.py +++ b/tests/migrations/test_loader.py @@ -97,45 +97,18 @@ def assertMigrationTablesExists(self): self.assertIn("django_migrations", tables, f"django_migrations table not found in {conn.alias}") self.assertIn("distributed_django_migrations", tables, f"distributed_django_migrations table not found in {conn.alias}") - def assertMigrationExists(self, conn, name, app, deleted=False, tries=10): + def assertMigrationExists(self, conn, name, app, deleted=False): with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM distributed_django_migrations where name = '{name}'") res = cursor.fetchall() - if not res and tries > 1: - # sometimes the migration is not immediately visible, wait a bit and try again - print(f"Migration {name} not found in {conn.alias}, retrying...") - sleep(1) - return self.assertMigrationExists(conn, name, app, deleted, tries - 1) - self.assertEqual(len(res), 1, f"Migration {name} not found in {conn.alias}") self.assertEqual(res[0][1], app) self.assertEqual(res[0][2], name) self.assertEqual(res[0][-1], deleted) - def test_distributed_migration_schema_with_existing_migrations(self): - """ - Tests that migration tables are created in all nodes even if the django_migrations table already exists - """ - db = DatabaseWrapper(deepcopy(self.lb), alias="load_balancer") - connections["load_balancer"] = db - recorder = MigrationRecorder(db) - - recorder.ensure_schema() - - self.assertEqual(recorder.migration_qs.db, "load_balancer") - self.assertEqual( - recorder.migration_qs.model._meta.db_table, "distributed_django_migrations" - ) - - self.assertMigrationTablesExists() - - def test_distributed_migration_schema_without_migrations(self): - """ - Tests that migration tables are created in all nodes when migration tables do not exist - """ - + def _drop_migrations(self): for conn in self.databases: recorder = MigrationRecorder(connections[conn]) self.assertEqual(recorder.migration_qs.db, conn) @@ -161,6 +134,30 @@ def test_distributed_migration_schema_without_migrations(self): self.assertNotIn("django_migrations", tables) self.assertNotIn("distributed_django_migrations", tables) + def test_distributed_migration_schema_with_existing_migrations(self): + """ + Tests that migration tables are created in all nodes even if the django_migrations table already exists + """ + db = DatabaseWrapper(deepcopy(self.lb), alias="load_balancer") + connections["load_balancer"] = db + recorder = MigrationRecorder(db) + + recorder.ensure_schema() + + self.assertEqual(recorder.migration_qs.db, "load_balancer") + self.assertEqual( + recorder.migration_qs.model._meta.db_table, "distributed_django_migrations" + ) + + self.assertMigrationTablesExists() + + def test_distributed_migration_schema_without_migrations(self): + """ + Tests that migration tables are created in all nodes when migration tables do not exist + """ + + self._drop_migrations() + db = DatabaseWrapper(deepcopy(self.lb), alias="load_balancer") connections["load_balancer"] = db recorder = MigrationRecorder(db) @@ -175,10 +172,7 @@ def test_apply_unapply_distributed(self): """ databases = [x for x in self.databases] - for db in databases: - conn = connections[db] - recorder = MigrationRecorder(conn) - recorder.flush() + self._drop_migrations() lb = DatabaseWrapper(deepcopy(self.lb), alias="load_balancer") connections["load_balancer"] = lb From 1e15bc94e74fc09ed9590f409fb24fc0d990d20b Mon Sep 17 00:00:00 2001 From: Ronaldo Campos Date: Mon, 8 Sep 2025 20:10:09 +0100 Subject: [PATCH 06/10] fix --- clickhouse_backend/patch/migrations.py | 34 ++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/clickhouse_backend/patch/migrations.py b/clickhouse_backend/patch/migrations.py index 4126d00..9b68b3a 100644 --- a/clickhouse_backend/patch/migrations.py +++ b/clickhouse_backend/patch/migrations.py @@ -36,6 +36,21 @@ def _get_model_table_name(connection): return "django_migrations" +def _check_replicas(connection): + """ + Check if the connection has replicas configured for the migration cluster. + """ + if hasattr(connection, "has_replicas"): + return connection.has_replicas + + with connection.cursor() as cursor: + cursor.execute( + f"select replica_num from system.clusters where cluster={connection.migration_cluster}" + ) + (replica_count,) = cursor.fetchone() + return replica_count >= 1 + + def patch_migrations(): patch_migration_recorder() patch_migration() @@ -57,6 +72,15 @@ def Migration(self): # otherwise, create a regular merge tree. if _should_distribute_migrations(self.connection): + has_replicas = _check_replicas(self.connection) + + Engine = models.MergeTree + if has_replicas: + Engine = models.ReplicatedMergeTree + self.connection.has_replicas = True + else: + self.connection.has_replicas = False + class _Migration(models.ClickhouseModel): app = models.StringField(max_length=255) name = models.StringField(max_length=255) @@ -67,8 +91,8 @@ class Meta: apps = Apps() app_label = "migrations" db_table = "django_migrations" - engine = models.ReplicatedMergeTree(order_by=("app", "name")) - cluster = getattr(self.connection, "migration_cluster") + engine = Engine(order_by=("app", "name")) + cluster = self.connection.migration_cluster def __str__(self): return "Migration %s for %s" % (self.name, self.app) @@ -84,12 +108,12 @@ class Meta: app_label = "migrations" db_table = _get_model_table_name(self.connection) engine = models.Distributed( - getattr(self.connection, "migration_cluster"), + self.connection.migration_cluster, currentDatabase(), _Migration._meta.db_table, models.Rand(), ) - cluster = getattr(self.connection, "migration_cluster") + cluster = self.connection.migration_cluster Migration._meta.local_model_class = _Migration @@ -106,7 +130,7 @@ class Meta: app_label = "migrations" db_table = _get_model_table_name(self.connection) engine = models.MergeTree(order_by=("app", "name")) - cluster = getattr(self.connection, "migration_cluster") + cluster = getattr(self.connection, "migration_cluster", None) else: From b4ed1444c93f979076b60ebf4b2aca29a5b4acc5 Mon Sep 17 00:00:00 2001 From: Ronaldo Campos Date: Mon, 8 Sep 2025 20:16:07 +0100 Subject: [PATCH 07/10] fix --- clickhouse_backend/patch/migrations.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/clickhouse_backend/patch/migrations.py b/clickhouse_backend/patch/migrations.py index 9b68b3a..f2fdddf 100644 --- a/clickhouse_backend/patch/migrations.py +++ b/clickhouse_backend/patch/migrations.py @@ -132,6 +132,9 @@ class Meta: engine = models.MergeTree(order_by=("app", "name")) cluster = getattr(self.connection, "migration_cluster", None) + def __str__(self): + return "Migration %s for %s" % (self.name, self.app) + else: class Migration(django_models.Model): From 478e0e6c9ba78a2a6733e5ff2f2d82c582c67f05 Mon Sep 17 00:00:00 2001 From: Ronaldo Campos Date: Mon, 8 Sep 2025 20:30:02 +0100 Subject: [PATCH 08/10] lint --- clickhouse_backend/patch/migrations.py | 5 +++-- tests/migrations/test_loader.py | 16 +++++++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/clickhouse_backend/patch/migrations.py b/clickhouse_backend/patch/migrations.py index f2fdddf..5dc51d8 100644 --- a/clickhouse_backend/patch/migrations.py +++ b/clickhouse_backend/patch/migrations.py @@ -71,7 +71,6 @@ def Migration(self): # has distributed migrations enabled and a migration cluster is set. # otherwise, create a regular merge tree. if _should_distribute_migrations(self.connection): - has_replicas = _check_replicas(self.connection) Engine = models.MergeTree @@ -130,7 +129,9 @@ class Meta: app_label = "migrations" db_table = _get_model_table_name(self.connection) engine = models.MergeTree(order_by=("app", "name")) - cluster = getattr(self.connection, "migration_cluster", None) + cluster = getattr( + self.connection, "migration_cluster", None + ) def __str__(self): return "Migration %s for %s" % (self.name, self.app) diff --git a/tests/migrations/test_loader.py b/tests/migrations/test_loader.py index 33fd61f..2dc3348 100644 --- a/tests/migrations/test_loader.py +++ b/tests/migrations/test_loader.py @@ -94,12 +94,22 @@ def assertMigrationTablesExists(self): for db in self.databases: conn = connections[db] tables = conn.introspection.table_names() - self.assertIn("django_migrations", tables, f"django_migrations table not found in {conn.alias}") - self.assertIn("distributed_django_migrations", tables, f"distributed_django_migrations table not found in {conn.alias}") + self.assertIn( + "django_migrations", + tables, + f"django_migrations table not found in {conn.alias}", + ) + self.assertIn( + "distributed_django_migrations", + tables, + f"distributed_django_migrations table not found in {conn.alias}", + ) def assertMigrationExists(self, conn, name, app, deleted=False): with conn.cursor() as cursor: - cursor.execute(f"SELECT * FROM distributed_django_migrations where name = '{name}'") + cursor.execute( + f"SELECT * FROM distributed_django_migrations where name = '{name}'" + ) res = cursor.fetchall() self.assertEqual(len(res), 1, f"Migration {name} not found in {conn.alias}") From bbf6df8fdbe260f4873bff32fe7d60c0b963f489 Mon Sep 17 00:00:00 2001 From: Ronaldo Campos Date: Tue, 9 Sep 2025 19:39:01 +0100 Subject: [PATCH 09/10] simplify --- clickhouse_backend/patch/migrations.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/clickhouse_backend/patch/migrations.py b/clickhouse_backend/patch/migrations.py index 5dc51d8..e4e3f15 100644 --- a/clickhouse_backend/patch/migrations.py +++ b/clickhouse_backend/patch/migrations.py @@ -76,9 +76,8 @@ def Migration(self): Engine = models.MergeTree if has_replicas: Engine = models.ReplicatedMergeTree - self.connection.has_replicas = True - else: - self.connection.has_replicas = False + + self.connection.has_replicas = has_replicas class _Migration(models.ClickhouseModel): app = models.StringField(max_length=255) From 60572dcd411025cce9ef9d3a1be3de4cf877b8fb Mon Sep 17 00:00:00 2001 From: Ronaldo Campos Date: Mon, 15 Sep 2025 17:06:30 +0100 Subject: [PATCH 10/10] remove unused --- tests/migrations/test_loader.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/migrations/test_loader.py b/tests/migrations/test_loader.py index 2dc3348..1110a0e 100644 --- a/tests/migrations/test_loader.py +++ b/tests/migrations/test_loader.py @@ -2,7 +2,6 @@ import os from copy import deepcopy from importlib import import_module -from time import sleep from django.db import connection, connections from django.db.migrations.exceptions import (