Skip to content

Commit 405e2d7

Browse files
committed
Merge branch 'master' into optimizer_auto_call
2 parents b477118 + cc1aea0 commit 405e2d7

File tree

4 files changed

+137
-7
lines changed

4 files changed

+137
-7
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
[![Release][release-image]][releases]
44
[![License][license-image]][license]
55

6-
[release-image]: https://img.shields.io/badge/release-0.0.32-blue.svg?style=flat
6+
[release-image]: https://img.shields.io/badge/release-0.0.35-blue.svg?style=flat
77
[releases]: https://github.com/bakwc/mysql_ch_replicator/releases
88

99
[license-image]: https://img.shields.io/badge/license-MIT-blue.svg?style=flat

mysql_ch_replicator/converter.py

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import struct
12
import json
23
import sqlparse
34
import re
@@ -27,6 +28,58 @@ def convert_bytes(obj):
2728
return obj
2829

2930

31+
def parse_mysql_point(binary):
    """
    Parse the binary representation of a MySQL POINT value.

    Two layouts are accepted:

    * 21 bytes - a plain WKB POINT: 1 byte-order flag, a 4-byte geometry
      type and two 8-byte doubles (X, then Y).
    * 25 bytes - the same WKB POINT preceded by a 4-byte SRID prefix.

    :param binary: The binary data representing the POINT, or None.
    :return: A tuple (x, y) with the coordinate values; (0, 0) when
        ``binary`` is None.
    :raises ValueError: If the length, the byte-order flag or the geometry
        type does not describe a WKB POINT.
    """
    if binary is None:
        return 0, 0

    # Both layouts share the same 21-byte WKB tail; the only difference is
    # the optional 4-byte SRID prefix. The SRID is not needed to extract the
    # coordinates, so it is skipped rather than decoded.
    wkb_offset = len(binary) - 21
    if wkb_offset not in (0, 4):
        raise ValueError("Invalid binary length for WKB POINT")

    # The byte-order flag selects how every following field is decoded.
    byte_order = binary[wkb_offset]
    if byte_order == 0:
        endian = '>'
    elif byte_order == 1:
        endian = '<'
    else:
        raise ValueError("Invalid byte order in WKB POINT")

    # With an explicit '<' / '>' prefix struct uses standard sizes and no
    # padding, so 'Idd' covers exactly the remaining 4 + 8 + 8 bytes.
    wkb_type, x, y = struct.unpack_from(endian + 'Idd', binary, wkb_offset + 1)
    if wkb_type != 1:  # WKB type 1 means POINT
        raise ValueError("Not a WKB POINT type")
    return (x, y)
81+
82+
3083
def strip_sql_name(name):
3184
name = name.strip()
3285
if name.startswith('`'):
@@ -64,9 +117,11 @@ def __init__(self, db_replicator: 'DbReplicator' = None):
64117
self.db_replicator = db_replicator
65118

66119
def convert_type(self, mysql_type, parameters):
67-
68120
is_unsigned = 'unsigned' in parameters.lower()
69121

122+
if mysql_type == 'point':
123+
return 'Tuple(x Float32, y Float32)'
124+
70125
if mysql_type == 'int':
71126
if is_unsigned:
72127
return 'UInt32'
@@ -89,6 +144,8 @@ def convert_type(self, mysql_type, parameters):
89144
return 'Date32'
90145
if mysql_type == 'tinyint(1)':
91146
return 'Bool'
147+
if mysql_type == 'bit(1)':
148+
return 'Bool'
92149
if mysql_type == 'bool':
93150
return 'Bool'
94151
if 'smallint' in mysql_type:
@@ -123,27 +180,31 @@ def convert_type(self, mysql_type, parameters):
123180
return 'Float32'
124181
if 'double' in mysql_type:
125182
return 'Float64'
126-
if 'integer' in mysql_type or 'int(' in mysql_type:
127-
if is_unsigned:
128-
return 'UInt32'
129-
return 'Int32'
130183
if 'bigint' in mysql_type:
131184
if is_unsigned:
132185
return 'UInt64'
133186
return 'Int64'
187+
if 'integer' in mysql_type or 'int(' in mysql_type:
188+
if is_unsigned:
189+
return 'UInt32'
190+
return 'Int32'
134191
if 'real' in mysql_type:
135192
return 'Float64'
136193
if mysql_type.startswith('time'):
137194
return 'String'
138195
if 'varbinary' in mysql_type:
139196
return 'String'
197+
if 'binary' in mysql_type:
198+
return 'String'
140199
raise Exception(f'unknown mysql type "{mysql_type}"')
141200

142201
def convert_field_type(self, mysql_type, mysql_parameters):
    """
    Build the ClickHouse column type for a MySQL column definition.

    Both arguments are lower-cased before being passed to
    ``self.convert_type``. The resulting type is wrapped in
    ``Nullable(...)`` unless the column parameters contain ``not null``
    or the converted type contains ``Tuple``.

    :param mysql_type: MySQL type name of the column.
    :param mysql_parameters: Remaining column parameters of the definition.
    :return: The ClickHouse type string.
    """
    type_lc = mysql_type.lower()
    params_lc = mysql_parameters.lower()
    base_type = self.convert_type(type_lc, params_lc)

    # Tuple types are always emitted without the Nullable wrapper
    # (presumably because ClickHouse rejects Nullable(Tuple(...)) - confirm).
    if 'Tuple' in base_type or 'not null' in params_lc:
        return base_type
    return f'Nullable({base_type})'
@@ -195,6 +256,9 @@ def convert_record(self, mysql_record, mysql_field_types, clickhouse_field_types
195256
if 'UInt64' in clickhouse_field_type and clickhouse_field_value < 0:
196257
clickhouse_field_value = 18446744073709551616 + clickhouse_field_value
197258

259+
if 'point' in mysql_field_type:
260+
clickhouse_field_value = parse_mysql_point(clickhouse_field_value)
261+
198262
clickhouse_record.append(clickhouse_field_value)
199263
return tuple(clickhouse_record)
200264

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "mysql-ch-replicator"
3-
version = "0.0.32"
3+
version = "0.0.35"
44
description = "Tool for replication of MySQL databases to ClickHouse"
55
authors = ["Filipp Ozinov <[email protected]>"]
66
license = "MIT"

test_mysql_ch_replicator.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,72 @@ def test_numeric_types_and_limits():
694694
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, 'test7=18446744073709551586')) == 2)
695695

696696

697+
def test_different_types_2():
    """
    End-to-end check that ``bit(1)``, ``point`` and ``binary(16)`` columns
    are replicated from MySQL to ClickHouse.

    One row is inserted before the replicator runners are started and two
    more afterwards; the last one carries a NULL point.
    """
    settings = config.Settings()
    settings.load(CONFIG_FILE)

    mysql_client = mysql_api.MySQLApi(database=None, mysql_settings=settings.mysql)
    ch_client = clickhouse_api.ClickhouseApi(
        database=TEST_DB_NAME,
        clickhouse_settings=settings.clickhouse,
    )

    def replicated_rows():
        # Number of rows currently visible in the ClickHouse copy of the table.
        return len(ch_client.select(TEST_TABLE_NAME))

    def rows_where(condition):
        # Rows of the ClickHouse table matching the given filter expression.
        return ch_client.select(TEST_TABLE_NAME, condition)

    prepare_env(settings, mysql_client, ch_client)

    mysql_client.execute("SET sql_mode = 'ALLOW_INVALID_DATES';")

    create_table_sql = f'''
CREATE TABLE {TEST_TABLE_NAME} (
`id` int unsigned NOT NULL AUTO_INCREMENT,
test1 bit(1),
test2 point,
test3 binary(16),
PRIMARY KEY (id)
);
'''
    mysql_client.execute(create_table_sql)

    # First row exists before either replicator runner is started.
    first_insert_sql = (
        f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test3) VALUES "
        f"(0, POINT(10.0, 20.0), 'azaza');"
    )
    mysql_client.execute(first_insert_sql, commit=True)

    binlog_runner = BinlogReplicatorRunner()
    binlog_runner.run()
    db_runner = DbReplicatorRunner(TEST_DB_NAME)
    db_runner.run()

    assert_wait(lambda: TEST_DB_NAME in ch_client.get_databases())

    ch_client.execute_command(f'USE {TEST_DB_NAME}')

    assert_wait(lambda: TEST_TABLE_NAME in ch_client.get_tables())
    assert_wait(lambda: replicated_rows() == 1)

    # Second row is inserted while the runners are active.
    second_insert_sql = (
        f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES "
        f"(1, POINT(15.0, 14.0));"
    )
    mysql_client.execute(second_insert_sql, commit=True)
    assert_wait(lambda: replicated_rows() == 2)
    assert_wait(lambda: len(rows_where('test1=True')) == 1)

    # bit(1) is queried as a boolean, point as a tuple with x / y members,
    # and binary(16) comes back zero-padded to its full width.
    assert rows_where('test1=True')[0]['test2']['x'] == 15.0
    assert rows_where('test1=False')[0]['test2']['y'] == 20.0
    assert rows_where('test1=False')[0]['test3'] == 'azaza\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

    # A NULL point must still be replicated as a row.
    null_point_insert_sql = (
        f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES "
        f"(0, NULL);"
    )
    mysql_client.execute(null_point_insert_sql, commit=True)
    assert_wait(lambda: replicated_rows() == 3)
761+
762+
697763
def test_json():
698764
cfg = config.Settings()
699765
cfg.load(CONFIG_FILE)

0 commit comments

Comments
 (0)