Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .cursor/rules/rules.mdc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ description:
globs:
alwaysApply: true
---
Use following command to run tests:
1. Use the following command to run tests:

docker exec -w /app/ -it tests-replicator-1 python3 -m pytest -v -s tests/ -k test_your_test_name

2. Never create config files from test code at runtime. Always create real config files. Use log level info for newly added config files.

3. Don't create analysis .md files.
1 change: 1 addition & 0 deletions mysql_ch_replicator/db_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(self, config: Settings):
self.mysql_api = MySQLApi(
database=None,
mysql_settings=config.mysql,
mysql_timezone=config.mysql_timezone,
)
self.clickhouse_api = ClickhouseApi(
database=None,
Expand Down
1 change: 1 addition & 0 deletions mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
self.mysql_api = MySQLApi(
database=self.database,
mysql_settings=config.mysql,
mysql_timezone=config.mysql_timezone,
)
self.clickhouse_api = ClickhouseApi(
database=self.target_database,
Expand Down
6 changes: 5 additions & 1 deletion mysql_ch_replicator/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ class Monitoring:
def __init__(self, databases: str, config: Settings):
self.config = config
self.databases = [db.strip() for db in databases.split(',') if db.strip()]
self.mysql_api = MySQLApi(database=None, mysql_settings=config.mysql)
self.mysql_api = MySQLApi(
database=None,
mysql_settings=config.mysql,
mysql_timezone=config.mysql_timezone,
)

def run(self):
stats = []
Expand Down
23 changes: 22 additions & 1 deletion mysql_ch_replicator/mysql_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import datetime
import time
import zoneinfo

import mysql.connector

from .config import MysqlSettings
Expand All @@ -8,9 +11,10 @@
class MySQLApi:
RECONNECT_INTERVAL = 3 * 60

def __init__(self, database: str, mysql_settings: MysqlSettings):
def __init__(self, database: str, mysql_settings: MysqlSettings, mysql_timezone: str = 'UTC'):
self.database = database
self.mysql_settings = mysql_settings
self.mysql_timezone = mysql_timezone
self.last_connect_time = 0
self.reconnect_if_required()

Expand Down Expand Up @@ -44,6 +48,10 @@ def reconnect_if_required(self, force=False):
else:
raise
self.cursor = self.db.cursor()

if self.mysql_timezone and self.mysql_timezone != 'UTC':
self.cursor.execute(f"SET time_zone = '{self.mysql_timezone}'")

if self.database is not None:
self.cursor.execute(f'USE `{self.database}`')
self.last_connect_time = curr_time
Expand Down Expand Up @@ -132,5 +140,18 @@ def get_records(self, table_name, order_by, limit, start_value=None, worker_id=N
# Execute the query
self.cursor.execute(query)
res = self.cursor.fetchall()

if self.mysql_timezone and self.mysql_timezone != 'UTC':
tz = zoneinfo.ZoneInfo(self.mysql_timezone)
records = []
for row in res:
new_row = []
for value in row:
if isinstance(value, datetime.datetime) and value.tzinfo is None:
value = value.replace(tzinfo=tz)
new_row.append(value)
records.append(tuple(new_row))
return records

records = [x for x in res]
return records
4 changes: 3 additions & 1 deletion mysql_ch_replicator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ def check_databases_updated(self, mysql_api: MySQLApi):

def run(self):
mysql_api = MySQLApi(
database=None, mysql_settings=self.config.mysql,
database=None,
mysql_settings=self.config.mysql,
mysql_timezone=self.config.mysql_timezone,
)
databases = mysql_api.get_databases()
databases = [db for db in databases if self.config.is_database_matches(db)]
Expand Down
68 changes: 68 additions & 0 deletions tests/test_data_types.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import datetime
import json
import os
import tempfile
import uuid
import zoneinfo

import yaml

Expand Down Expand Up @@ -492,6 +494,72 @@ def test_timezone_conversion():
os.unlink(temp_config_file)


def test_timezone_conversion_values():
    """
    Test that MySQL timestamp values are correctly preserved with timezone conversion.
    This test reproduces the issue from GitHub issue #177.

    A row holding a second-precision ``timestamp`` and a millisecond-precision
    ``timestamp(3)`` is inserted into MySQL, replicated using the
    ``tests/tests_config_timezone.yaml`` config, and read back from ClickHouse.
    Both values must equal the inserted wall-clock time interpreted in the
    'America/New_York' timezone.
    """
    config_file = 'tests/tests_config_timezone.yaml'
    cfg = config.Settings()
    cfg.load(config_file)

    mysql = mysql_api.MySQLApi(
        database=None,
        mysql_settings=cfg.mysql,
        mysql_timezone=cfg.mysql_timezone,
    )

    ch = clickhouse_api.ClickhouseApi(
        database=TEST_DB_NAME,
        clickhouse_settings=cfg.clickhouse,
    )

    prepare_env(cfg, mysql, ch)

    mysql.execute(f'''
    CREATE TABLE `{TEST_TABLE_NAME}` (
        id int NOT NULL AUTO_INCREMENT,
        name varchar(255),
        created_at timestamp NULL,
        updated_at timestamp(3) NULL,
        PRIMARY KEY (id)
    );
    ''')

    mysql.execute(
        f"INSERT INTO `{TEST_TABLE_NAME}` (name, created_at, updated_at) "
        f"VALUES ('test_timezone', '2023-08-15 14:30:00', '2023-08-15 14:30:00.123');",
        commit=True,
    )

    run_all_runner = RunAllRunner(cfg_file=config_file)
    run_all_runner.run()

    # Everything after run() is guarded so the runner is stopped even when a
    # wait times out or an assertion fails; otherwise it would be left running
    # for whatever test executes next.
    try:
        assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
        ch.execute_command(f'USE `{TEST_DB_NAME}`')
        assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
        assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

        results = ch.select(TEST_TABLE_NAME)
        assert len(results) == 1
        assert results[0]['name'] == 'test_timezone'

        created_at_value = results[0]['created_at']
        updated_at_value = results[0]['updated_at']

        # Expected values are the inserted wall-clock times tagged with the
        # timezone used for the comparison below.
        ny_tz = zoneinfo.ZoneInfo('America/New_York')

        expected_dt = datetime.datetime(2023, 8, 15, 14, 30, 0)
        expected_dt_with_tz = expected_dt.replace(tzinfo=ny_tz)
        assert created_at_value == expected_dt_with_tz, f"Expected {expected_dt_with_tz}, got {created_at_value}"

        # '.123' seconds in the INSERT corresponds to 123000 microseconds.
        expected_dt_with_microseconds = datetime.datetime(2023, 8, 15, 14, 30, 0, 123000)
        expected_dt_with_microseconds_tz = expected_dt_with_microseconds.replace(tzinfo=ny_tz)
        assert updated_at_value == expected_dt_with_microseconds_tz, f"Expected {expected_dt_with_microseconds_tz}, got {updated_at_value}"
    finally:
        run_all_runner.stop()


def test_year_type():
"""
Test that MySQL YEAR type is properly converted to UInt16 in ClickHouse
Expand Down
20 changes: 20 additions & 0 deletions tests/tests_config_timezone.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
mysql:
  host: 'localhost'
  port: 9306
  user: 'root'
  password: 'admin'

clickhouse:
  host: 'localhost'
  port: 9123
  user: 'default'
  password: 'admin'

binlog_replicator:
  data_dir: '/app/binlog/'
  records_per_file: 100000

databases: '*test*'
log_level: 'info'
mysql_timezone: 'America/New_York'