Skip to content

Commit c732930

Browse files
authored
Polygon type support (#129)
1 parent 40276b1 commit c732930

File tree

2 files changed

+198
-0
lines changed

2 files changed

+198
-0
lines changed

mysql_ch_replicator/converter.py

Lines changed: 55 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -133,6 +133,55 @@ def parse_mysql_point(binary):
133133
return (x, y)
134134

135135

136+
def parse_mysql_polygon(binary):
    """
    Parses the binary representation of a MySQL POLYGON data type
    and returns a list of tuples [(x1,y1), (x2,y2), ...] representing the
    vertices of the polygon's first (outer) ring.

    Two layouts are accepted: a 4-byte SRID prefix followed by WKB, or plain
    WKB with no prefix. The layout is detected by validating the WKB header
    (byte-order flag followed by geometry type 3) at each candidate offset,
    not by guessing from the total length: a polygon's size depends on its
    vertex count, so a fixed length threshold cannot tell the layouts apart.

    Only the first ring is decoded; any further rings (holes) are ignored.

    :param binary: The binary data representing the POLYGON (bytes-like), or None.
    :return: A list of (x, y) float tuples. Empty for None/empty input and
             for a polygon that declares zero rings.
    :raises ValueError: If the data is truncated, has an invalid byte-order
                        flag, or is not a WKB POLYGON.
    """
    if binary is None or len(binary) == 0:
        return []

    header_size = 9  # byte order (1) + geometry type (4) + ring count (4)
    total = len(binary)

    endian = None
    saw_full_header = False
    saw_valid_byte_order = False
    # Probe the SRID-prefixed layout first, then plain WKB.
    for offset in (4, 0):
        if total < offset + header_size:
            continue
        saw_full_header = True
        byte_order = binary[offset]
        if byte_order not in (0, 1):
            continue
        saw_valid_byte_order = True
        candidate = '<' if byte_order == 1 else '>'
        # WKB type 3 means POLYGON
        if struct.unpack_from(candidate + 'I', binary, offset + 1)[0] == 3:
            endian = candidate
            break

    if endian is None:
        if not saw_full_header:
            raise ValueError("Truncated WKB POLYGON data")
        if not saw_valid_byte_order:
            raise ValueError("Invalid byte order in WKB POLYGON")
        raise ValueError("Not a WKB POLYGON type")

    # Read number of rings (polygons can have holes)
    num_rings = struct.unpack_from(endian + 'I', binary, offset + 5)[0]
    if num_rings == 0:
        return []

    # Read the first ring (outer boundary)
    ring_offset = offset + header_size
    if total < ring_offset + 4:
        raise ValueError("Truncated WKB POLYGON data")
    num_points = struct.unpack_from(endian + 'I', binary, ring_offset)[0]

    # 16 bytes per point (8 for x, 8 for y); validate up front so a corrupt
    # point count cannot trigger a read past the end of the buffer.
    points_offset = ring_offset + 4
    if total < points_offset + num_points * 16:
        raise ValueError("Truncated WKB POLYGON data")

    point = struct.Struct(endian + 'dd')
    return [
        point.unpack_from(binary, points_offset + i * 16)
        for i in range(num_points)
    ]
183+
184+
136185
def strip_sql_name(name):
137186
name = name.strip()
138187
if name.startswith('`'):
@@ -201,6 +250,9 @@ def convert_type(self, mysql_type, parameters):
201250
if mysql_type == 'point':
202251
return 'Tuple(x Float32, y Float32)'
203252

253+
if mysql_type == 'polygon':
254+
return 'Array(Tuple(x Float32, y Float32))'
255+
204256
# Correctly handle numeric types
205257
if mysql_type.startswith('numeric'):
206258
# Determine if parameters are specified via parentheses:
@@ -433,6 +485,9 @@ def convert_record(
433485
if mysql_field_type.startswith('point'):
434486
clickhouse_field_value = parse_mysql_point(clickhouse_field_value)
435487

488+
if mysql_field_type.startswith('polygon'):
489+
clickhouse_field_value = parse_mysql_polygon(clickhouse_field_value)
490+
436491
if mysql_field_type.startswith('enum('):
437492
enum_values = mysql_structure.fields[idx].additional_data
438493
field_name = mysql_structure.fields[idx].name if idx < len(mysql_structure.fields) else "unknown"

test_mysql_ch_replicator.py

Lines changed: 143 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1620,6 +1620,149 @@ def test_enum_conversion():
16201620
assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
16211621
assert('Traceback' not in read_logs(TEST_DB_NAME))
16221622

1623+
1624+
def test_polygon_type():
    """
    Test that polygon type is properly converted and handled between MySQL and ClickHouse.
    Tests both the type conversion and data handling for polygon values,
    first for the initial replication and then for realtime replication.

    Every row is looked up by its name instead of by its position in the
    result list, so the checks do not depend on the order in which rows
    are returned.
    """
    config_file = CONFIG_FILE
    cfg = config.Settings()
    cfg.load(config_file)
    mysql_config = cfg.mysql
    clickhouse_config = cfg.clickhouse
    mysql = mysql_api.MySQLApi(
        database=None,
        mysql_settings=mysql_config
    )
    ch = clickhouse_api.ClickhouseApi(
        database=TEST_DB_NAME,
        clickhouse_settings=clickhouse_config
    )

    prepare_env(cfg, mysql, ch)

    def select_by_name(name):
        # Fetch the single replicated row with the given name.
        return ch.select(TEST_TABLE_NAME, where=f"name='{name}'")[0]

    def assert_vertex(polygon, index, x, y):
        # Tolerant comparison for coordinates that may not round-trip exactly.
        assert abs(polygon[index]['x'] - x) < 1e-6
        assert abs(polygon[index]['y'] - y) < 1e-6

    # Create a table with polygon type
    mysql.execute(f'''
    CREATE TABLE `{TEST_TABLE_NAME}` (
        id INT NOT NULL AUTO_INCREMENT,
        name VARCHAR(50) NOT NULL,
        area POLYGON NOT NULL,
        nullable_area POLYGON,
        PRIMARY KEY (id)
    )
    ''')

    # Insert test data with polygons
    # Using ST_GeomFromText to create polygons from WKT (Well-Known Text) format
    mysql.execute(f'''
    INSERT INTO `{TEST_TABLE_NAME}` (name, area, nullable_area) VALUES
    ('Square', ST_GeomFromText('POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'), ST_GeomFromText('POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))')),
    ('Triangle', ST_GeomFromText('POLYGON((0 0, 1 0, 0.5 1, 0 0))'), NULL),
    ('Complex', ST_GeomFromText('POLYGON((0 0, 0 3, 3 3, 3 0, 0 0))'), ST_GeomFromText('POLYGON((1 1, 1 2, 2 2, 2 1, 1 1))'));
    ''', commit=True)

    run_all_runner = RunAllRunner(cfg_file=config_file)
    run_all_runner.run()

    assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
    ch.execute_command(f'USE `{TEST_DB_NAME}`')
    assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)

    # Verify the row count of the initial replication
    assert len(ch.select(TEST_TABLE_NAME)) == 3

    # Check Square
    square = select_by_name('Square')
    assert square['name'] == 'Square'
    assert len(square['area']) == 5  # Square has 5 points (including closing point)
    assert len(square['nullable_area']) == 5
    # Verify some specific points
    assert square['area'][0] == {'x': 0.0, 'y': 0.0}
    assert square['area'][1] == {'x': 0.0, 'y': 1.0}
    assert square['area'][2] == {'x': 1.0, 'y': 1.0}
    assert square['area'][3] == {'x': 1.0, 'y': 0.0}
    assert square['area'][4] == {'x': 0.0, 'y': 0.0}  # Closing point

    # Check Triangle
    triangle = select_by_name('Triangle')
    assert triangle['name'] == 'Triangle'
    assert len(triangle['area']) == 4  # Triangle has 4 points (including closing point)
    assert triangle['nullable_area'] == []  # NULL values are returned as empty list
    # Verify some specific points
    assert triangle['area'][0] == {'x': 0.0, 'y': 0.0}
    assert triangle['area'][1] == {'x': 1.0, 'y': 0.0}
    assert triangle['area'][2] == {'x': 0.5, 'y': 1.0}
    assert triangle['area'][3] == {'x': 0.0, 'y': 0.0}  # Closing point

    # Check Complex
    complex_row = select_by_name('Complex')
    assert complex_row['name'] == 'Complex'
    assert len(complex_row['area']) == 5  # Outer square
    assert len(complex_row['nullable_area']) == 5  # Inner square
    # Verify some specific points
    assert complex_row['area'][0] == {'x': 0.0, 'y': 0.0}
    assert complex_row['area'][2] == {'x': 3.0, 'y': 3.0}
    assert complex_row['nullable_area'][0] == {'x': 1.0, 'y': 1.0}
    assert complex_row['nullable_area'][2] == {'x': 2.0, 'y': 2.0}

    # Test realtime replication by adding more records
    mysql.execute(f'''
    INSERT INTO `{TEST_TABLE_NAME}` (name, area, nullable_area) VALUES
    ('Pentagon', ST_GeomFromText('POLYGON((0 0, 1 0, 1.5 1, 0.5 1.5, 0 0))'), ST_GeomFromText('POLYGON((0.2 0.2, 0.8 0.2, 1 0.8, 0.5 1, 0.2 0.2))')),
    ('Hexagon', ST_GeomFromText('POLYGON((0 0, 1 0, 1.5 0.5, 1 1, 0.5 1, 0 0))'), NULL),
    ('Circle', ST_GeomFromText('POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))'), ST_GeomFromText('POLYGON((0.5 0.5, 0.5 1.5, 1.5 1.5, 1.5 0.5, 0.5 0.5))'));
    ''', commit=True)

    # Wait for new records to be replicated
    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 6)

    # Check Pentagon
    pentagon = select_by_name('Pentagon')
    assert pentagon['name'] == 'Pentagon'
    assert len(pentagon['area']) == 5  # 5 points including the closing point
    assert len(pentagon['nullable_area']) == 5
    assert_vertex(pentagon['area'], 0, 0.0, 0.0)
    assert_vertex(pentagon['area'], 2, 1.5, 1.0)
    assert_vertex(pentagon['nullable_area'], 0, 0.2, 0.2)
    assert_vertex(pentagon['nullable_area'], 2, 1.0, 0.8)

    # Check Hexagon
    hexagon = select_by_name('Hexagon')
    assert hexagon['name'] == 'Hexagon'
    assert len(hexagon['area']) == 6  # 6 points including the closing point
    assert hexagon['nullable_area'] == []  # NULL values are returned as empty list
    assert_vertex(hexagon['area'], 0, 0.0, 0.0)
    assert_vertex(hexagon['area'], 2, 1.5, 0.5)
    assert_vertex(hexagon['area'], 4, 0.5, 1.0)

    # Check Circle (the row is named 'Circle' but its geometry is a square)
    circle = select_by_name('Circle')
    assert circle['name'] == 'Circle'
    assert len(circle['area']) == 5  # Outer square
    assert len(circle['nullable_area']) == 5  # Inner square
    assert_vertex(circle['area'], 0, 0.0, 0.0)
    assert_vertex(circle['area'], 2, 2.0, 2.0)
    assert_vertex(circle['nullable_area'], 0, 0.5, 0.5)
    assert_vertex(circle['nullable_area'], 2, 1.5, 1.5)

    run_all_runner.stop()
    assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
    assert('Traceback' not in read_logs(TEST_DB_NAME))
1765+
16231766
@pytest.mark.parametrize("query,expected", [
16241767
("CREATE TABLE `mydb`.`mytable` (id INT)", "mydb"),
16251768
("CREATE TABLE mydb.mytable (id INT)", "mydb"),

0 commit comments

Comments
 (0)