diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index 95eb5c7..bbbb7db 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -1,3 +1,4 @@ +import struct import json import sqlparse import re @@ -27,6 +28,58 @@ def convert_bytes(obj): return obj +def parse_mysql_point(binary): + """ + Parses the binary representation of a MySQL POINT data type + and returns a tuple (x, y) representing the coordinates. + + :param binary: The binary data representing the POINT. + :return: A tuple (x, y) with the coordinate values. + """ + if binary is None: + return 0, 0 + + if len(binary) == 21: + # No SRID. Proceed as per WKB POINT + # Read the byte order + byte_order = binary[0] + if byte_order == 0: + endian = '>' + elif byte_order == 1: + endian = '<' + else: + raise ValueError("Invalid byte order in WKB POINT") + # Read the WKB Type + wkb_type = struct.unpack(endian + 'I', binary[1:5])[0] + if wkb_type != 1: # WKB type 1 means POINT + raise ValueError("Not a WKB POINT type") + # Read X and Y coordinates + x = struct.unpack(endian + 'd', binary[5:13])[0] + y = struct.unpack(endian + 'd', binary[13:21])[0] + elif len(binary) == 25: + # With SRID included + # First 4 bytes are the SRID + srid = struct.unpack('>I', binary[0:4])[0] # SRID is big-endian + # Next byte is byte order + byte_order = binary[4] + if byte_order == 0: + endian = '>' + elif byte_order == 1: + endian = '<' + else: + raise ValueError("Invalid byte order in WKB POINT") + # Read the WKB Type + wkb_type = struct.unpack(endian + 'I', binary[5:9])[0] + if wkb_type != 1: # WKB type 1 means POINT + raise ValueError("Not a WKB POINT type") + # Read X and Y coordinates + x = struct.unpack(endian + 'd', binary[9:17])[0] + y = struct.unpack(endian + 'd', binary[17:25])[0] + else: + raise ValueError("Invalid binary length for WKB POINT") + return (x, y) + + def strip_sql_name(name): name = name.strip() if name.startswith('`'): @@ -64,9 +117,11 @@ def __init__(self, db_replicator: 'DbReplicator' = None): self.db_replicator = db_replicator def convert_type(self, mysql_type, parameters): - is_unsigned = 'unsigned' in parameters.lower() + if mysql_type == 'point': + return 'Tuple(x Float32, y Float32)' + if mysql_type == 'int': if is_unsigned: return 'UInt32' @@ -146,6 +201,8 @@ def convert_field_type(self, mysql_type, mysql_parameters): mysql_parameters = mysql_parameters.lower() not_null = 'not null' in mysql_parameters clickhouse_type = self.convert_type(mysql_type, mysql_parameters) + if 'Tuple' in clickhouse_type: + not_null = True if not not_null: clickhouse_type = f'Nullable({clickhouse_type})' return clickhouse_type @@ -197,6 +254,9 @@ def convert_record(self, mysql_record, mysql_field_types, clickhouse_field_types if 'UInt64' in clickhouse_field_type and clickhouse_field_value < 0: clickhouse_field_value = 18446744073709551616 + clickhouse_field_value + if 'point' in mysql_field_type: + clickhouse_field_value = parse_mysql_point(clickhouse_field_value) + clickhouse_record.append(clickhouse_field_value) return tuple(clickhouse_record) diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index da31971..0d3f0f8 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -716,13 +716,14 @@ def test_different_types_2(): CREATE TABLE {TEST_TABLE_NAME} ( `id` int unsigned NOT NULL AUTO_INCREMENT, test1 bit(1), + test2 point, PRIMARY KEY (id) ); ''') mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (test1) VALUES " - f"(0);", + f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES " + f"(0, POINT(10.0, 20.0));", commit=True, ) @@ -739,13 +740,23 @@ def test_different_types_2(): assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (test1) VALUES " - f"(1);", + f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES " + f"(1, POINT(15.0, 14.0));", commit=True, ) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, 'test1=True')) == 1) + assert ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test2']['x'] == 15.0 + assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test2']['y'] == 20.0 + + mysql.execute( + f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES " + f"(0, NULL);", + commit=True, + ) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) + def test_json(): cfg = config.Settings()