Skip to content

Commit 2480e29

Browse files
authored
Added multipolygon support, related to #183 (#185)
1 parent 035fe46 commit 2480e29

File tree

2 files changed

+149
-9
lines changed

2 files changed

+149
-9
lines changed

mysql_ch_replicator/converter.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,93 @@ def parse_mysql_polygon(binary):
182182
return points
183183

184184

185+
def _wkb_header_matches(binary, offset, expected_type):
    """Return True if a WKB header of ``expected_type`` starts at ``offset``.

    A WKB header is one byte-order byte (0 = big-endian, 1 = little-endian)
    followed by a 4-byte unsigned geometry type in that byte order.
    """
    if len(binary) < offset + 5:
        return False
    byte_order = binary[offset]
    if byte_order not in (0, 1):
        return False
    endian = '<' if byte_order == 1 else '>'
    return struct.unpack_from(endian + 'I', binary, offset + 1)[0] == expected_type


def parse_mysql_multipolygon(binary):
    """
    Parses the binary representation of a MySQL MULTIPOLYGON data type
    and returns a list of polygons, where each polygon is a list of tuples
    [(x1,y1), (x2,y2), ...] representing the polygon vertices.

    Only the first ring (outer boundary) of each polygon is returned; any
    additional rings (holes) are skipped. A polygon with zero rings is
    returned as an empty list.

    The value may be plain WKB or WKB preceded by a 4-byte SRID. The SRID
    prefix is first guessed from the length (more than 25 bytes means a
    prefix is assumed); if that guess does not land on a MULTIPOLYGON header
    but the other offset does, the other offset is used instead.

    :param binary: The binary data representing the MULTIPOLYGON, or None.
    :return: A list of lists of (x, y) tuples with the coordinate values;
             an empty list when ``binary`` is None or holds no polygons.
    :raises ValueError: If a byte-order marker is neither 0 nor 1, or a
             geometry type is not MULTIPOLYGON (6) / POLYGON (3) where
             expected.
    :raises struct.error: If the data ends before a declared field.
    :raises IndexError: If the data ends where a byte-order marker is expected.
    """
    if binary is None:
        return []

    # Start from the length-based guess, and only override it when it is
    # demonstrably wrong and the alternative is demonstrably right. When
    # neither offset matches, the guess is kept so the errors raised below
    # describe the same position as before.
    offset = 4 if len(binary) > 25 else 0
    fallback = 0 if offset == 4 else 4
    if (not _wkb_header_matches(binary, offset, 6)
            and _wkb_header_matches(binary, fallback, 6)):
        offset = fallback

    # Read byte order
    byte_order = binary[offset]
    if byte_order == 0:
        endian = '>'
    elif byte_order == 1:
        endian = '<'
    else:
        raise ValueError("Invalid byte order in WKB MULTIPOLYGON")

    # Read WKB Type
    wkb_type = struct.unpack_from(endian + 'I', binary, offset + 1)[0]
    if wkb_type != 6:  # WKB type 6 means MULTIPOLYGON
        raise ValueError("Not a WKB MULTIPOLYGON type")

    # Read number of polygons
    num_polygons = struct.unpack_from(endian + 'I', binary, offset + 5)[0]

    polygons = []
    current_offset = offset + 9

    for _ in range(num_polygons):
        # Each polygon carries its own WKB header, including its own byte
        # order, which is honoured independently of the outer one.
        polygon_byte_order = binary[current_offset]
        if polygon_byte_order == 0:
            polygon_endian = '>'
        elif polygon_byte_order == 1:
            polygon_endian = '<'
        else:
            raise ValueError("Invalid byte order in WKB POLYGON within MULTIPOLYGON")

        polygon_wkb_type = struct.unpack_from(polygon_endian + 'I', binary, current_offset + 1)[0]
        if polygon_wkb_type != 3:  # WKB type 3 means POLYGON
            raise ValueError("Not a WKB POLYGON type within MULTIPOLYGON")

        num_rings = struct.unpack_from(polygon_endian + 'I', binary, current_offset + 5)[0]
        current_offset += 9  # byte order (1) + type (4) + ring count (4)
        if num_rings == 0:
            polygons.append([])
            continue

        # First ring (outer boundary): point count, then x/y doubles.
        num_points = struct.unpack_from(polygon_endian + 'I', binary, current_offset)[0]
        current_offset += 4
        if num_points:
            # One unpack for the whole ring: x0, y0, x1, y1, ...
            coords = struct.unpack_from(
                f'{polygon_endian}{2 * num_points}d', binary, current_offset
            )
            points = list(zip(coords[0::2], coords[1::2]))
        else:
            points = []
        polygons.append(points)
        current_offset += num_points * 16  # 16 bytes per point (8 for x, 8 for y)

        # Skip any additional rings (holes) for this polygon
        for _ in range(1, num_rings):
            ring_num_points = struct.unpack_from(polygon_endian + 'I', binary, current_offset)[0]
            current_offset += 4 + (ring_num_points * 16)

    return polygons
270+
271+
185272
def strip_sql_name(name):
186273
name = name.strip()
187274
if name.startswith('`'):
@@ -261,6 +348,9 @@ def convert_type(self, mysql_type, parameters):
261348
if mysql_type == 'polygon':
262349
return 'Array(Tuple(x Float32, y Float32))'
263350

351+
if mysql_type == 'multipolygon':
352+
return 'Array(Array(Tuple(x Float32, y Float32)))'
353+
264354
# Correctly handle numeric types
265355
if mysql_type.startswith('numeric'):
266356
# Determine if parameters are specified via parentheses:
@@ -501,6 +591,9 @@ def convert_record(
501591
if mysql_field_type.startswith('polygon'):
502592
clickhouse_field_value = parse_mysql_polygon(clickhouse_field_value)
503593

594+
if mysql_field_type.startswith('multipolygon'):
595+
clickhouse_field_value = parse_mysql_multipolygon(clickhouse_field_value)
596+
504597
if mysql_field_type.startswith('enum('):
505598
enum_values = mysql_structure.fields[idx].additional_data
506599
field_name = mysql_structure.fields[idx].name if idx < len(mysql_structure.fields) else "unknown"

test_mysql_ch_replicator.py

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1734,24 +1734,25 @@ def test_polygon_type():
17341734

17351735
prepare_env(cfg, mysql, ch)
17361736

1737-
# Create a table with polygon type
1737+
# Create a table with polygon and multipolygon types
17381738
mysql.execute(f'''
17391739
CREATE TABLE `{TEST_TABLE_NAME}` (
17401740
id INT NOT NULL AUTO_INCREMENT,
17411741
name VARCHAR(50) NOT NULL,
17421742
area POLYGON NOT NULL,
17431743
nullable_area POLYGON,
1744+
multi_area MULTIPOLYGON,
17441745
PRIMARY KEY (id)
17451746
)
17461747
''')
17471748

17481749
# Insert test data with polygons
17491750
# Using ST_GeomFromText to create polygons from WKT (Well-Known Text) format
17501751
mysql.execute(f'''
1751-
INSERT INTO `{TEST_TABLE_NAME}` (name, area, nullable_area) VALUES
1752-
('Square', ST_GeomFromText('POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'), ST_GeomFromText('POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))')),
1753-
('Triangle', ST_GeomFromText('POLYGON((0 0, 1 0, 0.5 1, 0 0))'), NULL),
1754-
('Complex', ST_GeomFromText('POLYGON((0 0, 0 3, 3 3, 3 0, 0 0))'), ST_GeomFromText('POLYGON((1 1, 1 2, 2 2, 2 1, 1 1))'));
1752+
INSERT INTO `{TEST_TABLE_NAME}` (name, area, nullable_area, multi_area) VALUES
1753+
('Square', ST_GeomFromText('POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'), ST_GeomFromText('POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))'), NULL),
1754+
('Triangle', ST_GeomFromText('POLYGON((0 0, 1 0, 0.5 1, 0 0))'), NULL, NULL),
1755+
('Complex', ST_GeomFromText('POLYGON((0 0, 0 3, 3 3, 3 0, 0 0))'), ST_GeomFromText('POLYGON((1 1, 1 2, 2 2, 2 1, 1 1))'), NULL);
17551756
''', commit=True)
17561757

17571758
run_all_runner = RunAllRunner(cfg_file=config_file)
@@ -1772,6 +1773,7 @@ def test_polygon_type():
17721773
assert results[0]['name'] == 'Square'
17731774
assert len(results[0]['area']) == 5 # Square has 5 points (including closing point)
17741775
assert len(results[0]['nullable_area']) == 5
1776+
assert results[0]['multi_area'] == [] # NULL multipolygon values are returned as empty list
17751777
# Verify some specific points
17761778
assert results[0]['area'][0] == {'x': 0.0, 'y': 0.0}
17771779
assert results[0]['area'][1] == {'x': 0.0, 'y': 1.0}
@@ -1783,6 +1785,7 @@ def test_polygon_type():
17831785
assert results[1]['name'] == 'Triangle'
17841786
assert len(results[1]['area']) == 4 # Triangle has 4 points (including closing point)
17851787
assert results[1]['nullable_area'] == [] # NULL values are returned as empty list
1788+
assert results[1]['multi_area'] == [] # NULL multipolygon values are returned as empty list
17861789
# Verify some specific points
17871790
assert results[1]['area'][0] == {'x': 0.0, 'y': 0.0}
17881791
assert results[1]['area'][1] == {'x': 1.0, 'y': 0.0}
@@ -1793,6 +1796,7 @@ def test_polygon_type():
17931796
assert results[2]['name'] == 'Complex'
17941797
assert len(results[2]['area']) == 5 # Outer square
17951798
assert len(results[2]['nullable_area']) == 5 # Inner square
1799+
assert results[2]['multi_area'] == [] # NULL multipolygon values are returned as empty list
17961800
# Verify some specific points
17971801
assert results[2]['area'][0] == {'x': 0.0, 'y': 0.0}
17981802
assert results[2]['area'][2] == {'x': 3.0, 'y': 3.0}
@@ -1801,10 +1805,10 @@ def test_polygon_type():
18011805

18021806
# Test realtime replication by adding more records
18031807
mysql.execute(f'''
1804-
INSERT INTO `{TEST_TABLE_NAME}` (name, area, nullable_area) VALUES
1805-
('Pentagon', ST_GeomFromText('POLYGON((0 0, 1 0, 1.5 1, 0.5 1.5, 0 0))'), ST_GeomFromText('POLYGON((0.2 0.2, 0.8 0.2, 1 0.8, 0.5 1, 0.2 0.2))')),
1806-
('Hexagon', ST_GeomFromText('POLYGON((0 0, 1 0, 1.5 0.5, 1 1, 0.5 1, 0 0))'), NULL),
1807-
('Circle', ST_GeomFromText('POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))'), ST_GeomFromText('POLYGON((0.5 0.5, 0.5 1.5, 1.5 1.5, 1.5 0.5, 0.5 0.5))'));
1808+
INSERT INTO `{TEST_TABLE_NAME}` (name, area, nullable_area, multi_area) VALUES
1809+
('Pentagon', ST_GeomFromText('POLYGON((0 0, 1 0, 1.5 1, 0.5 1.5, 0 0))'), ST_GeomFromText('POLYGON((0.2 0.2, 0.8 0.2, 1 0.8, 0.5 1, 0.2 0.2))'), NULL),
1810+
('Hexagon', ST_GeomFromText('POLYGON((0 0, 1 0, 1.5 0.5, 1 1, 0.5 1, 0 0))'), NULL, NULL),
1811+
('Circle', ST_GeomFromText('POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))'), ST_GeomFromText('POLYGON((0.5 0.5, 0.5 1.5, 1.5 1.5, 1.5 0.5, 0.5 0.5))'), NULL);
18081812
''', commit=True)
18091813

18101814
# Wait for new records to be replicated
@@ -1816,6 +1820,7 @@ def test_polygon_type():
18161820
assert pentagon['name'] == 'Pentagon'
18171821
assert len(pentagon['area']) == 5 # Pentagon has 5 points
18181822
assert len(pentagon['nullable_area']) == 5 # Inner pentagon
1823+
assert pentagon['multi_area'] == [] # NULL multipolygon values are returned as empty list
18191824
assert abs(pentagon['area'][0]['x'] - 0.0) < 1e-6
18201825
assert abs(pentagon['area'][0]['y'] - 0.0) < 1e-6
18211826
assert abs(pentagon['area'][2]['x'] - 1.5) < 1e-6
@@ -1830,6 +1835,7 @@ def test_polygon_type():
18301835
assert hexagon['name'] == 'Hexagon'
18311836
assert len(hexagon['area']) == 6 # Hexagon has 6 points
18321837
assert hexagon['nullable_area'] == [] # NULL values are returned as empty list
1838+
assert hexagon['multi_area'] == [] # NULL multipolygon values are returned as empty list
18331839
assert abs(hexagon['area'][0]['x'] - 0.0) < 1e-6
18341840
assert abs(hexagon['area'][0]['y'] - 0.0) < 1e-6
18351841
assert abs(hexagon['area'][2]['x'] - 1.5) < 1e-6
@@ -1842,6 +1848,7 @@ def test_polygon_type():
18421848
assert circle['name'] == 'Circle'
18431849
assert len(circle['area']) == 5 # Outer square
18441850
assert len(circle['nullable_area']) == 5 # Inner square
1851+
assert circle['multi_area'] == [] # NULL multipolygon values are returned as empty list
18451852
assert abs(circle['area'][0]['x'] - 0.0) < 1e-6
18461853
assert abs(circle['area'][0]['y'] - 0.0) < 1e-6
18471854
assert abs(circle['area'][2]['x'] - 2.0) < 1e-6
@@ -1851,6 +1858,46 @@ def test_polygon_type():
18511858
assert abs(circle['nullable_area'][2]['x'] - 1.5) < 1e-6
18521859
assert abs(circle['nullable_area'][2]['y'] - 1.5) < 1e-6
18531860

1861+
# Test multipolygon type - insert a record with multipolygon data
1862+
mysql.execute(f'''
1863+
INSERT INTO `{TEST_TABLE_NAME}` (name, area, nullable_area, multi_area) VALUES
1864+
('MultiSquares',
1865+
ST_GeomFromText('POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'),
1866+
NULL,
1867+
ST_GeomFromText('MULTIPOLYGON(((0 0, 0 1, 1 1, 1 0, 0 0)), ((2 2, 2 3, 3 3, 3 2, 2 2)))')
1868+
);
1869+
''', commit=True)
1870+
1871+
# Wait for the new record with multipolygon to be replicated
1872+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 7)
1873+
1874+
# Verify the multipolygon data
1875+
multi_squares = ch.select(TEST_TABLE_NAME, where="name='MultiSquares'")[0]
1876+
assert multi_squares['name'] == 'MultiSquares'
1877+
1878+
# Check that multi_area contains multiple polygons
1879+
# The multipolygon should be represented as an array of polygon arrays
1880+
assert isinstance(multi_squares['multi_area'], list)
1881+
assert len(multi_squares['multi_area']) == 2 # Two polygons in the multipolygon
1882+
1883+
# Check first polygon in multipolygon
1884+
first_polygon = multi_squares['multi_area'][0]
1885+
assert len(first_polygon) == 5 # Square has 5 points (including closing point)
1886+
assert first_polygon[0] == {'x': 0.0, 'y': 0.0}
1887+
assert first_polygon[1] == {'x': 0.0, 'y': 1.0}
1888+
assert first_polygon[2] == {'x': 1.0, 'y': 1.0}
1889+
assert first_polygon[3] == {'x': 1.0, 'y': 0.0}
1890+
assert first_polygon[4] == {'x': 0.0, 'y': 0.0} # Closing point
1891+
1892+
# Check second polygon in multipolygon
1893+
second_polygon = multi_squares['multi_area'][1]
1894+
assert len(second_polygon) == 5 # Square has 5 points (including closing point)
1895+
assert second_polygon[0] == {'x': 2.0, 'y': 2.0}
1896+
assert second_polygon[1] == {'x': 2.0, 'y': 3.0}
1897+
assert second_polygon[2] == {'x': 3.0, 'y': 3.0}
1898+
assert second_polygon[3] == {'x': 3.0, 'y': 2.0}
1899+
assert second_polygon[4] == {'x': 2.0, 'y': 2.0} # Closing point
1900+
18541901
run_all_runner.stop()
18551902
assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
18561903
assert('Traceback' not in read_logs(TEST_DB_NAME))

0 commit comments

Comments
 (0)