Skip to content

Commit 5e5c32e

Browse files
committed
1 parent 82f688d commit 5e5c32e

File tree

9 files changed

+207
-85
lines changed

9 files changed

+207
-85
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
### 1.1.1
22
- [Black](https://github.com/psf/black) code style.
3+
- Support [MergeTree settings](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree#settings) when creating tables.
34

45
### 1.1.0
56
- Change `AutoField` and `SmallAutoField` to clickhouse `Int64`, so that the id worker can generate values for them.

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,10 @@ class Event(models.ClickhouseModel):
186186
db_table = 'event'
187187
engine = models.ReplacingMergeTree(
188188
order_by=['id'],
189-
partition_by=Func('timestamp', function='toYYYYMMDD')
189+
partition_by=Func('timestamp', function='toYYYYMMDD'),
190+
index_granularity=1024,
191+
index_granularity_bytes=1 << 20,
192+
enable_mixed_granularity_parts=1,
190193
)
191194
indexes = [
192195
models.Index(

clickhouse_backend/backend/schema.py

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,8 @@ def _model_indexes_sql(self, model):
188188
return output
189189

190190
def _get_expression(self, model, *expressions):
191+
if not expressions:
192+
return ""
191193
index_expressions = []
192194
for expression in expressions:
193195
index_expression = IndexExpression(expression)
@@ -197,43 +199,30 @@ def _get_expression(self, model, *expressions):
197199

198200
query = Query(model, alias_cols=False)
199201
expression_list = ExpressionList(*index_expressions).resolve_expression(query)
200-
compiler = query.get_compiler(
201-
connection=self.connection,
202-
)
202+
compiler = query.get_compiler(connection=self.connection)
203203
return Expressions(
204204
model._meta.db_table, expression_list, compiler, self.quote_value
205205
)
206206

207207
def _model_extra_sql(self, model, engine):
    """Yield the SQL clauses that follow the ENGINE definition of a table.

    For MergeTree-family engines this produces, in order, the ORDER BY,
    PARTITION BY, PRIMARY KEY and SETTINGS clauses that are configured on
    the engine. Any other engine yields nothing.

    Args:
        model: Model whose table is being created; passed through to
            ``_get_expression`` to render the expressions.
        engine: Engine instance declared for the model.

    Yields:
        str: One SQL clause per configured option.
    """
    from clickhouse_backend.models.engines import BaseMergeTree

    if not isinstance(engine, BaseMergeTree):
        return

    order_by = engine.order_by
    partition_by = engine.partition_by
    primary_key = engine.primary_key

    if order_by is not None:
        yield "ORDER BY (%s)" % self._get_expression(model, *order_by)
    if partition_by:
        # The engine keeps partition_by exactly as it was passed, so a lone
        # expression or column name has to be wrapped before unpacking:
        # ``*`` would fail on a single expression and would split a plain
        # string into characters.
        if not isinstance(partition_by, (list, tuple)):
            partition_by = [partition_by]
        yield "PARTITION BY (%s)" % self._get_expression(model, *partition_by)
    if primary_key is not None:
        yield "PRIMARY KEY (%s)" % self._get_expression(model, *primary_key)
    if engine.settings:
        assignments = [
            f"{setting}={self.quote_value(value)}"
            for setting, value in engine.settings.items()
        ]
        yield "SETTINGS %s" % ", ".join(assignments)
237226

238227
def add_field(self, model, field):
239228
"""
Lines changed: 157 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
from itertools import zip_longest
2+
13
from django.db.models.expressions import Func, Value
4+
from django.utils.itercompat import is_iterable
25

36
__all__ = [
47
"Engine",
@@ -20,7 +23,35 @@
2023
]
2124

2225

26+
def _check_positive(value, name):
27+
if not isinstance(value, int) and value <= 0:
28+
raise ValueError(f"{name} must be positive integer.")
29+
return value
30+
31+
32+
def _check_not_negative(value, name):
33+
if not isinstance(value, int) and value < 0:
34+
raise ValueError(f"{name} must not be negative.")
35+
return value
36+
37+
38+
def _check_bool(value, name):
39+
if value not in (0, 1):
40+
raise ValueError(f"{name} must be one of (0, 1, True, False)")
41+
return int(value)
42+
43+
44+
def _check_str(value, name):
45+
if not isinstance(value, str):
46+
raise ValueError(f"{name} must be string")
47+
return value
48+
49+
2350
class Engine(Func):
51+
@property
52+
def function(self):
53+
return self.__class__.__name__
54+
2455
def deconstruct(self):
2556
path, args, kwargs = super().deconstruct()
2657
if path.startswith("clickhouse_backend.models.engines"):
@@ -31,99 +62,197 @@ def deconstruct(self):
3162

3263

3364
class BaseMergeTree(Engine):
34-
def __init__(self, *expressions, output_field=None, **extra):
35-
self.order_by = extra.pop("order_by", None)
36-
assert self.order_by is not None, "order_by is required by MergeTree family."
37-
self.partition_by = extra.pop("partition_by", None)
38-
self.primary_key = extra.pop("primary_key", None)
39-
super().__init__(*expressions, output_field=output_field, **extra)
65+
max_arity = None # The max number of arguments the function accepts.
66+
# https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree#settings
67+
setting_types = {
68+
_check_positive: [
69+
"index_granularity",
70+
"min_index_granularity_bytes",
71+
"merge_max_block_size",
72+
"min_bytes_for_wide_part",
73+
"min_rows_for_wide_part",
74+
"max_parts_in_total",
75+
"max_compress_block_size",
76+
"min_compress_block_size",
77+
"max_partitions_to_read",
78+
],
79+
_check_not_negative: [
80+
"index_granularity_bytes",
81+
"min_merge_bytes_to_use_direct_io",
82+
"merge_with_ttl_timeout",
83+
"merge_with_recompression_ttl_timeout",
84+
"try_fetch_recompressed_part_timeout",
85+
],
86+
_check_bool: [
87+
"enable_mixed_granularity_parts",
88+
"use_minimalistic_part_header_in_zookeeper",
89+
"write_final_mark",
90+
],
91+
_check_str: ["storage_policy"],
92+
}
93+
94+
def __init__(
95+
self,
96+
*expressions,
97+
order_by=None,
98+
partition_by=None,
99+
primary_key=None,
100+
**settings,
101+
):
102+
if self.max_arity is not None and len(expressions) > self.max_arity:
103+
raise TypeError(
104+
"'%s' takes at most %s %s (%s given)"
105+
% (
106+
self.__class__.__name__,
107+
self.max_arity,
108+
"argument" if self.max_arity == 1 else "arguments",
109+
len(expressions),
110+
)
111+
)
112+
113+
assert (
114+
order_by is not None or primary_key is not None
115+
), "order_by or primary_key is missing."
116+
self.order_by = order_by
117+
self.primary_key = primary_key
118+
self.partition_by = partition_by
119+
120+
if order_by is not None:
121+
if isinstance(order_by, str) or not is_iterable(order_by):
122+
self.order_by = [order_by]
123+
if any(o is None for o in self.order_by):
124+
raise ValueError("None is not allowed in order_by")
125+
126+
if primary_key is not None:
127+
if isinstance(primary_key, str) or not is_iterable(primary_key):
128+
self.primary_key = [primary_key]
129+
if any(o is None for o in self.primary_key):
130+
raise ValueError("None is not allowed in primary_key")
131+
132+
# https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree#choosing-a-primary-key-that-differs-from-the-sorting-key
133+
# primary key expression tuple must be a prefix of the sorting key expression tuple.
134+
if self.order_by is not None and self.primary_key is not None:
135+
for o, p in zip_longest(self.order_by, self.primary_key):
136+
if p is None:
137+
break
138+
if p != o:
139+
raise ValueError("primary_key must be a prefix of order_by")
140+
141+
normalized_settings = {}
142+
for setting, value in settings.items():
143+
for validate, keys in self.setting_types.items():
144+
if setting in keys:
145+
normalized_settings[setting] = validate(value, setting)
146+
break
147+
else:
148+
raise TypeError(f"{setting} is not a valid setting.")
149+
self.settings = normalized_settings
150+
super().__init__(*expressions)
40151

41152

42153
class MergeTree(BaseMergeTree):
43-
function = "MergeTree"
44154
arity = 0
45155

46156

47157
class ReplacingMergeTree(BaseMergeTree):
48-
function = "ReplacingMergeTree"
158+
max_arity = 2
49159

50160

51161
class SummingMergeTree(BaseMergeTree):
52-
function = "SummingMergeTree"
162+
pass
53163

54164

55165
class AggregatingMergeTree(BaseMergeTree):
56-
function = "AggregatingMergeTree"
57166
arity = 0
58167

59168

60169
class CollapsingMergeTree(BaseMergeTree):
61-
function = "CollapsingMergeTree"
62170
arity = 1
63171

64172

65173
class VersionedCollapsingMergeTree(BaseMergeTree):
66-
function = "CollapsingMergeTree"
67174
arity = 2
68175

69176

70177
class GraphiteMergeTree(BaseMergeTree):
71-
function = "GraphiteMergeTree"
72178
arity = 1
73179

180+
def __init__(self, *expressions, **extra):
181+
if expressions:
182+
expressions = (Value(expressions[0]), *expressions[1:])
183+
super().__init__(*expressions, **extra)
184+
74185

75186
class ReplicatedMixin:
    """Mixin that adds two leading replication arguments to an engine.

    The first two positional arguments are wrapped in ``Value`` and placed
    in front of the expressions produced by the base engine's ``__init__``;
    the remaining arguments are validated against the base engine's
    ``arity`` / ``max_arity``, each shifted by two.
    """

    def __init__(self, *expressions, **extra):
        """Validate the argument count and build the combined expression list.

        Args:
            *expressions: The two replication arguments followed by the
                base engine's own arguments.
            **extra: Keyword arguments forwarded to the base engine.

        Raises:
            TypeError: If the number of positional arguments does not fit
                the base engine's ``arity`` / ``max_arity`` plus two.
        """
        given = len(expressions)
        if self.arity is not None and given != self.arity + 2:
            raise TypeError(
                "'%s' takes exactly %s arguments (%s given)"
                % (self.__class__.__name__, self.arity + 2, given)
            )
        if self.arity is None and given < 2:
            raise TypeError(
                "'%s' takes at least 2 arguments (%s given)"
                % (self.__class__.__name__, given)
            )
        if self.max_arity is not None and given > self.max_arity + 2:
            raise TypeError(
                "'%s' takes at most %s arguments (%s given)"
                % (self.__class__.__name__, self.max_arity + 2, given)
            )
        # Take the replication parameters from the call arguments: nothing
        # has been stored on the instance yet, because the base __init__
        # only runs on the next line.
        replicated_params = [Value(arg) for arg in expressions[:2]]
        super().__init__(*expressions[2:], **extra)
        # Django's Func keeps its parsed arguments in ``source_expressions``;
        # prepend the replication parameters there so they are rendered first.
        self.source_expressions = [*replicated_params, *self.source_expressions]
100217

101218

102-
class ReplicatedMergeTree(ReplicatedMixin, MergeTree):
103-
function = "ReplicatedMergeTree"
219+
class ReplicatedMergeTree(MergeTree):
    """MergeTree engine with optional replication arguments.

    Accepts either no positional arguments or exactly two, which are
    wrapped in ``Value`` before being handed to the base engine.
    """

    # MergeTree declares ``arity = 0``; left inherited, the base Func
    # constructor would reject the two replication arguments. The argument
    # count is validated in __init__ instead.
    arity = None

    def __init__(self, *expressions, **extra):
        """Validate and wrap the optional replication arguments.

        Args:
            *expressions: Either nothing or exactly two replication
                arguments.
            **extra: Keyword arguments forwarded to the base engine.

        Raises:
            TypeError: If a number of positional arguments other than 0 or
                2 is given.
        """
        # https://github.com/ClickHouse/ClickHouse/issues/8675
        # https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication#creating-replicated-tables
        # You can specify default arguments for Replicated table engine in the server configuration file.
        # In this case, you can omit arguments when creating tables.
        if expressions:
            if len(expressions) != 2:
                raise TypeError(
                    "'ReplicatedMergeTree' takes 0 or 2 arguments (%s given)"
                    % len(expressions)
                )
            expressions = [Value(arg) for arg in expressions]
        super().__init__(*expressions, **extra)
104233

105234

106235
class ReplicatedReplacingMergeTree(ReplicatedMixin, ReplacingMergeTree):
107-
function = "ReplicatedReplacingMergeTree"
236+
pass
108237

109238

110239
class ReplicatedSummingMergeTree(ReplicatedMixin, SummingMergeTree):
111-
function = "ReplicatedSummingMergeTree"
240+
pass
112241

113242

114243
class ReplicatedAggregatingMergeTree(ReplicatedMixin, AggregatingMergeTree):
115-
function = "ReplicatedAggregatingMergeTree"
244+
pass
116245

117246

118247
class ReplicatedCollapsingMergeTree(ReplicatedMixin, CollapsingMergeTree):
119-
function = "ReplicatedCollapsingMergeTree"
248+
pass
120249

121250

122251
class ReplicatedVersionedCollapsingMergeTree(
123252
ReplicatedMixin, VersionedCollapsingMergeTree
124253
):
125-
function = "ReplicatedCollapsingMergeTree"
254+
pass
126255

127256

128257
class ReplicatedGraphiteMergeTree(ReplicatedMixin, GraphiteMergeTree):
129-
function = "ReplicatedGraphiteMergeTree"
258+
pass

example/testapp/models.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ class Meta:
2525
ordering = ["-id"]
2626
db_table = "event"
2727
engine = models.ReplacingMergeTree(
28-
order_by=["id"], partition_by=models.toYYYYMMDD("timestamp")
28+
order_by=["id"],
29+
partition_by=models.toYYYYMMDD("timestamp"),
30+
index_granularity=1024,
31+
index_granularity_bytes=1 << 20,
32+
enable_mixed_granularity_parts=1,
2933
)
3034
indexes = [
3135
models.Index(

0 commit comments

Comments
 (0)