Skip to content

Commit db63e2e

Browse files
authored
Fixed timezone conversion issue (#207)
1 parent e4ae3c6 commit db63e2e

File tree

8 files changed

+125
-4
lines changed

8 files changed

+125
-4
lines changed

.cursor/rules/rules.mdc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ description:
33
globs:
44
alwaysApply: true
55
---
6-
Use following command to run tests:
6+
1. Use the following command to run tests:
77

88
docker exec -w /app/ -it tests-replicator-1 python3 -m pytest -v -s tests/ -k test_your_test_name
9+
10+
2. Never create config files at runtime in test code. Always create real config files. Use log level info for newly added config files.
11+
12+
3. Don't create analysis md files.

mysql_ch_replicator/db_optimizer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def __init__(self, config: Settings):
4747
self.mysql_api = MySQLApi(
4848
database=None,
4949
mysql_settings=config.mysql,
50+
mysql_timezone=config.mysql_timezone,
5051
)
5152
self.clickhouse_api = ClickhouseApi(
5253
database=None,

mysql_ch_replicator/db_replicator.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
163163
self.mysql_api = MySQLApi(
164164
database=self.database,
165165
mysql_settings=config.mysql,
166+
mysql_timezone=config.mysql_timezone,
166167
)
167168
self.clickhouse_api = ClickhouseApi(
168169
database=self.target_database,

mysql_ch_replicator/monitoring.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@ class Monitoring:
1818
def __init__(self, databases: str, config: Settings):
1919
self.config = config
2020
self.databases = [db.strip() for db in databases.split(',') if db.strip()]
21-
self.mysql_api = MySQLApi(database=None, mysql_settings=config.mysql)
21+
self.mysql_api = MySQLApi(
22+
database=None,
23+
mysql_settings=config.mysql,
24+
mysql_timezone=config.mysql_timezone,
25+
)
2226

2327
def run(self):
2428
stats = []

mysql_ch_replicator/mysql_api.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
import datetime
12
import time
3+
import zoneinfo
4+
25
import mysql.connector
36

47
from .config import MysqlSettings
@@ -8,9 +11,10 @@
811
class MySQLApi:
912
RECONNECT_INTERVAL = 3 * 60
1013

11-
def __init__(self, database: str, mysql_settings: MysqlSettings):
14+
def __init__(self, database: str, mysql_settings: MysqlSettings, mysql_timezone: str = 'UTC'):
1215
self.database = database
1316
self.mysql_settings = mysql_settings
17+
self.mysql_timezone = mysql_timezone
1418
self.last_connect_time = 0
1519
self.reconnect_if_required()
1620

@@ -44,6 +48,10 @@ def reconnect_if_required(self, force=False):
4448
else:
4549
raise
4650
self.cursor = self.db.cursor()
51+
52+
if self.mysql_timezone and self.mysql_timezone != 'UTC':
53+
self.cursor.execute(f"SET time_zone = '{self.mysql_timezone}'")
54+
4755
if self.database is not None:
4856
self.cursor.execute(f'USE `{self.database}`')
4957
self.last_connect_time = curr_time
@@ -132,5 +140,18 @@ def get_records(self, table_name, order_by, limit, start_value=None, worker_id=N
132140
# Execute the query
133141
self.cursor.execute(query)
134142
res = self.cursor.fetchall()
143+
144+
if self.mysql_timezone and self.mysql_timezone != 'UTC':
145+
tz = zoneinfo.ZoneInfo(self.mysql_timezone)
146+
records = []
147+
for row in res:
148+
new_row = []
149+
for value in row:
150+
if isinstance(value, datetime.datetime) and value.tzinfo is None:
151+
value = value.replace(tzinfo=tz)
152+
new_row.append(value)
153+
records.append(tuple(new_row))
154+
return records
155+
135156
records = [x for x in res]
136157
return records

mysql_ch_replicator/runner.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,9 @@ def check_databases_updated(self, mysql_api: MySQLApi):
147147

148148
def run(self):
149149
mysql_api = MySQLApi(
150-
database=None, mysql_settings=self.config.mysql,
150+
database=None,
151+
mysql_settings=self.config.mysql,
152+
mysql_timezone=self.config.mysql_timezone,
151153
)
152154
databases = mysql_api.get_databases()
153155
databases = [db for db in databases if self.config.is_database_matches(db)]

tests/test_data_types.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import datetime
22
import json
3+
import os
34
import tempfile
45
import uuid
6+
import zoneinfo
57

68
import yaml
79

@@ -492,6 +494,72 @@ def test_timezone_conversion():
492494
os.unlink(temp_config_file)
493495

494496

497+
def test_timezone_conversion_values():
498+
"""
499+
Test that MySQL timestamp values are correctly preserved with timezone conversion.
500+
This test reproduces the issue from GitHub issue #177.
501+
"""
502+
config_file = 'tests/tests_config_timezone.yaml'
503+
cfg = config.Settings()
504+
cfg.load(config_file)
505+
506+
mysql = mysql_api.MySQLApi(
507+
database=None,
508+
mysql_settings=cfg.mysql,
509+
mysql_timezone=cfg.mysql_timezone,
510+
)
511+
512+
ch = clickhouse_api.ClickhouseApi(
513+
database=TEST_DB_NAME,
514+
clickhouse_settings=cfg.clickhouse,
515+
)
516+
517+
prepare_env(cfg, mysql, ch)
518+
519+
mysql.execute(f'''
520+
CREATE TABLE `{TEST_TABLE_NAME}` (
521+
id int NOT NULL AUTO_INCREMENT,
522+
name varchar(255),
523+
created_at timestamp NULL,
524+
updated_at timestamp(3) NULL,
525+
PRIMARY KEY (id)
526+
);
527+
''')
528+
529+
mysql.execute(
530+
f"INSERT INTO `{TEST_TABLE_NAME}` (name, created_at, updated_at) "
531+
f"VALUES ('test_timezone', '2023-08-15 14:30:00', '2023-08-15 14:30:00.123');",
532+
commit=True,
533+
)
534+
535+
run_all_runner = RunAllRunner(cfg_file=config_file)
536+
run_all_runner.run()
537+
538+
assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
539+
ch.execute_command(f'USE `{TEST_DB_NAME}`')
540+
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
541+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)
542+
543+
results = ch.select(TEST_TABLE_NAME)
544+
assert len(results) == 1
545+
assert results[0]['name'] == 'test_timezone'
546+
547+
created_at_value = results[0]['created_at']
548+
updated_at_value = results[0]['updated_at']
549+
550+
expected_dt = datetime.datetime(2023, 8, 15, 14, 30, 0)
551+
ny_tz = zoneinfo.ZoneInfo('America/New_York')
552+
expected_dt_with_tz = expected_dt.replace(tzinfo=ny_tz)
553+
554+
assert created_at_value == expected_dt_with_tz, f"Expected {expected_dt_with_tz}, got {created_at_value}"
555+
556+
expected_dt_with_microseconds = datetime.datetime(2023, 8, 15, 14, 30, 0, 123000)
557+
expected_dt_with_microseconds_tz = expected_dt_with_microseconds.replace(tzinfo=ny_tz)
558+
assert updated_at_value == expected_dt_with_microseconds_tz, f"Expected {expected_dt_with_microseconds_tz}, got {updated_at_value}"
559+
560+
run_all_runner.stop()
561+
562+
495563
def test_year_type():
496564
"""
497565
Test that MySQL YEAR type is properly converted to UInt16 in ClickHouse

tests/tests_config_timezone.yaml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
mysql:
2+
host: 'localhost'
3+
port: 9306
4+
user: 'root'
5+
password: 'admin'
6+
7+
clickhouse:
8+
host: 'localhost'
9+
port: 9123
10+
user: 'default'
11+
password: 'admin'
12+
13+
binlog_replicator:
14+
data_dir: '/app/binlog/'
15+
records_per_file: 100000
16+
17+
databases: '*test*'
18+
log_level: 'info'
19+
mysql_timezone: 'America/New_York'
20+

0 commit comments

Comments
 (0)