Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pyparsing import Suppress, CaselessKeyword, Word, alphas, alphanums, delimitedList

from .table_structure import TableStructure, TableField
from .converter_enum_parser import parse_mysql_enum


CHARSET_MYSQL_TO_PYTHON = {
Expand Down Expand Up @@ -239,8 +240,14 @@ def convert_type(self, mysql_type, parameters):
return 'String'
if 'varchar' in mysql_type:
return 'String'
if 'enum' in mysql_type:
return 'String'
if mysql_type.startswith('enum'):
enum_values = parse_mysql_enum(mysql_type)
ch_enum_values = []
for idx, value_name in enumerate(enum_values):
ch_enum_values.append(f"'{value_name}' = {idx+1}")
ch_enum_values = ', '.join(ch_enum_values)
# Enum8('red' = 1, 'green' = 2, 'black' = 3)
return f'Enum8({ch_enum_values})'
if 'text' in mysql_type:
return 'String'
if 'blob' in mysql_type:
Expand Down Expand Up @@ -376,9 +383,13 @@ def convert_record(
]
clickhouse_field_value = ','.join(clickhouse_field_value)

if 'point' in mysql_field_type:
if mysql_field_type.startswith('point'):
clickhouse_field_value = parse_mysql_point(clickhouse_field_value)

if mysql_field_type.startswith('enum(') and isinstance(clickhouse_field_value, int):
enum_values = mysql_structure.fields[idx].additional_data
clickhouse_field_value = enum_values[int(clickhouse_field_value)-1]

clickhouse_record.append(clickhouse_field_value)
return tuple(clickhouse_record)

Expand Down Expand Up @@ -745,6 +756,9 @@ def vstrip(e):
vals = [vstrip(v) for v in vals]
additional_data = vals

if field_type.lower().startswith('enum('):
additional_data = parse_mysql_enum(field_type)

structure.fields.append(TableField(
name=field_name,
field_type=field_type,
Expand Down
206 changes: 206 additions & 0 deletions mysql_ch_replicator/converter_enum_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@


def parse_mysql_enum(enum_definition):
    """
    Convert a MySQL ENUM type definition into the list of its values.

    The keyword is matched case-insensitively and surrounding whitespace
    is ignored. Examples of accepted input:
        enum('point','qwe','def')
        ENUM("asd", 'qwe', "def")
        enum(`point`,`qwe`,`def`)
    each of which yields a list such as:
        ['point', 'qwe', 'def']

    Notes:
      - Values in single or double quotes have backslash escapes decoded.
      - Values in backticks only recognise a doubled backtick (``) as an
        escaped backtick.
      - Anything that follows the closing parenthesis is ignored.

    Raises:
        ValueError: if the text does not start with "enum", has no '(',
            or the parenthesised list is malformed.
    """
    definition = enum_definition.strip()

    # The type name must be the very first token.
    if definition[:4].lower() != "enum":
        raise ValueError("String does not start with 'enum'")

    paren_index = definition.find('(')
    if paren_index == -1:
        raise ValueError("Missing '(' in the enum definition")

    # Quote-aware scan for the matching ')'; the end position it also
    # returns is not needed because trailing text is not validated.
    inner_content, _ = _extract_parenthesized_content(definition, paren_index)

    # Split the inner text into the individual decoded literals.
    return _parse_enum_values(inner_content)


def _extract_parenthesized_content(s, start_index):
"""
Given a string s and the index of a '(' in it,
return a tuple (content, pos) where content is the substring
inside the outer matching parentheses and pos is the index
immediately after the matching closing ')'.

This function takes special care to ignore any parentheses
that occur inside quotes (a quoted literal is any part enclosed by
', " or `) and also to skip over escape sequences in single/double quotes.
(Backticks do not process backslash escapes.)
"""
if s[start_index] != '(':
raise ValueError("Expected '(' at position {}".format(start_index))
depth = 1
i = start_index + 1
content_start = i
in_quote = None # will be set to a quoting character when inside a quoted literal

# Allow these quote characters.
allowed_quotes = ("'", '"', '`')

while i < len(s):
c = s[i]
if in_quote:
# Inside a quoted literal.
if in_quote in ("'", '"'):
if c == '\\':
# Skip the escape character and the next character.
i += 2
continue
# Whether we are in a backtick or one of the other quotes,
# check for the closing quote.
if c == in_quote:
# Check for a doubled quote.
if i + 1 < len(s) and s[i + 1] == in_quote:
i += 2
continue
else:
in_quote = None
i += 1
continue
else:
i += 1
continue
else:
# Not inside a quoted literal.
if c in allowed_quotes:
in_quote = c
i += 1
continue
elif c == '(':
depth += 1
i += 1
continue
elif c == ')':
depth -= 1
i += 1
if depth == 0:
# Return the substring inside (excluding the outer parentheses)
return s[content_start:i - 1], i
continue
else:
i += 1

raise ValueError("Unbalanced parentheses in enum definition")


def _parse_enum_values(content):
"""
Given the inner text from an ENUM declaration—for example:
"'point', 'qwe', 'def'"
parse and return a list of the string values as MySQL would see them.

This function handles:
- For single- and double–quoted strings: backslash escapes and doubled quotes.
- For backtick–quoted identifiers: only doubled backticks are recognized.
"""
values = []
i = 0
allowed_quotes = ("'", '"', '`')
while i < len(content):
# Skip any whitespace.
while i < len(content) and content[i].isspace():
i += 1
if i >= len(content):
break
# The next non–whitespace character must be one of the allowed quotes.
if content[i] not in allowed_quotes:
raise ValueError("Expected starting quote for enum value at position {} in {!r}".format(i, content))
quote = content[i]
i += 1 # skip the opening quote

literal_chars = []
while i < len(content):
c = content[i]
# For single- and double–quotes, process backslash escapes.
if quote in ("'", '"') and c == '\\':
if i + 1 < len(content):
next_char = content[i + 1]
# Mapping for common escapes. (For the quote character, map it to itself.)
escapes = {
'0': '\0',
'b': '\b',
'n': '\n',
'r': '\r',
't': '\t',
'Z': '\x1a',
'\\': '\\',
quote: quote
}
literal_chars.append(escapes.get(next_char, next_char))
i += 2
continue
else:
# Trailing backslash – treat it as literal.
literal_chars.append('\\')
i += 1
continue
elif c == quote:
# Check for a doubled quote (works for all three quoting styles).
if i + 1 < len(content) and content[i + 1] == quote:
literal_chars.append(quote)
i += 2
continue
else:
i += 1 # skip the closing quote
break # end of this literal
else:
# For backticks, we do not treat backslashes specially.
literal_chars.append(c)
i += 1
# Finished reading one literal; join the characters.
value = ''.join(literal_chars)
values.append(value)

# Skip whitespace after the literal.
while i < len(content) and content[i].isspace():
i += 1
# If there’s a comma, skip it; otherwise, we must be at the end.
if i < len(content):
if content[i] == ',':
i += 1
else:
raise ValueError("Expected comma between enum values at position {} in {!r}"
.format(i, content))
return values


# --- For testing purposes ---
if __name__ == '__main__':
    # Manual smoke run: print the parsed values (or the parsing error)
    # for a handful of representative ENUM definitions.
    sample_definitions = (
        "enum('point','qwe','def')",
        "ENUM('asd', 'qwe', 'def')",
        'enum("first", \'second\', "Don""t stop")',
        "enum('a\\'b','c\\\\d','Hello\\nWorld')",
        # Now with backticks:
        "enum(`point`,`qwe`,`def`)",
        "enum('point',`qwe`,'def')",
        "enum(`first`, `Don``t`, `third`)",
    )

    for definition in sample_definitions:
        try:
            parsed = parse_mysql_enum(definition)
            print("Input: {}\nParsed: {}\n".format(definition, parsed))
        except Exception as exc:
            # Keep going so that one bad sample does not hide the others.
            print("Error parsing {}: {}\n".format(definition, exc))
7 changes: 3 additions & 4 deletions mysql_ch_replicator/pymysqlreplication/row_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,9 @@ def __read_values_name(
elif column.type == FIELD_TYPE.YEAR:
return self.packet.read_uint8() + 1900
elif column.type == FIELD_TYPE.ENUM:
if column.enum_values:
return column.enum_values[self.packet.read_uint_by_size(column.size)]
self.packet.read_uint_by_size(column.size)
return None
# if column.enum_values:
# return column.enum_values[self.packet.read_uint_by_size(column.size)]
return self.packet.read_uint_by_size(column.size)
elif column.type == FIELD_TYPE.SET:
bit_mask = self.packet.read_uint_by_size(column.size)
if column.set_values:
Expand Down
11 changes: 7 additions & 4 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -981,13 +981,14 @@ def test_different_types_2():
test4 set('1','2','3','4','5','6','7'),
test5 timestamp(0),
test6 char(36),
test7 ENUM('point', 'qwe', 'def'),
PRIMARY KEY (id)
);
''')

mysql.execute(
f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test3, test4, test5, test6) VALUES "
f"(0, POINT(10.0, 20.0), 'azaza', '1,3,5', '2023-08-15 14:30:00', '550e8400-e29b-41d4-a716-446655440000');",
f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test3, test4, test5, test6, test7) VALUES "
f"(0, POINT(10.0, 20.0), 'azaza', '1,3,5', '2023-08-15 14:30:00', '550e8400-e29b-41d4-a716-446655440000', 'def');",
commit=True,
)

Expand All @@ -1004,16 +1005,18 @@ def test_different_types_2():
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

mysql.execute(
f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test4, test5, test6) VALUES "
f"(1, POINT(15.0, 14.0), '2,4,5', '2023-08-15 14:40:00', '110e6103-e39b-51d4-a716-826755413099');",
f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test4, test5, test6, test7) VALUES "
f"(1, POINT(15.0, 14.0), '2,4,5', '2023-08-15 14:40:00', '110e6103-e39b-51d4-a716-826755413099', 'point');",
commit=True,
)

assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, 'test1=True')) == 1)

assert ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test2']['x'] == 15.0
assert ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test7'] == 'point'
assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test2']['y'] == 20.0
assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test7'] == 'def'
assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test3'] == 'azaza\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

assert ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test4'] == '2,4,5'
Expand Down