Skip to content

Commit 9a20c1e

Browse files
authored
Erase using batches (#186)
1 parent 2480e29 commit 9a20c1e

File tree

3 files changed

+33
-13
lines changed

3 files changed

+33
-13
lines changed

mysql_ch_replicator/clickhouse_api.py

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class ClickhouseApi:
8585
def __init__(self, database: str | None, clickhouse_settings: ClickhouseSettings):
8686
self.database = database
8787
self.clickhouse_settings = clickhouse_settings
88+
self.erase_batch_size = clickhouse_settings.erase_batch_size
8889
self.client = clickhouse_connect.get_client(
8990
host=clickhouse_settings.host,
9091
port=clickhouse_settings.port,
@@ -248,22 +249,34 @@ def insert(self, table_name, records, table_structure: TableStructure = None):
248249

249250
def erase(self, table_name, field_name, field_values):
250251
field_name = ','.join(field_name)
251-
field_values = ', '.join(f'({v})' for v in field_values)
252-
query = DELETE_QUERY.format(**{
253-
'db_name': self.database,
254-
'table_name': table_name,
255-
'field_name': field_name,
256-
'field_values': field_values,
257-
})
258-
t1 = time.time()
259-
self.execute_command(query)
260-
t2 = time.time()
261-
duration = t2 - t1
252+
253+
# Batch large deletions to avoid ClickHouse max query size limit
254+
field_values_list = list(field_values)
255+
256+
total_duration = 0.0
257+
total_records = len(field_values_list)
258+
259+
for i in range(0, len(field_values_list), self.erase_batch_size):
260+
batch = field_values_list[i:i + self.erase_batch_size]
261+
batch_field_values = ', '.join(f'({v})' for v in batch)
262+
263+
query = DELETE_QUERY.format(**{
264+
'db_name': self.database,
265+
'table_name': table_name,
266+
'field_name': field_name,
267+
'field_values': batch_field_values,
268+
})
269+
270+
t1 = time.time()
271+
self.execute_command(query)
272+
t2 = time.time()
273+
total_duration += (t2 - t1)
274+
262275
self.stats.on_event(
263276
table_name=table_name,
264-
duration=duration,
277+
duration=total_duration,
265278
is_insert=False,
266-
records=len(field_values),
279+
records=total_records,
267280
)
268281

269282
def drop_database(self, db_name):

mysql_ch_replicator/config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class ClickhouseSettings:
5252
password: str = ''
5353
connection_timeout: int = 30
5454
send_receive_timeout: int = 120
55+
erase_batch_size: int = 100000 # Number of records to delete per batch
5556

5657
def validate(self):
5758
if not isinstance(self.host, str):
@@ -78,6 +79,11 @@ def validate(self):
7879
if self.send_receive_timeout <= 0:
7980
raise ValueError(f'send_receive_timeout should be at least 1 second')
8081

82+
if not isinstance(self.erase_batch_size, int):
83+
raise ValueError(f'erase_batch_size should be int and not {type(self.erase_batch_size)}')
84+
if self.erase_batch_size <= 0:
85+
raise ValueError('erase_batch_size should be positive')
86+
8187

8288
@dataclass
8389
class BinlogReplicatorSettings:

tests_config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ clickhouse:
99
port: 9123
1010
user: 'default'
1111
password: 'admin'
12+
erase_batch_size: 2
1213

1314
binlog_replicator:
1415
data_dir: '/app/binlog/'

0 commit comments

Comments
 (0)