Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 57 additions & 36 deletions ch_backup/clickhouse/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,61 +600,82 @@ def kill_old_freeze_queries(self):
KILL_RUNNING_FREEZE_SQL.format(user=self._ch_ctl_config["user"])
)

# pylint: disable=too-many-positional-arguments
def freeze_table(
self,
backup_name: str,
table: Table,
threads: int,
parallelize_freeze_in_ch: bool,
freeze_partition_threads: int,
clickhouse_query_max_threads: int,
) -> None:
"""
Make snapshot of the specified table.
To execute freeze efficiently, we should parallelize freeze operations. We have two options where we can parallelize:
1) parallelize_freeze_in_ch = False
Inside ch-backup: We can perform several `ALTER TABLE FREEZE PARTITION` queries.
2) parallelize_freeze_in_ch = True
Inside clickhouse(preferable): Since 25.11 clickhouse can parallelize `ALTER TABLE FREEZE` queries. https://github.com/ClickHouse/ClickHouse/pull/71743
"""
# Table has no partitions or created with deprecated syntax.
# FREEZE PARTITION ID with deprecated syntax throws segmentation fault in CH.
freeze_by_partitions = threads > 0 and "PARTITION BY" in table.create_statement
freeze_by_partitions = (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about: if freeze_partition_threads > 0, set it as max_threads for ClickHouse >= 25.11; otherwise, fall back to freezing by partitions — without a special parallelize_freeze_in_ch setting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really like it because:

  • We would lose the ability to disable the new behaviour. We haven't used it in a prod environment yet, so in my opinion it is better to keep the possibility of performing freezes in ch-backup.
  • Knob-naming problems: maybe it is nitpicking, but... AFAIK freezes in CH have part granularity, while the name is freeze_partition_threads, so renaming or aliases would be required.

Maybe it is overthinking, but I like the current solution more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, it makes sense. Let's make parallelize_freeze_in_clickhouse: True by default

not parallelize_freeze_in_ch
and freeze_partition_threads > 0
and "PARTITION BY" in table.create_statement
)

query_settings: Dict[str, Any] = {"max_threads": 1}

query_settings = None
# Since https://github.com/ClickHouse/ClickHouse/pull/75016
if self.ch_version_ge("25.2"):
query_settings = {"max_execution_time": self._freeze_timeout}
query_settings["max_execution_time"] = self._freeze_timeout

if freeze_by_partitions:
with ThreadExecPool(max(1, threads)) as pool:
if freeze_by_partitions:
partitions_to_freeze = self.list_partitions(table)
for partition in partitions_to_freeze:
query_sql = FREEZE_PARTITION_SQL.format(
db_name=escape(table.database),
table_name=escape(table.name),
backup_name=backup_name,
partition=partition,
)
pool.submit(
f'Freeze partition "{partition}"',
self._ch_client.query,
query_sql,
settings=query_settings,
timeout=self._freeze_timeout,
should_retry=False,
new_session=True,
)
with ThreadExecPool(max(1, freeze_partition_threads)) as pool:
partitions_to_freeze = self.list_partitions(table)
for partition in partitions_to_freeze:
query_sql = FREEZE_PARTITION_SQL.format(
db_name=escape(table.database),
table_name=escape(table.name),
backup_name=backup_name,
partition=partition,
)
pool.submit(
f'Freeze partition "{partition}"',
self._ch_client.query,
query_sql,
settings=query_settings,
timeout=self._freeze_timeout,
should_retry=False,
new_session=True,
)
pool.wait_all(
keep_going=False,
timeout=self._freeze_timeout,
)
else:
query_sql = FREEZE_TABLE_SQL.format(
db_name=escape(table.database),
table_name=escape(table.name),
backup_name=backup_name,
)
self._ch_client.query(
query_sql,
settings=query_settings,
timeout=self._freeze_timeout,
should_retry=False,
new_session=True,
)
return

if parallelize_freeze_in_ch:
if self.ch_version_ge("25.11"):
query_settings["max_threads"] = max(1, clickhouse_query_max_threads)
else:
logging.warning(
"Parallel freeze available only with ch >= 25.11. See https://github.com/ClickHouse/ClickHouse/pull/71743"
)

query_sql = FREEZE_TABLE_SQL.format(
db_name=escape(table.database),
table_name=escape(table.name),
backup_name=backup_name,
)
self._ch_client.query(
query_sql,
settings=query_settings,
timeout=self._freeze_timeout,
should_retry=False,
new_session=True,
)

def list_partitions(self, table: Table) -> List[str]:
"""
Expand Down
6 changes: 6 additions & 0 deletions ch_backup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,14 @@ def _as_seconds(t: str) -> int:
"cloud_storage_restore_workers": 4,
# The number of threads for parallel freeze of tables
"freeze_threads": 4,
# To execute freeze efficiently, we should parallelize freeze operations. We have two options where we can parallelize:
# 1) Inside ch-backup: We can perform several `ALTER TABLE FREEZE PARTITION` queries. Then freeze_partition_threads are used.
# 2) Inside clickhouse(preferable): Since 25.11 clickhouse can parallelize `ALTER TABLE FREEZE` queries. Then freeze_table_query_max_threads are used.
# https://github.com/ClickHouse/ClickHouse/pull/71743
"parallelize_freeze_in_clickhouse": True,
# The number of threads for parallel freeze of partitions. If set to 0, will freeze table in one query.
"freeze_partition_threads": 16,
"freeze_table_query_max_threads": 16,
# The number of threads for parallel drop replica
"drop_replica_threads": 8,
},
Expand Down
12 changes: 11 additions & 1 deletion ch_backup/logic/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,11 @@ def _backup(
table,
backup_name,
schema_only,
multiprocessing_config.get(
"parallelize_freeze_in_clickhouse", False
),
multiprocessing_config.get("freeze_partition_threads", 0),
multiprocessing_config.get("freeze_table_query_max_threads", 0),
)

for freezed_table in pool.as_completed(keep_going=False):
Expand Down Expand Up @@ -188,7 +192,9 @@ def _freeze_table(
table: Table,
backup_name: str,
schema_only: bool,
parallelize_freeze_in_ch: bool,
freeze_partition_threads: int,
freeze_table_query_max_threads: int,
) -> Optional[Table]:
"""
Freeze table and return it's create statement
Expand All @@ -208,7 +214,11 @@ def _freeze_table(
if not schema_only and table.is_merge_tree():
try:
context.ch_ctl.freeze_table(
backup_name, table, freeze_partition_threads
backup_name,
table,
parallelize_freeze_in_ch,
freeze_partition_threads,
freeze_table_query_max_threads,
)
except ClickhouseError:
if context.ch_ctl.does_table_exist(table.database, table.name):
Expand Down
6 changes: 6 additions & 0 deletions images/clickhouse/config/ch-backup.conf
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,9 @@ multiprocessing:
{% else %}
cloud_storage_restore_workers: 1
{% endif%}
{% if ch_version_ge('25.11') %}
parallelize_freeze_in_clickhouse: true
{% else %}
parallelize_freeze_in_clickhouse: false
{% endif %}

18 changes: 18 additions & 0 deletions tests/integration/features/backup_restore_sources.feature
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,21 @@ Scenario: Detached table is not a blocker to backup restore.
And we restore clickhouse backup #0 to clickhouse02
Then we got same clickhouse data at clickhouse01 clickhouse02

@require_version_25.11
Scenario: Parallel freeze over parallelization in ch-backup.
Given ch-backup configuration on clickhouse01
"""
multiprocessing:
parallelize_freeze_in_clickhouse: False
"""
Given we have executed queries on clickhouse01
"""
CREATE DATABASE test_db;
CREATE TABLE test_db.table_01 (n Int32, b Int32) ENGINE = MergeTree() PARTITION BY b ORDER BY n;
INSERT INTO test_db.table_01 SELECT rand32()%100, rand32()%100 FROM numbers(100000);
"""
When we create clickhouse01 clickhouse backup
And we restore clickhouse backup #0 to clickhouse02
Then we got same clickhouse data at clickhouse01 clickhouse02