Skip to content

Commit c9efc16

Browse files
liandong00liandong
andauthored
Fix: Properly escape SQL identifiers to handle reserved keywords like key (#162)
* Fix the conflict issue with reserved keywords. * Remove the SQL query print statement used for debugging --------- Co-authored-by: liandong <[email protected]>
1 parent d50cfd1 commit c9efc16

File tree

1 file changed

+17
-8
lines changed

1 file changed

+17
-8
lines changed

mysql_ch_replicator/mysql_api.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,26 +95,35 @@ def get_table_create_statement(self, table_name) -> str:
9595

9696
def get_records(self, table_name, order_by, limit, start_value=None, worker_id=None, total_workers=None):
9797
self.reconnect_if_required()
98-
order_by_str = ','.join(order_by)
98+
99+
# Escape column names with backticks to avoid issues with reserved keywords like "key"
100+
order_by_escaped = [f'`{col}`' for col in order_by]
101+
order_by_str = ','.join(order_by_escaped)
102+
99103
where = ''
100104
if start_value is not None:
101-
start_value = ','.join(map(str, start_value))
102-
where = f'WHERE ({order_by_str}) > ({start_value}) '
105+
# Build the start_value condition for pagination
106+
start_value_str = ','.join(map(str, start_value))
107+
where = f'WHERE ({order_by_str}) > ({start_value_str}) '
103108

104-
# Add partitioning filter for parallel processing if needed
109+
# Add partitioning filter for parallel processing (e.g., sharded crawling)
105110
if worker_id is not None and total_workers is not None and total_workers > 1:
106-
# Use a list comprehension to build the COALESCE expressions with proper quoting
107-
coalesce_expressions = [f"COALESCE({key}, '')" for key in order_by]
111+
# Escape column names in COALESCE expressions
112+
coalesce_expressions = [f"COALESCE(`{key}`, '')" for key in order_by]
108113
concat_keys = f"CONCAT_WS('|', {', '.join(coalesce_expressions)})"
109114
hash_condition = f"CRC32({concat_keys}) % {total_workers} = {worker_id}"
115+
110116
if where:
111117
where += f'AND {hash_condition} '
112118
else:
113119
where = f'WHERE {hash_condition} '
114-
120+
121+
# Construct final query
115122
query = f'SELECT * FROM `{table_name}` {where}ORDER BY {order_by_str} LIMIT {limit}'
116123

117-
# Execute the actual query
124+
# print("Executing query:", query)
125+
126+
# Execute the query
118127
self.cursor.execute(query)
119128
res = self.cursor.fetchall()
120129
records = [x for x in res]

0 commit comments

Comments
 (0)