Skip to content

Commit 5ce8834

Browse files
committed
Handle distributed migrations
1 parent 59c1fe5 commit 5ce8834

File tree

5 files changed

+268
-18
lines changed

5 files changed

+268
-18
lines changed

clickhouse_backend/backend/base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ def __init__(self, settings_dict, alias=DEFAULT_DB_ALIAS):
163163
self.migration_cluster = self.settings_dict["OPTIONS"].pop(
164164
"migration_cluster", None
165165
)
166+
self.distributed_migrations = self.settings_dict["OPTIONS"].pop(
167+
"distributed_migrations", None
168+
)
166169
# https://clickhouse-driver.readthedocs.io/en/latest/quickstart.html#streaming-results
167170
self.max_block_size = self.settings_dict["OPTIONS"].pop("max_block_size", 65409)
168171
if not self.settings_dict["NAME"]:

clickhouse_backend/patch/migrations.py

Lines changed: 115 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from django.apps.registry import Apps
2+
from django.db import DatabaseError
23
from django.db import models as django_models
34
from django.db.migrations import Migration
4-
from django.db.migrations.exceptions import IrreversibleError
5+
from django.db.migrations.exceptions import IrreversibleError, MigrationSchemaMissing
56
from django.db.migrations.operations.fields import FieldOperation
67
from django.db.migrations.operations.models import (
78
DeleteModel,
@@ -15,6 +16,26 @@
1516
__all__ = ["patch_migrations", "patch_migration_recorder", "patch_migration"]
1617

1718

19+
def _should_distribute_migrations(connection):
    """
    Report whether the connection is configured for distributed migrations.

    Both settings must be truthy: the ``distributed_migrations`` flag and a
    ``migration_cluster`` value. Attributes that are missing from the
    connection are treated as unset.

    Always returns a real ``bool``; the bare ``and`` expression would
    otherwise hand back the cluster name (or ``None``) to the caller.
    """
    return bool(
        getattr(connection, "distributed_migrations", False)
        and getattr(connection, "migration_cluster", None)
    )
26+
27+
28+
def _get_model_table_name(connection):
    """
    Pick the table name the MigrationRecorder model should use.

    Connections with distributed migrations enabled get the distributed
    table name; every other connection gets the regular django_migrations
    table name.
    """
    distributed = _should_distribute_migrations(connection)
    return "distributed_django_migrations" if distributed else "django_migrations"
37+
38+
1839
def patch_migrations():
1940
patch_migration_recorder()
2041
patch_migration()
@@ -29,22 +50,63 @@ def Migration(self):
2950
if self._migration_class is None:
3051
if self.connection.vendor == "clickhouse":
3152
from clickhouse_backend import models
53+
from clickhouse_backend.models import currentDatabase
3254

33-
class Migration(models.ClickhouseModel):
34-
app = models.StringField(max_length=255)
35-
name = models.StringField(max_length=255)
36-
applied = models.DateTime64Field(default=now)
37-
deleted = models.BoolField(default=False)
55+
# Only create a distributed migration model if the connection
56+
# has distributed migrations enabled and a migration cluster is set.
57+
# Otherwise, create a regular MergeTree table.
58+
if _should_distribute_migrations(self.connection):
3859

39-
class Meta:
40-
apps = Apps()
41-
app_label = "migrations"
42-
db_table = "django_migrations"
43-
engine = models.MergeTree(order_by=("app", "name"))
44-
cluster = getattr(self.connection, "migration_cluster", None)
60+
class _Migration(models.ClickhouseModel):
    # Local MergeTree-backed migration table. The distributed Migration
    # model defined next to it refers to this table by its db_table name.
    app = models.StringField(max_length=255)
    name = models.StringField(max_length=255)
    applied = models.DateTime64Field(default=now)
    deleted = models.BoolField(default=False)

    class Meta:
        # A private Apps registry keeps this model out of the global one.
        apps = Apps()
        app_label = "migrations"
        db_table = "django_migrations"
        cluster = self.connection.migration_cluster
        engine = models.MergeTree(order_by=("app", "name"))

    def __str__(self):
        return f"Migration {self.name} for {self.app}"
75+
76+
class Migration(models.ClickhouseModel):
    # Distributed-engine migration table layered over the local
    # ``_Migration`` table in the current database.
    app = models.StringField(max_length=255)
    name = models.StringField(max_length=255)
    applied = models.DateTime64Field(default=now)
    deleted = models.BoolField(default=False)

    class Meta:
        apps = Apps()
        app_label = "migrations"
        db_table = _get_model_table_name(self.connection)
        engine = models.Distributed(
            getattr(self.connection, "migration_cluster"),
            currentDatabase(),
            _Migration._meta.db_table,
            # NOTE(review): presumably the sharding key for the
            # Distributed engine - confirm against the engine docs.
            models.Rand(),
        )
        cluster = getattr(self.connection, "migration_cluster")

    def __str__(self):
        # Same representation as the local ``_Migration`` model, so
        # records read the same regardless of which table backs them.
        return "Migration %s for %s" % (self.name, self.app)
93+
94+
Migration._meta.local_model_class = _Migration
95+
96+
else:
97+
98+
class Migration(models.ClickhouseModel):
    # Regular (non-distributed) MergeTree-backed migration table.
    app = models.StringField(max_length=255)
    name = models.StringField(max_length=255)
    applied = models.DateTime64Field(default=now)
    deleted = models.BoolField(default=False)

    class Meta:
        apps = Apps()
        app_label = "migrations"
        db_table = _get_model_table_name(self.connection)
        engine = models.MergeTree(order_by=("app", "name"))
        # Keep the ``None`` default: this branch also runs when no
        # migration cluster is configured, so a missing attribute must
        # not raise AttributeError.
        cluster = getattr(self.connection, "migration_cluster", None)

    def __str__(self):
        return "Migration %s for %s" % (self.name, self.app)
48110

49111
else:
50112

@@ -69,15 +131,45 @@ def has_table(self):
69131
# Assert migration table won't be deleted once created.
70132
if not getattr(self, "_has_table", False):
71133
with self.connection.cursor() as cursor:
134+
table = self.Migration._meta.db_table
72135
tables = self.connection.introspection.table_names(cursor)
73-
self._has_table = self.Migration._meta.db_table in tables
136+
self._has_table = table in tables
74137
if self._has_table and self.connection.vendor == "clickhouse":
75138
# fix https://github.com/jayvynl/django-clickhouse-backend/issues/51
76139
cursor.execute(
77-
"ALTER table django_migrations ADD COLUMN IF NOT EXISTS deleted Bool"
140+
f"ALTER table {table} ADD COLUMN IF NOT EXISTS deleted Bool"
78141
)
79142
return self._has_table
80143

144+
def ensure_schema(self):
    """
    Ensure the migration table exists, creating it when it is missing.

    With distributed migrations enabled on a ClickHouse connection, the
    local table is created first if it is absent, followed by the
    distributed table.

    Raises:
        MigrationSchemaMissing: if creating a table fails with a
            DatabaseError. The original error is chained as the cause.
    """
    # If the table's there, that's fine - we've never changed its schema
    # in the codebase.
    if self.has_table():
        return

    try:
        with self.connection.schema_editor() as editor:
            if (
                editor.connection.vendor == "clickhouse"
                and _should_distribute_migrations(editor.connection)
            ):
                # The distributed model refers to the local table, so the
                # local table has to exist before the distributed one.
                # The cursor is needed only for the table listing.
                with editor.connection.cursor() as cursor:
                    tables = editor.connection.introspection.table_names(cursor)
                local_model_class = self.Migration._meta.local_model_class
                if local_model_class._meta.db_table not in tables:
                    editor.create_model(local_model_class)

            editor.create_model(self.Migration)
    except DatabaseError as exc:
        raise MigrationSchemaMissing(
            "Unable to create the django_migrations table (%s)" % exc
        ) from exc
172+
81173
def migration_qs(self):
82174
if self.connection.vendor == "clickhouse":
83175
return self.Migration.objects.using(self.connection.alias).filter(
@@ -118,6 +210,7 @@ def flush(self):
118210

119211
MigrationRecorder.Migration = property(Migration)
120212
MigrationRecorder.has_table = has_table
213+
MigrationRecorder.ensure_schema = ensure_schema
121214
MigrationRecorder.migration_qs = property(migration_qs)
122215
MigrationRecorder.record_applied = record_applied
123216
MigrationRecorder.record_unapplied = record_unapplied
@@ -136,13 +229,15 @@ def apply(self, project_state, schema_editor, collect_sql=False):
136229
"""
137230
applied_on_remote = False
138231
if getattr(schema_editor.connection, "migration_cluster", None):
232+
_table = _get_model_table_name(schema_editor.connection)
233+
139234
with schema_editor.connection.cursor() as cursor:
140235
cursor.execute(
141236
"select EXISTS(select 1 from clusterAllReplicas(%s, currentDatabase(), %s)"
142237
" where app=%s and name=%s and deleted=false)",
143238
[
144239
schema_editor.connection.migration_cluster,
145-
"django_migrations",
240+
_table,
146241
self.app_label,
147242
self.name,
148243
],
@@ -203,13 +298,15 @@ def unapply(self, project_state, schema_editor, collect_sql=False):
203298
"""
204299
unapplied_on_remote = False
205300
if getattr(schema_editor.connection, "migration_cluster", None):
301+
_table = _get_model_table_name(schema_editor.connection)
302+
206303
with schema_editor.connection.cursor() as cursor:
207304
cursor.execute(
208305
"select EXISTS(select 1 from clusterAllReplicas(%s, currentDatabase(), %s)"
209306
" where app=%s and name=%s and deleted=true)",
210307
[
211308
schema_editor.connection.migration_cluster,
212-
"django_migrations",
309+
_table,
213310
self.app_label,
214311
self.name,
215312
],

compose.yaml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ services:
2121
- "node1:/var/lib/clickhouse/"
2222
- "./clickhouse-config/node1/:/etc/clickhouse-server/config.d/"
2323
ports:
24+
- "127.0.0.1:8123:8123"
2425
- "127.0.0.1:9000:9000"
2526
node2:
2627
<<: *base-service
@@ -29,6 +30,7 @@ services:
2930
- "node2:/var/lib/clickhouse/"
3031
- "./clickhouse-config/node2/:/etc/clickhouse-server/config.d/"
3132
ports:
33+
- "127.0.0.1:8124:8123"
3234
- "127.0.0.1:9001:9000"
3335
node3:
3436
<<: *base-service
@@ -37,6 +39,7 @@ services:
3739
- "node3:/var/lib/clickhouse/"
3840
- "./clickhouse-config/node3/:/etc/clickhouse-server/config.d/"
3941
ports:
42+
- "127.0.0.1:8125:8123"
4043
- "127.0.0.1:9002:9000"
4144
node4:
4245
<<: *base-service
@@ -45,8 +48,25 @@ services:
4548
- "node4:/var/lib/clickhouse/"
4649
- "./clickhouse-config/node4/:/etc/clickhouse-server/config.d/"
4750
ports:
51+
- "127.0.0.1:8126:8123"
4852
- "127.0.0.1:9003:9000"
4953

54+
haproxy:
55+
image: haproxy:latest
56+
container_name: clickhouse-haproxy
57+
command: sh -c "haproxy -f /usr/local/etc/haproxy/haproxy.cfg"
58+
restart: always
59+
volumes:
60+
- "./proxy-config/haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg"
61+
ports:
62+
- "127.0.0.1:8127:8123"
63+
- "127.0.0.1:9004:9000"
64+
depends_on:
65+
node1:
66+
condition: service_healthy
67+
node3:
68+
condition: service_healthy
69+
5070
volumes:
5171
node1:
5272
name: clickhouse-node1

proxy-config/haproxy.cfg

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# TCP frontend for native ClickHouse protocol
2+
frontend clickhouse_tcp
3+
bind *:9000
4+
mode tcp
5+
default_backend clickhouse_tcp_nodes
6+
7+
backend clickhouse_tcp_nodes
8+
mode tcp
9+
balance roundrobin
10+
option tcp-check
11+
tcp-check connect
12+
server ch1 clickhouse-node1:9000 check
13+
server ch2 clickhouse-node3:9000 check
14+
15+
# HTTP frontend for ClickHouse's HTTP interface
16+
frontend clickhouse_http
17+
bind *:8123
18+
mode http
19+
default_backend clickhouse_http_nodes
20+
21+
backend clickhouse_http_nodes
22+
mode http
23+
balance roundrobin
24+
option httpchk GET /ping
25+
server ch1 clickhouse-node1:8123 check
26+
server ch2 clickhouse-node3:8123 check

0 commit comments

Comments
 (0)