Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import struct
import json
import sqlparse
import re
Expand Down Expand Up @@ -27,6 +28,58 @@ def convert_bytes(obj):
return obj


def parse_mysql_point(binary):
    """
    Parses the binary representation of a MySQL POINT data type
    and returns a tuple (x, y) representing the coordinates.

    Two layouts are accepted, told apart by their total length:

    * 21 bytes - a plain WKB POINT: 1 byte-order byte, a 4-byte geometry
      type, then two 8-byte doubles (x, y).
    * 25 bytes - the same WKB POINT preceded by a 4-byte SRID. The SRID
      is skipped; it does not affect the returned coordinates.

    :param binary: The binary data representing the POINT, or None.
    :return: A tuple (x, y) with the coordinate values; (0, 0) when
        ``binary`` is None.
    :raises ValueError: If the length is neither 21 nor 25 bytes, the
        byte-order marker is not 0 or 1, or the WKB type is not POINT (1).
    """
    if binary is None:
        return 0, 0

    # Both layouts share the same WKB body; only the start offset differs.
    size = len(binary)
    if size == 21:
        offset = 0
    elif size == 25:
        offset = 4  # skip the leading SRID
    else:
        raise ValueError("Invalid binary length for WKB POINT")

    # The byte-order marker selects the endianness of everything after it.
    byte_order = binary[offset]
    if byte_order == 0:
        endian = '>'
    elif byte_order == 1:
        endian = '<'
    else:
        raise ValueError("Invalid byte order in WKB POINT")

    wkb_type, = struct.unpack_from(endian + 'I', binary, offset + 1)
    if wkb_type != 1:  # WKB type 1 means POINT
        raise ValueError("Not a WKB POINT type")

    # X and Y are two consecutive doubles right after the type field.
    x, y = struct.unpack_from(endian + '2d', binary, offset + 5)
    return (x, y)


def strip_sql_name(name):
name = name.strip()
if name.startswith('`'):
Expand Down Expand Up @@ -64,9 +117,11 @@ def __init__(self, db_replicator: 'DbReplicator' = None):
self.db_replicator = db_replicator

def convert_type(self, mysql_type, parameters):

is_unsigned = 'unsigned' in parameters.lower()

if mysql_type == 'point':
return 'Tuple(x Float32, y Float32)'

if mysql_type == 'int':
if is_unsigned:
return 'UInt32'
Expand Down Expand Up @@ -146,6 +201,8 @@ def convert_field_type(self, mysql_type, mysql_parameters):
mysql_parameters = mysql_parameters.lower()
not_null = 'not null' in mysql_parameters
clickhouse_type = self.convert_type(mysql_type, mysql_parameters)
if 'Tuple' in clickhouse_type:
not_null = True
if not not_null:
clickhouse_type = f'Nullable({clickhouse_type})'
return clickhouse_type
Expand Down Expand Up @@ -197,6 +254,9 @@ def convert_record(self, mysql_record, mysql_field_types, clickhouse_field_types
if 'UInt64' in clickhouse_field_type and clickhouse_field_value < 0:
clickhouse_field_value = 18446744073709551616 + clickhouse_field_value

if 'point' in mysql_field_type:
clickhouse_field_value = parse_mysql_point(clickhouse_field_value)

clickhouse_record.append(clickhouse_field_value)
return tuple(clickhouse_record)

Expand Down
19 changes: 15 additions & 4 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,13 +716,14 @@ def test_different_types_2():
CREATE TABLE {TEST_TABLE_NAME} (
`id` int unsigned NOT NULL AUTO_INCREMENT,
test1 bit(1),
test2 point,
PRIMARY KEY (id)
);
''')

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (test1) VALUES "
f"(0);",
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES "
f"(0, POINT(10.0, 20.0));",
commit=True,
)

Expand All @@ -739,13 +740,23 @@ def test_different_types_2():
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (test1) VALUES "
f"(1);",
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES "
f"(1, POINT(15.0, 14.0));",
commit=True,
)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, 'test1=True')) == 1)

assert ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test2']['x'] == 15.0
assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test2']['y'] == 20.0

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES "
f"(0, NULL);",
commit=True,
)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)


def test_json():
cfg = config.Settings()
Expand Down
Loading