100 changes: 100 additions & 0 deletions mysql_ch_replicator/converter.py
@@ -196,6 +196,44 @@ def convert_type(self, mysql_type, parameters):
        if mysql_type == 'point':
            return 'Tuple(x Float32, y Float32)'

        # Handle numeric types
        if mysql_type.startswith('numeric'):
            # Determine whether precision and scale are specified via parentheses:
            if '(' in mysql_type and ')' in mysql_type:
                # Expecting a type definition like "numeric(precision, scale)"
                pattern = r"numeric\((\d+)\s*,\s*(\d+)\)"
                match = re.search(pattern, mysql_type)
                if not match:
                    raise ValueError(f"Invalid numeric type definition: {mysql_type}")

                precision = int(match.group(1))
                scale = int(match.group(2))
            else:
                # No parentheses given: fall back to MySQL's defaults for NUMERIC,
                # precision 10 and scale 0.
                precision = 10
                scale = 0

            # With no fractional part, map to an integer type where the precision allows it.
            if scale == 0:
                if is_unsigned:
                    if precision <= 9:
                        return "UInt32"
                    elif precision <= 18:
                        return "UInt64"
                    else:
                        # For very large precisions, fall back to Decimal
                        return f"Decimal({precision}, {scale})"
                else:
                    if precision <= 9:
                        return "Int32"
                    elif precision <= 18:
                        return "Int64"
                    else:
                        return f"Decimal({precision}, {scale})"
            else:
                # For types with a defined fractional part, use a Decimal mapping.
                return f"Decimal({precision}, {scale})"

        if mysql_type == 'int':
            if is_unsigned:
                return 'UInt32'
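For reference, a minimal standalone sketch of the mapping the numeric branch above produces (illustration only, not part of the project's API; the defaults and precision cutoffs mirror the code):

# Sketch: MySQL NUMERIC(precision, scale) -> ClickHouse type, mirroring the branch above.
def numeric_to_clickhouse(precision=10, scale=0, unsigned=False):
    if scale == 0:
        if precision <= 9:
            return 'UInt32' if unsigned else 'Int32'
        if precision <= 18:
            return 'UInt64' if unsigned else 'Int64'
    return f'Decimal({precision}, {scale})'

assert numeric_to_clickhouse(5, 2) == 'Decimal(5, 2)'
assert numeric_to_clickhouse(9, 0, unsigned=True) == 'UInt32'
assert numeric_to_clickhouse(18, 0) == 'Int64'
assert numeric_to_clickhouse(20, 0) == 'Decimal(20, 0)'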
@@ -472,7 +510,69 @@ def convert_alter_query(self, mysql_query, db_name):

        raise Exception(f'operation {op_name} not implemented, query: {subquery}')

    @classmethod
    def _tokenize_alter_query(cls, sql_line):
        # We want to recognize tokens that may be:
        # 1. A backquoted identifier that can optionally be immediately followed by parentheses.
        # 2. A plain word (letters/digits/underscore) that may immediately be followed by a parenthesized argument list.
        # 3. A single-quoted or double-quoted string.
        # 4. Or, if nothing else, any non-whitespace sequence.
        #
        # The order is important: for example, if a word is immediately followed by parentheses,
        # we want to grab it as a single token.
        token_pattern = re.compile(r'''
            (                                  # start capture group for a token
                `[^`]+`(?:\([^)]*\))?      |   # backquoted identifier w/ optional parentheses
                \w+(?:\([^)]*\))?          |   # a word with optional parentheses
                '(?:\\'|[^'])*'            |   # a single-quoted string
                "(?:\\"|[^"])*"            |   # a double-quoted string
                [^\s]+                         # fallback: any sequence of non-whitespace characters
            )
        ''', re.VERBOSE)
        tokens = token_pattern.findall(sql_line)

        # Now, split the column definition into:
        #   token0 = column name,
        #   token1 = data type (which might be multiple tokens, e.g. DOUBLE PRECISION, INT UNSIGNED,
        #            or a word+parentheses like VARCHAR(254) or NUMERIC(5, 2)),
        #   remaining tokens: the parameters such as DEFAULT, NOT, etc.
        #
        # We define a set of keywords that indicate the start of column options.
        constraint_keywords = {
            "DEFAULT", "NOT", "NULL", "AUTO_INCREMENT", "PRIMARY", "UNIQUE",
            "COMMENT", "COLLATE", "REFERENCES", "ON", "CHECK", "CONSTRAINT",
            "AFTER", "BEFORE", "GENERATED", "VIRTUAL", "STORED", "FIRST",
            "ALWAYS", "AS", "IDENTITY", "INVISIBLE", "PERSISTED",
        }

        if not tokens:
            return tokens
        # The first token is always the column name.
        column_name = tokens[0]

        # Now "merge" tokens after the column name that belong to the type.
        # (For many types the type is written as a single token already,
        # e.g. "VARCHAR(254)" or "NUMERIC(5, 2)", but for types like
        # "DOUBLE PRECISION" or "INT UNSIGNED" the .split() would produce two tokens.)
        type_tokens = []
        i = 1
        while i < len(tokens) and tokens[i].upper() not in constraint_keywords:
            type_tokens.append(tokens[i])
            i += 1
        merged_type = " ".join(type_tokens) if type_tokens else ""

        # The remaining tokens are passed through unchanged.
        param_tokens = tokens[i:]

        # Result: [column name, merged type, all the rest]
        if merged_type:
            return [column_name, merged_type] + param_tokens
        else:
            return [column_name] + param_tokens

    def __convert_alter_table_add_column(self, db_name, table_name, tokens):
        tokens = self._tokenize_alter_query(' '.join(tokens))

        if len(tokens) < 2:
            raise Exception('wrong tokens count', tokens)
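
Taken together, the tokenizer and the numeric branch above are what let an ALTER TABLE ... ADD of a NUMERIC column replicate correctly, as exercised by the new test below. A rough sketch of the flow, assuming MysqlToClickhouseConverter is importable from mysql_ch_replicator.converter (the module changed in this diff):

# Assumed import path; the tests reference MysqlToClickhouseConverter directly.
from mysql_ch_replicator.converter import MysqlToClickhouseConverter

tokens = MysqlToClickhouseConverter._tokenize_alter_query("factor NUMERIC(5, 2) DEFAULT NULL")
# tokens == ['factor', 'NUMERIC(5, 2)', 'DEFAULT', 'NULL']
column_name, column_type = tokens[0], tokens[1]
# __convert_alter_table_add_column() then resolves the merged type via convert_type(),
# whose numeric branch maps a numeric(5, 2) definition to Decimal(5, 2) on the ClickHouse side.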

42 changes: 42 additions & 0 deletions test_mysql_ch_replicator.py
@@ -5,6 +5,7 @@
import subprocess
import json
import uuid
import decimal

import pytest
import requests
@@ -276,6 +277,12 @@ def test_e2e_multistatement():
    mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE name='Ivan';", commit=True)
    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

    mysql.execute(f"ALTER TABLE `{TEST_TABLE_NAME}` ADD factor NUMERIC(5, 2) DEFAULT NULL;")
    mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, factor) VALUES ('Snow', 31, 13.29);", commit=True)

    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)
    assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Snow'")[0].get('factor') == decimal.Decimal('13.29'))

    mysql.execute(
        f"CREATE TABLE {TEST_TABLE_NAME_2} "
        f"(id int NOT NULL AUTO_INCREMENT, name varchar(255), age int, "
@@ -1493,3 +1500,38 @@ def _get_last_insert_name():
    print("*****************************")
    print('\n\n')


def test_alter_tokens_split():
    examples = [
        # basic examples:
        ("test_name VARCHAR(254) NULL", ["test_name", "VARCHAR(254)", "NULL"]),
        ("factor NUMERIC(5, 2) DEFAULT NULL", ["factor", "NUMERIC(5, 2)", "DEFAULT", "NULL"]),
        # backquoted column name:
        ("`test_name` VARCHAR(254) NULL", ["`test_name`", "VARCHAR(254)", "NULL"]),
        ("`order` INT NOT NULL", ["`order`", "INT", "NOT", "NULL"]),
        # type that contains a parenthesized list with quoted values:
        ("status ENUM('active','inactive') DEFAULT 'active'",
         ["status", "ENUM('active','inactive')", "DEFAULT", "'active'"]),
        # multi-word type definitions:
        ("col DOUBLE PRECISION DEFAULT 0", ["col", "DOUBLE PRECISION", "DEFAULT", "0"]),
        ("col INT UNSIGNED DEFAULT 0", ["col", "INT UNSIGNED", "DEFAULT", "0"]),
        # a case with a quoted string containing spaces and punctuation:
        ("message VARCHAR(100) DEFAULT 'Hello, world!'",
         ["message", "VARCHAR(100)", "DEFAULT", "'Hello, world!'"]),
        # longer definition with more options:
        ("col DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP",
         ["col", "DATETIME", "DEFAULT", "CURRENT_TIMESTAMP", "ON", "UPDATE", "CURRENT_TIMESTAMP"]),
        # type with a COMMENT clause (here the type is given, then a parameter keyword)
        ("col VARCHAR(100) COMMENT 'This is a test comment'",
         ["col", "VARCHAR(100)", "COMMENT", "'This is a test comment'"]),
        ("c1 INT FIRST", ["c1", "INT", "FIRST"]),
    ]

    for sql, expected in examples:
        result = MysqlToClickhouseConverter._tokenize_alter_query(sql)
        print("SQL Input: ", sql)
        print("Expected: ", expected)
        print("Tokenized: ", result)
        print("Match? ", result == expected)
        print("-" * 60)
        assert result == expected