Skip to content

Commit 84a6920

Browse files
authored
Support for mysql set type (#72)
1 parent 714d94d commit 84a6920

File tree

6 files changed

+61
-9
lines changed

6 files changed

+61
-9
lines changed

.github/workflows/tests.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@ jobs:
1717
run: >
1818
ls -la &&
1919
docker compose -f docker-compose-tests.yaml up --force-recreate --no-deps --wait -d &&
20-
sudo docker exec -w /app/ -i `docker ps | grep python | awk '{print $1;}'` python3 -m pytest -v -s test_mysql_ch_replicator.py
20+
sudo docker exec -w /app/ -i `docker ps | grep python | awk '{print $1;}'` python3 -m pytest -x -v -s test_mysql_ch_replicator.py

mysql_ch_replicator/converter.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ def convert_type(self, mysql_type, parameters):
244244
return 'String'
245245
if 'binary' in mysql_type:
246246
return 'String'
247+
if 'set(' in mysql_type:
248+
return 'String'
247249
raise Exception(f'unknown mysql type "{mysql_type}"')
248250

249251
def convert_field_type(self, mysql_type, mysql_parameters):
@@ -323,6 +325,21 @@ def convert_record(
323325
charset = mysql_structure.charset_python or 'utf-8'
324326
clickhouse_field_value = clickhouse_field_value.decode(charset)
325327

328+
if 'set(' in mysql_field_type:
329+
set_values = mysql_structure.fields[idx].additional_data
330+
if isinstance(clickhouse_field_value, int):
331+
bit_mask = clickhouse_field_value
332+
clickhouse_field_value = [
333+
val
334+
for idx, val in enumerate(set_values)
335+
if bit_mask & (1 << idx)
336+
]
337+
elif isinstance(clickhouse_field_value, set):
338+
clickhouse_field_value = [
339+
v for v in set_values if v in clickhouse_field_value
340+
]
341+
clickhouse_field_value = ','.join(clickhouse_field_value)
342+
326343
if 'point' in mysql_field_type:
327344
clickhouse_field_value = parse_mysql_point(clickhouse_field_value)
328345

@@ -651,10 +668,26 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None
651668
if len(definition) > 2:
652669
field_parameters = ' '.join(definition[2:])
653670

671+
additional_data = None
672+
if 'set(' in field_type.lower():
673+
vals = field_type[len('set('):]
674+
close_pos = vals.find(')')
675+
vals = vals[:close_pos]
676+
vals = vals.split(',')
677+
def vstrip(e):
678+
if not e:
679+
return e
680+
if e[0] in '"\'':
681+
return e[1:-1]
682+
return e
683+
vals = [vstrip(v) for v in vals]
684+
additional_data = vals
685+
654686
structure.fields.append(TableField(
655687
name=field_name,
656688
field_type=field_type,
657689
parameters=field_parameters,
690+
additional_data=additional_data,
658691
))
659692
#print(' ---- params:', field_parameters)
660693

mysql_ch_replicator/pymysqlreplication/row_event.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ def __read_values_name(
275275
return None
276276
return ret
277277
self.__none_sources[column.name] = NONE_SOURCE.EMPTY_SET
278-
return None
278+
return bit_mask
279279
elif column.type == FIELD_TYPE.BIT:
280280
return self.__read_bit(column)
281281
elif column.type == FIELD_TYPE.GEOMETRY:

mysql_ch_replicator/runner.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ def __init__(self, db_name, config_file):
4343

4444

4545
class Runner:
46+
47+
DB_REPLICATOR_RUN_DELAY = 5
48+
4649
def __init__(self, config: Settings, wait_initial_replication: bool, databases: str):
4750
self.config = config
4851
self.databases = databases or config.databases
@@ -149,8 +152,14 @@ def run(self):
149152
server_thread = threading.Thread(target=self.run_server, daemon=True)
150153
server_thread.start()
151154

155+
t1 = time.time()
156+
while time.time() - t1 < self.DB_REPLICATOR_RUN_DELAY and not killer.kill_now:
157+
time.sleep(0.3)
158+
152159
# First - continue replication for DBs that already finished initial replication
153160
for db in databases:
161+
if killer.kill_now:
162+
break
154163
if not self.is_initial_replication_finished(db_name=db):
155164
continue
156165
logger.info(f'running replication for {db} (initial replication finished)')
@@ -161,6 +170,8 @@ def run(self):
161170
for db in databases:
162171
if db in self.runners:
163172
continue
173+
if killer.kill_now:
174+
break
164175

165176
logger.info(f'running replication for {db} (initial replication not finished - waiting)')
166177
runner = self.runners[db] = DbReplicatorRunner(db_name=db, config_file=self.config.settings_file)

mysql_ch_replicator/table_structure.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
from dataclasses import dataclass, field
2+
from typing import Any
3+
24

35
@dataclass
46
class TableField:
57
name: str = ''
68
field_type: str = ''
79
parameters: str = ''
10+
additional_data: Any = None
811

912
@dataclass
1013
class TableStructure:

test_mysql_ch_replicator.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def kill_process(pid, force=False):
5050
subprocess.run(command, shell=True)
5151

5252

53-
def assert_wait(condition, max_wait_time=15.0, retry_interval=0.05):
53+
def assert_wait(condition, max_wait_time=20.0, retry_interval=0.05):
5454
max_time = time.time() + max_wait_time
5555
while time.time() < max_time:
5656
if condition():
@@ -344,9 +344,9 @@ def test_runner():
344344
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
345345
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)
346346

347-
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES ('Filipp', 50, POINT(10.0, 20.0));", commit=True)
347+
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES ('Xeishfru32', 50, POINT(10.0, 20.0));", commit=True)
348348
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
349-
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0]['age'] == 50)
349+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Xeishfru32'")[0]['age'] == 50)
350350

351351
# Test for restarting dead processes
352352
binlog_repl_pid = get_binlog_replicator_pid(cfg)
@@ -868,13 +868,14 @@ def test_different_types_2():
868868
test1 bit(1),
869869
test2 point,
870870
test3 binary(16),
871+
test4 set('1','2','3','4','5','6','7'),
871872
PRIMARY KEY (id)
872873
);
873874
''')
874875

875876
mysql.execute(
876-
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test3) VALUES "
877-
f"(0, POINT(10.0, 20.0), 'azaza');",
877+
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test3, test4) VALUES "
878+
f"(0, POINT(10.0, 20.0), 'azaza', '1,3,5');",
878879
commit=True,
879880
)
880881

@@ -891,17 +892,21 @@ def test_different_types_2():
891892
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)
892893

893894
mysql.execute(
894-
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES "
895-
f"(1, POINT(15.0, 14.0));",
895+
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test4) VALUES "
896+
f"(1, POINT(15.0, 14.0), '2,4,5');",
896897
commit=True,
897898
)
899+
898900
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)
899901
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, 'test1=True')) == 1)
900902

901903
assert ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test2']['x'] == 15.0
902904
assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test2']['y'] == 20.0
903905
assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test3'] == 'azaza\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
904906

907+
assert ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test4'] == '2,4,5'
908+
assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test4'] == '1,3,5'
909+
905910
mysql.execute(
906911
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES "
907912
f"(0, NULL);",

0 commit comments

Comments
 (0)