diff --git a/CHANGELOG.md b/CHANGELOG.md index c103702..a6bbfe8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ### 1.5.0 +- feat: #130: Add `distributed_migrations` database setting to support distributed migration queries. - feat: #129: Add `toYearWeek` datetime functionality ### 1.4.0 diff --git a/README.md b/README.md index 679cce3..20a88e2 100644 --- a/README.md +++ b/README.md @@ -525,6 +525,34 @@ Extra settings explanation: Do not hardcode database name when you define replicated table or distributed table. Because test database name is different from deployed database name. +#### Clickhouse cluster behind a load balancer + +If your clickhouse cluster is running behind a load balancer, you can optionally set `distributed_migrations` to `True` under database OPTIONS. +Then a distributed migration table will be created on all nodes of the cluster, and all migration operations will be performed on this +distributed migrations table instead of a local migrations table. Otherwise, sequentially running migrations will have no effect on other nodes. + +Configuration example: + +```python +DATABASES = { + "default": { + "HOST": "clickhouse-load-balancer", + "PORT": 9000, + "ENGINE": "clickhouse_backend.backend", + "OPTIONS": { + "migration_cluster": "cluster", + "distributed_migrations": True, + "settings": { + "mutations_sync": 2, + "insert_distributed_sync": 1, + "insert_quorum": 2, + "alter_sync": 2, + }, + }, + } +} +``` + ### Model `cluster` in `Meta` class will make models being created on cluster. 
diff --git a/clickhouse_backend/backend/base.py b/clickhouse_backend/backend/base.py index 3e382fa..152191b 100644 --- a/clickhouse_backend/backend/base.py +++ b/clickhouse_backend/backend/base.py @@ -163,6 +163,9 @@ def __init__(self, settings_dict, alias=DEFAULT_DB_ALIAS): self.migration_cluster = self.settings_dict["OPTIONS"].pop( "migration_cluster", None ) + self.distributed_migrations = self.settings_dict["OPTIONS"].pop( + "distributed_migrations", None + ) # https://clickhouse-driver.readthedocs.io/en/latest/quickstart.html#streaming-results self.max_block_size = self.settings_dict["OPTIONS"].pop("max_block_size", 65409) if not self.settings_dict["NAME"]: diff --git a/clickhouse_backend/patch/migrations.py b/clickhouse_backend/patch/migrations.py index bdf2bd9..e4e3f15 100644 --- a/clickhouse_backend/patch/migrations.py +++ b/clickhouse_backend/patch/migrations.py @@ -1,7 +1,8 @@ from django.apps.registry import Apps +from django.db import DatabaseError from django.db import models as django_models from django.db.migrations import Migration -from django.db.migrations.exceptions import IrreversibleError +from django.db.migrations.exceptions import IrreversibleError, MigrationSchemaMissing from django.db.migrations.operations.fields import FieldOperation from django.db.migrations.operations.models import ( DeleteModel, @@ -15,6 +16,41 @@ __all__ = ["patch_migrations", "patch_migration_recorder", "patch_migration"] +def _should_distribute_migrations(connection): + """ + Check if the connection is configured for distributed migrations. + """ + return getattr(connection, "distributed_migrations", False) and getattr( + connection, "migration_cluster", None + ) + + +def _get_model_table_name(connection): + """ + Return the name of the table that will be used by the MigrationRecorder. + If distributed migrations are enabled, return the distributed table name. + Otherwise, return the regular django_migrations table name. 
+ """ + if _should_distribute_migrations(connection): + return "distributed_django_migrations" + return "django_migrations" + + + def _check_replicas(connection): + """ + Check if the connection has replicas configured for the migration cluster. + """ + if hasattr(connection, "has_replicas"): + return connection.has_replicas + + with connection.cursor() as cursor: + cursor.execute( + f"select max(replica_num) from system.clusters where cluster='{connection.migration_cluster}'" + ) + (replica_count,) = cursor.fetchone() + return replica_count > 1 + + def patch_migrations(): patch_migration_recorder() patch_migration() @@ -29,22 +65,75 @@ def Migration(self): if self._migration_class is None: if self.connection.vendor == "clickhouse": from clickhouse_backend import models + from clickhouse_backend.models import currentDatabase - class Migration(models.ClickhouseModel): - app = models.StringField(max_length=255) - name = models.StringField(max_length=255) - applied = models.DateTime64Field(default=now) - deleted = models.BoolField(default=False) + # Only create a distributed migration model if the connection + # has distributed migrations enabled and a migration cluster is set. + # Otherwise, create a regular merge tree. 
+ if _should_distribute_migrations(self.connection): + has_replicas = _check_replicas(self.connection) - class Meta: - apps = Apps() - app_label = "migrations" - db_table = "django_migrations" - engine = models.MergeTree(order_by=("app", "name")) - cluster = getattr(self.connection, "migration_cluster", None) + Engine = models.MergeTree + if has_replicas: + Engine = models.ReplicatedMergeTree - def __str__(self): - return "Migration %s for %s" % (self.name, self.app) + self.connection.has_replicas = has_replicas + + class _Migration(models.ClickhouseModel): + app = models.StringField(max_length=255) + name = models.StringField(max_length=255) + applied = models.DateTime64Field(default=now) + deleted = models.BoolField(default=False) + + class Meta: + apps = Apps() + app_label = "migrations" + db_table = "django_migrations" + engine = Engine(order_by=("app", "name")) + cluster = self.connection.migration_cluster + + def __str__(self): + return "Migration %s for %s" % (self.name, self.app) + + class Migration(models.ClickhouseModel): + app = models.StringField(max_length=255) + name = models.StringField(max_length=255) + applied = models.DateTime64Field(default=now) + deleted = models.BoolField(default=False) + + class Meta: + apps = Apps() + app_label = "migrations" + db_table = _get_model_table_name(self.connection) + engine = models.Distributed( + self.connection.migration_cluster, + currentDatabase(), + _Migration._meta.db_table, + models.Rand(), + ) + cluster = self.connection.migration_cluster + + Migration._meta.local_model_class = _Migration + + else: + + class Migration(models.ClickhouseModel): + app = models.StringField(max_length=255) + name = models.StringField(max_length=255) + applied = models.DateTime64Field(default=now) + deleted = models.BoolField(default=False) + + class Meta: + apps = Apps() + app_label = "migrations" + db_table = _get_model_table_name(self.connection) + engine = models.MergeTree(order_by=("app", "name")) + cluster = getattr( + 
self.connection, "migration_cluster", None + ) + + def __str__(self): + return "Migration %s for %s" % (self.name, self.app) else: @@ -69,15 +158,45 @@ def has_table(self): # Assert migration table won't be deleted once created. if not getattr(self, "_has_table", False): with self.connection.cursor() as cursor: + table = self.Migration._meta.db_table tables = self.connection.introspection.table_names(cursor) - self._has_table = self.Migration._meta.db_table in tables + self._has_table = table in tables if self._has_table and self.connection.vendor == "clickhouse": # fix https://github.com/jayvynl/django-clickhouse-backend/issues/51 cursor.execute( - "ALTER table django_migrations ADD COLUMN IF NOT EXISTS deleted Bool" + f"ALTER table {table} ADD COLUMN IF NOT EXISTS deleted Bool" ) return self._has_table + def ensure_schema(self): + """Ensure the table exists and has the correct schema.""" + # If the table's there, that's fine - we've never changed its schema + # in the codebase. + if self.has_table(): + return + + # In case of distributed migrations, we need to ensure the local model exists first and + # then create the distributed model. 
+ try: + with self.connection.schema_editor() as editor: + if ( + editor.connection.vendor == "clickhouse" + and _should_distribute_migrations(editor.connection) + ): + with editor.connection.cursor() as cursor: + tables = editor.connection.introspection.table_names(cursor) + local_model_class = self.Migration._meta.local_model_class + local_table = local_model_class._meta.db_table + if local_table not in tables: + # Create the local model first + editor.create_model(self.Migration._meta.local_model_class) + + editor.create_model(self.Migration) + except DatabaseError as exc: + raise MigrationSchemaMissing( + "Unable to create the django_migrations table (%s)" % exc + ) + def migration_qs(self): if self.connection.vendor == "clickhouse": return self.Migration.objects.using(self.connection.alias).filter( @@ -118,6 +237,7 @@ def flush(self): MigrationRecorder.Migration = property(Migration) MigrationRecorder.has_table = has_table + MigrationRecorder.ensure_schema = ensure_schema MigrationRecorder.migration_qs = property(migration_qs) MigrationRecorder.record_applied = record_applied MigrationRecorder.record_unapplied = record_unapplied @@ -136,13 +256,15 @@ def apply(self, project_state, schema_editor, collect_sql=False): """ applied_on_remote = False if getattr(schema_editor.connection, "migration_cluster", None): + _table = _get_model_table_name(schema_editor.connection) + with schema_editor.connection.cursor() as cursor: cursor.execute( "select EXISTS(select 1 from clusterAllReplicas(%s, currentDatabase(), %s)" " where app=%s and name=%s and deleted=false)", [ schema_editor.connection.migration_cluster, - "django_migrations", + _table, self.app_label, self.name, ], @@ -203,13 +325,15 @@ def unapply(self, project_state, schema_editor, collect_sql=False): """ unapplied_on_remote = False if getattr(schema_editor.connection, "migration_cluster", None): + _table = _get_model_table_name(schema_editor.connection) + with schema_editor.connection.cursor() as cursor: 
cursor.execute( "select EXISTS(select 1 from clusterAllReplicas(%s, currentDatabase(), %s)" " where app=%s and name=%s and deleted=true)", [ schema_editor.connection.migration_cluster, - "django_migrations", + _table, self.app_label, self.name, ], diff --git a/compose.yaml b/compose.yaml index 9e3c6d4..c2daa51 100644 --- a/compose.yaml +++ b/compose.yaml @@ -21,6 +21,7 @@ services: - "node1:/var/lib/clickhouse/" - "./clickhouse-config/node1/:/etc/clickhouse-server/config.d/" ports: + - "127.0.0.1:8123:8123" - "127.0.0.1:9000:9000" node2: <<: *base-service @@ -29,6 +30,7 @@ services: - "node2:/var/lib/clickhouse/" - "./clickhouse-config/node2/:/etc/clickhouse-server/config.d/" ports: + - "127.0.0.1:8124:8123" - "127.0.0.1:9001:9000" node3: <<: *base-service @@ -37,6 +39,7 @@ services: - "node3:/var/lib/clickhouse/" - "./clickhouse-config/node3/:/etc/clickhouse-server/config.d/" ports: + - "127.0.0.1:8125:8123" - "127.0.0.1:9002:9000" node4: <<: *base-service @@ -45,8 +48,25 @@ services: - "node4:/var/lib/clickhouse/" - "./clickhouse-config/node4/:/etc/clickhouse-server/config.d/" ports: + - "127.0.0.1:8126:8123" - "127.0.0.1:9003:9000" + haproxy: + image: haproxy:latest + container_name: clickhouse-haproxy + command: sh -c "haproxy -f /usr/local/etc/haproxy/haproxy.cfg" + restart: always + volumes: + - "./proxy-config/haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg" + ports: + - "127.0.0.1:8127:8123" + - "127.0.0.1:9004:9000" + depends_on: + node1: + condition: service_healthy + node3: + condition: service_healthy + volumes: node1: name: clickhouse-node1 diff --git a/proxy-config/haproxy.cfg b/proxy-config/haproxy.cfg new file mode 100644 index 0000000..983aef8 --- /dev/null +++ b/proxy-config/haproxy.cfg @@ -0,0 +1,26 @@ +# TCP frontend for native ClickHouse protocol +frontend clickhouse_tcp + bind *:9000 + mode tcp + default_backend clickhouse_tcp_nodes + +backend clickhouse_tcp_nodes + mode tcp + balance roundrobin + option tcp-check + tcp-check connect + server 
ch1 clickhouse-node1:9000 check + server ch2 clickhouse-node3:9000 check + +# HTTP frontend for ClickHouse's HTTP interface +frontend clickhouse_http + bind *:8123 + mode http + default_backend clickhouse_http_nodes + +backend clickhouse_http_nodes + mode http + balance roundrobin + option httpchk GET /ping + server ch1 clickhouse-node1:8123 check + server ch2 clickhouse-node3:8123 check diff --git a/tests/migrations/test_loader.py b/tests/migrations/test_loader.py index dd448e7..1110a0e 100644 --- a/tests/migrations/test_loader.py +++ b/tests/migrations/test_loader.py @@ -1,5 +1,6 @@ import compileall import os +from copy import deepcopy from importlib import import_module from django.db import connection, connections @@ -13,6 +14,7 @@ from django.test import TestCase, modify_settings, override_settings from clickhouse_backend import compat +from clickhouse_backend.backend.base import DatabaseWrapper from .test_base import MigrationTestBase @@ -51,6 +53,155 @@ def test_apply(self): ) +class DistributedMigrationTests(MigrationTestBase): + databases = ("default", "s2r1", "s1r2") + + lb = { + "ENGINE": "clickhouse_backend.backend", + "HOST": "localhost", + "USER": "default", + "PASSWORD": "clickhouse_password", + "PORT": 9004, + "NAME": "test_default", + "OPTIONS": { + "distributed_migrations": True, + "migration_cluster": "cluster", + "connections_min": 1, + "settings": { + "mutations_sync": 2, + "insert_distributed_sync": 1, + "insert_quorum": 2, + "alter_sync": 2, + "allow_suspicious_low_cardinality_types": 1, + "allow_experimental_object_type": 1, + }, + }, + "TEST": {"cluster": "cluster", "managed": False}, + "TIME_ZONE": None, + "AUTOCOMMIT": True, + "CONN_MAX_AGE": 0, + "CONN_HEALTH_CHECKS": True, + } + + def tearDown(self): + if "load_balancer" in connections: + # Remove the load balancer connection to avoid conflicts with other tests + connections["load_balancer"].close() + del connections["load_balancer"] + + def assertMigrationTablesExists(self): + for db 
in self.databases: + conn = connections[db] + tables = conn.introspection.table_names() + self.assertIn( + "django_migrations", + tables, + f"django_migrations table not found in {conn.alias}", + ) + self.assertIn( + "distributed_django_migrations", + tables, + f"distributed_django_migrations table not found in {conn.alias}", + ) + + def assertMigrationExists(self, conn, name, app, deleted=False): + with conn.cursor() as cursor: + cursor.execute( + f"SELECT * FROM distributed_django_migrations where name = '{name}'" + ) + res = cursor.fetchall() + + self.assertEqual(len(res), 1, f"Migration {name} not found in {conn.alias}") + + self.assertEqual(res[0][1], app) + self.assertEqual(res[0][2], name) + self.assertEqual(res[0][-1], deleted) + + def _drop_migrations(self): + for conn in self.databases: + recorder = MigrationRecorder(connections[conn]) + self.assertEqual(recorder.migration_qs.db, conn) + self.assertEqual( + recorder.migration_qs.model._meta.db_table, "django_migrations" + ) + with recorder.connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS django_migrations") + cursor.execute("DROP TABLE IF EXISTS distributed_django_migrations") + tables = recorder.connection.introspection.table_names(cursor) + self.assertNotIn("django_migrations", tables) + self.assertNotIn("distributed_django_migrations", tables) + + # node4 can throw error when trying to create django_migrations table, even if other nodes are ok + db = deepcopy(self.lb) + db["PORT"] = 9003 + del db["OPTIONS"]["distributed_migrations"] + recorder = MigrationRecorder(DatabaseWrapper(db, alias="node4")) + with recorder.connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS django_migrations") + cursor.execute("DROP TABLE IF EXISTS distributed_django_migrations") + tables = recorder.connection.introspection.table_names(cursor) + self.assertNotIn("django_migrations", tables) + self.assertNotIn("distributed_django_migrations", tables) + + def 
test_distributed_migration_schema_with_existing_migrations(self): + """ + Tests that migration tables are created in all nodes even if the django_migrations table already exists + """ + db = DatabaseWrapper(deepcopy(self.lb), alias="load_balancer") + connections["load_balancer"] = db + recorder = MigrationRecorder(db) + + recorder.ensure_schema() + + self.assertEqual(recorder.migration_qs.db, "load_balancer") + self.assertEqual( + recorder.migration_qs.model._meta.db_table, "distributed_django_migrations" + ) + + self.assertMigrationTablesExists() + + def test_distributed_migration_schema_without_migrations(self): + """ + Tests that migration tables are created in all nodes when migration tables do not exist + """ + + self._drop_migrations() + + db = DatabaseWrapper(deepcopy(self.lb), alias="load_balancer") + connections["load_balancer"] = db + recorder = MigrationRecorder(db) + + recorder.ensure_schema() + + self.assertMigrationTablesExists() + + def test_apply_unapply_distributed(self): + """ + Tests marking migrations as applied/unapplied in a distributed setup. + """ + databases = [x for x in self.databases] + + self._drop_migrations() + + lb = DatabaseWrapper(deepcopy(self.lb), alias="load_balancer") + connections["load_balancer"] = lb + databases.append("load_balancer") + + recorder = MigrationRecorder(lb) + + recorder.record_applied("myapp", "0432_ponies") + + for db in databases: + conn = connections[db] + self.assertMigrationExists(conn, "0432_ponies", "myapp", deleted=False) + + recorder.record_unapplied("myapp", "0432_ponies") + + for db in databases: + conn = connections[db] + self.assertMigrationExists(conn, "0432_ponies", "myapp", deleted=True) + + class LoaderTests(TestCase): """ Tests the disk and database loader, and running through migrations