Skip to content

Commit 8ce8453

Browse files
committed
Point data type support
1 parent 458351c commit 8ce8453

File tree

2 files changed

+76
-5
lines changed

2 files changed

+76
-5
lines changed

mysql_ch_replicator/converter.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import struct
12
import json
23
import sqlparse
34
import re
@@ -27,6 +28,58 @@ def convert_bytes(obj):
2728
return obj
2829

2930

31+
def parse_mysql_point(binary):
32+
"""
33+
Parses the binary representation of a MySQL POINT data type
34+
and returns a tuple (x, y) representing the coordinates.
35+
36+
:param binary: The binary data representing the POINT.
37+
:return: A tuple (x, y) with the coordinate values.
38+
"""
39+
if binary is None:
40+
return 0, 0
41+
42+
if len(binary) == 21:
43+
# No SRID. Proceed as per WKB POINT
44+
# Read the byte order
45+
byte_order = binary[0]
46+
if byte_order == 0:
47+
endian = '>'
48+
elif byte_order == 1:
49+
endian = '<'
50+
else:
51+
raise ValueError("Invalid byte order in WKB POINT")
52+
# Read the WKB Type
53+
wkb_type = struct.unpack(endian + 'I', binary[1:5])[0]
54+
if wkb_type != 1: # WKB type 1 means POINT
55+
raise ValueError("Not a WKB POINT type")
56+
# Read X and Y coordinates
57+
x = struct.unpack(endian + 'd', binary[5:13])[0]
58+
y = struct.unpack(endian + 'd', binary[13:21])[0]
59+
elif len(binary) == 25:
60+
# With SRID included
61+
# First 4 bytes are the SRID
62+
srid = struct.unpack('>I', binary[0:4])[0] # SRID is big-endian
63+
# Next byte is byte order
64+
byte_order = binary[4]
65+
if byte_order == 0:
66+
endian = '>'
67+
elif byte_order == 1:
68+
endian = '<'
69+
else:
70+
raise ValueError("Invalid byte order in WKB POINT")
71+
# Read the WKB Type
72+
wkb_type = struct.unpack(endian + 'I', binary[5:9])[0]
73+
if wkb_type != 1: # WKB type 1 means POINT
74+
raise ValueError("Not a WKB POINT type")
75+
# Read X and Y coordinates
76+
x = struct.unpack(endian + 'd', binary[9:17])[0]
77+
y = struct.unpack(endian + 'd', binary[17:25])[0]
78+
else:
79+
raise ValueError("Invalid binary length for WKB POINT")
80+
return (x, y)
81+
82+
3083
def strip_sql_name(name):
3184
name = name.strip()
3285
if name.startswith('`'):
@@ -64,9 +117,11 @@ def __init__(self, db_replicator: 'DbReplicator' = None):
64117
self.db_replicator = db_replicator
65118

66119
def convert_type(self, mysql_type, parameters):
67-
68120
is_unsigned = 'unsigned' in parameters.lower()
69121

122+
if mysql_type == 'point':
123+
return 'Tuple(x Float32, y Float32)'
124+
70125
if mysql_type == 'int':
71126
if is_unsigned:
72127
return 'UInt32'
@@ -146,6 +201,8 @@ def convert_field_type(self, mysql_type, mysql_parameters):
146201
mysql_parameters = mysql_parameters.lower()
147202
not_null = 'not null' in mysql_parameters
148203
clickhouse_type = self.convert_type(mysql_type, mysql_parameters)
204+
if 'Tuple' in clickhouse_type:
205+
not_null = True
149206
if not not_null:
150207
clickhouse_type = f'Nullable({clickhouse_type})'
151208
return clickhouse_type
@@ -197,6 +254,9 @@ def convert_record(self, mysql_record, mysql_field_types, clickhouse_field_types
197254
if 'UInt64' in clickhouse_field_type and clickhouse_field_value < 0:
198255
clickhouse_field_value = 18446744073709551616 + clickhouse_field_value
199256

257+
if 'point' in mysql_field_type:
258+
clickhouse_field_value = parse_mysql_point(clickhouse_field_value)
259+
200260
clickhouse_record.append(clickhouse_field_value)
201261
return tuple(clickhouse_record)
202262

test_mysql_ch_replicator.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -716,13 +716,14 @@ def test_different_types_2():
716716
CREATE TABLE {TEST_TABLE_NAME} (
717717
`id` int unsigned NOT NULL AUTO_INCREMENT,
718718
test1 bit(1),
719+
test2 point,
719720
PRIMARY KEY (id)
720721
);
721722
''')
722723

723724
mysql.execute(
724-
f"INSERT INTO {TEST_TABLE_NAME} (test1) VALUES "
725-
f"(0);",
725+
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES "
726+
f"(0, POINT(10.0, 20.0));",
726727
commit=True,
727728
)
728729

@@ -739,13 +740,23 @@ def test_different_types_2():
739740
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)
740741

741742
mysql.execute(
742-
f"INSERT INTO {TEST_TABLE_NAME} (test1) VALUES "
743-
f"(1);",
743+
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES "
744+
f"(1, POINT(15.0, 14.0));",
744745
commit=True,
745746
)
746747
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)
747748
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, 'test1=True')) == 1)
748749

750+
assert ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test2']['x'] == 15.0
751+
assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test2']['y'] == 20.0
752+
753+
mysql.execute(
754+
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES "
755+
f"(0, NULL);",
756+
commit=True,
757+
)
758+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
759+
749760

750761
def test_json():
751762
cfg = config.Settings()

0 commit comments

Comments
 (0)