Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,34 @@ Extra settings explanation:
Do not hardcode database name when you define replicated table or distributed table.
Because test database name is different from deployed database name.

#### ClickHouse cluster behind a load balancer

If your ClickHouse cluster is running behind a load balancer, you can optionally set `distributed_migrations` to `True` under the database `OPTIONS`.
A distributed migration table will then be created on all nodes of the cluster, and all migration operations will be performed on this
distributed table instead of a node-local migrations table. Otherwise, migrations applied through the load balancer are recorded on whichever single node served the connection and have no effect on the other nodes.

Configuration example:

```python
DATABASES = {
"default": {
"HOST": "clickhouse-load-balancer",
"PORT": 9000,
"ENGINE": "clickhouse_backend.backend",
"OPTIONS": {
"migration_cluster": "cluster",
"distributed_migrations": True,
"settings": {
"mutations_sync": 2,
"insert_distributed_sync": 1,
"insert_quorum": 2,
"alter_sync": 2,
},
},
}
}
```

### Model

`cluster` in `Meta` class will make models being created on cluster.
Expand Down
3 changes: 3 additions & 0 deletions clickhouse_backend/backend/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ def __init__(self, settings_dict, alias=DEFAULT_DB_ALIAS):
self.migration_cluster = self.settings_dict["OPTIONS"].pop(
"migration_cluster", None
)
self.distributed_migrations = self.settings_dict["OPTIONS"].pop(
"distributed_migrations", None
)
# https://clickhouse-driver.readthedocs.io/en/latest/quickstart.html#streaming-results
self.max_block_size = self.settings_dict["OPTIONS"].pop("max_block_size", 65409)
if not self.settings_dict["NAME"]:
Expand Down
133 changes: 115 additions & 18 deletions clickhouse_backend/patch/migrations.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from django.apps.registry import Apps
from django.db import DatabaseError
from django.db import models as django_models
from django.db.migrations import Migration
from django.db.migrations.exceptions import IrreversibleError
from django.db.migrations.exceptions import IrreversibleError, MigrationSchemaMissing
from django.db.migrations.operations.fields import FieldOperation
from django.db.migrations.operations.models import (
DeleteModel,
Expand All @@ -15,6 +16,26 @@
__all__ = ["patch_migrations", "patch_migration_recorder", "patch_migration"]


def _should_distribute_migrations(connection):
"""
Check if the connection is configured for distributed migrations.
"""
return getattr(connection, "distributed_migrations", False) and getattr(
connection, "migration_cluster", None
)


def _get_model_table_name(connection):
    """
    Return the database table name used by the MigrationRecorder model.

    Connections configured for distributed migrations record into the
    Distributed table ``distributed_django_migrations``; every other
    connection uses Django's standard ``django_migrations`` table.
    """
    return (
        "distributed_django_migrations"
        if _should_distribute_migrations(connection)
        else "django_migrations"
    )


def patch_migrations():
patch_migration_recorder()
patch_migration()
Expand All @@ -29,22 +50,63 @@ def Migration(self):
if self._migration_class is None:
if self.connection.vendor == "clickhouse":
from clickhouse_backend import models
from clickhouse_backend.models import currentDatabase

class Migration(models.ClickhouseModel):
app = models.StringField(max_length=255)
name = models.StringField(max_length=255)
applied = models.DateTime64Field(default=now)
deleted = models.BoolField(default=False)
# Only create a distributed migration model if the connection
# has distributed migrations enabled and a migration cluster is set.
# otherwise, create a regular merge tree.
if _should_distribute_migrations(self.connection):

class Meta:
apps = Apps()
app_label = "migrations"
db_table = "django_migrations"
engine = models.MergeTree(order_by=("app", "name"))
cluster = getattr(self.connection, "migration_cluster", None)
class _Migration(models.ClickhouseModel):
app = models.StringField(max_length=255)
name = models.StringField(max_length=255)
applied = models.DateTime64Field(default=now)
deleted = models.BoolField(default=False)

def __str__(self):
return "Migration %s for %s" % (self.name, self.app)
class Meta:
apps = Apps()
app_label = "migrations"
db_table = "django_migrations"
engine = models.MergeTree(order_by=("app", "name"))
cluster = getattr(self.connection, "migration_cluster")

def __str__(self):
return "Migration %s for %s" % (self.name, self.app)

class Migration(models.ClickhouseModel):
app = models.StringField(max_length=255)
name = models.StringField(max_length=255)
applied = models.DateTime64Field(default=now)
deleted = models.BoolField(default=False)

class Meta:
apps = Apps()
app_label = "migrations"
db_table = _get_model_table_name(self.connection)
engine = models.Distributed(
getattr(self.connection, "migration_cluster"),
currentDatabase(),
_Migration._meta.db_table,
models.Rand(),
)
cluster = getattr(self.connection, "migration_cluster")

Migration._meta.local_model_class = _Migration

else:

class Migration(models.ClickhouseModel):
app = models.StringField(max_length=255)
name = models.StringField(max_length=255)
applied = models.DateTime64Field(default=now)
deleted = models.BoolField(default=False)

class Meta:
apps = Apps()
app_label = "migrations"
db_table = _get_model_table_name(self.connection)
engine = models.MergeTree(order_by=("app", "name"))
cluster = getattr(self.connection, "migration_cluster")

else:

Expand All @@ -69,15 +131,45 @@ def has_table(self):
# Assert migration table won't be deleted once created.
if not getattr(self, "_has_table", False):
with self.connection.cursor() as cursor:
table = self.Migration._meta.db_table
tables = self.connection.introspection.table_names(cursor)
self._has_table = self.Migration._meta.db_table in tables
self._has_table = table in tables
if self._has_table and self.connection.vendor == "clickhouse":
# fix https://github.com/jayvynl/django-clickhouse-backend/issues/51
cursor.execute(
"ALTER table django_migrations ADD COLUMN IF NOT EXISTS deleted Bool"
f"ALTER table {table} ADD COLUMN IF NOT EXISTS deleted Bool"
)
return self._has_table

def ensure_schema(self):
"""Ensure the table exists and has the correct schema."""
# If the table's there, that's fine - we've never changed its schema
# in the codebase.
if self.has_table():
return

# In case of distributed migrations, we need to ensure the local model exists first and
# then create the distributed model.
try:
with self.connection.schema_editor() as editor:
if (
editor.connection.vendor == "clickhouse"
and _should_distribute_migrations(editor.connection)
):
with editor.connection.cursor() as cursor:
tables = editor.connection.introspection.table_names(cursor)
local_model_class = self.Migration._meta.local_model_class
local_table = local_model_class._meta.db_table
if local_table not in tables:
# Create the local model first
editor.create_model(self.Migration._meta.local_model_class)

editor.create_model(self.Migration)
except DatabaseError as exc:
raise MigrationSchemaMissing(
"Unable to create the django_migrations table (%s)" % exc
)

def migration_qs(self):
if self.connection.vendor == "clickhouse":
return self.Migration.objects.using(self.connection.alias).filter(
Expand Down Expand Up @@ -118,6 +210,7 @@ def flush(self):

MigrationRecorder.Migration = property(Migration)
MigrationRecorder.has_table = has_table
MigrationRecorder.ensure_schema = ensure_schema
MigrationRecorder.migration_qs = property(migration_qs)
MigrationRecorder.record_applied = record_applied
MigrationRecorder.record_unapplied = record_unapplied
Expand All @@ -136,13 +229,15 @@ def apply(self, project_state, schema_editor, collect_sql=False):
"""
applied_on_remote = False
if getattr(schema_editor.connection, "migration_cluster", None):
_table = _get_model_table_name(schema_editor.connection)

with schema_editor.connection.cursor() as cursor:
cursor.execute(
"select EXISTS(select 1 from clusterAllReplicas(%s, currentDatabase(), %s)"
" where app=%s and name=%s and deleted=false)",
[
schema_editor.connection.migration_cluster,
"django_migrations",
_table,
self.app_label,
self.name,
],
Expand Down Expand Up @@ -203,13 +298,15 @@ def unapply(self, project_state, schema_editor, collect_sql=False):
"""
unapplied_on_remote = False
if getattr(schema_editor.connection, "migration_cluster", None):
_table = _get_model_table_name(schema_editor.connection)

with schema_editor.connection.cursor() as cursor:
cursor.execute(
"select EXISTS(select 1 from clusterAllReplicas(%s, currentDatabase(), %s)"
" where app=%s and name=%s and deleted=true)",
[
schema_editor.connection.migration_cluster,
"django_migrations",
_table,
self.app_label,
self.name,
],
Expand Down
20 changes: 20 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ services:
- "node1:/var/lib/clickhouse/"
- "./clickhouse-config/node1/:/etc/clickhouse-server/config.d/"
ports:
- "127.0.0.1:8123:8123"
- "127.0.0.1:9000:9000"
node2:
<<: *base-service
Expand All @@ -29,6 +30,7 @@ services:
- "node2:/var/lib/clickhouse/"
- "./clickhouse-config/node2/:/etc/clickhouse-server/config.d/"
ports:
- "127.0.0.1:8124:8123"
- "127.0.0.1:9001:9000"
node3:
<<: *base-service
Expand All @@ -37,6 +39,7 @@ services:
- "node3:/var/lib/clickhouse/"
- "./clickhouse-config/node3/:/etc/clickhouse-server/config.d/"
ports:
- "127.0.0.1:8125:8123"
- "127.0.0.1:9002:9000"
node4:
<<: *base-service
Expand All @@ -45,8 +48,25 @@ services:
- "node4:/var/lib/clickhouse/"
- "./clickhouse-config/node4/:/etc/clickhouse-server/config.d/"
ports:
- "127.0.0.1:8126:8123"
- "127.0.0.1:9003:9000"

haproxy:
image: haproxy:latest
container_name: clickhouse-haproxy
command: sh -c "haproxy -f /usr/local/etc/haproxy/haproxy.cfg"
restart: always
volumes:
- "./proxy-config/haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg"
ports:
- "127.0.0.1:8127:8123"
- "127.0.0.1:9004:9000"
depends_on:
node1:
condition: service_healthy
node3:
condition: service_healthy

volumes:
node1:
name: clickhouse-node1
Expand Down
26 changes: 26 additions & 0 deletions proxy-config/haproxy.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# TCP frontend for native ClickHouse protocol
frontend clickhouse_tcp
bind *:9000
mode tcp
default_backend clickhouse_tcp_nodes

backend clickhouse_tcp_nodes
mode tcp
balance roundrobin
option tcp-check
tcp-check connect
server ch1 clickhouse-node1:9000 check
server ch2 clickhouse-node3:9000 check

# HTTP frontend for ClickHouse's HTTP interface
frontend clickhouse_http
bind *:8123
mode http
default_backend clickhouse_http_nodes

backend clickhouse_http_nodes
mode http
balance roundrobin
option httpchk GET /ping
server ch1 clickhouse-node1:8123 check
server ch2 clickhouse-node3:8123 check
Loading