Skip to content

Commit d07ec98

Browse files
authored
Fixed datetime handling (#12)
1 parent 7003cc9 commit d07ec98

File tree

3 files changed

+67
-4
lines changed

3 files changed

+67
-4
lines changed

mysql_ch_replicator/clickhouse_api.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,18 +117,25 @@ def create_table(self, structure: TableStructure):
117117
})
118118
self.execute_command(query)
119119

120-
def insert(self, table_name, records):
120+
def insert(self, table_name, records, table_structure: TableStructure = None):
121121
current_version = self.get_last_used_version(table_name) + 1
122122

123123
records_to_insert = []
124124
for record in records:
125125
new_record = []
126-
for e in record:
126+
for i, e in enumerate(record):
127127
if isinstance(e, datetime.datetime):
128128
try:
129129
e.timestamp()
130130
except ValueError:
131131
e = 0
132+
if table_structure is not None:
133+
field: TableField = table_structure.fields[i]
134+
if 'DateTime' in field.field_type and 'Nullable' not in field.field_type:
135+
try:
136+
e.timestamp()
137+
except (ValueError, AttributeError):
138+
e = datetime.datetime(1970, 1, 1)
132139
new_record.append(e)
133140
record = new_record
134141

mysql_ch_replicator/db_replicator.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ def perform_initial_replication_table(self, table_name):
257257

258258
if not records:
259259
break
260-
self.clickhouse_api.insert(table_name, records)
260+
self.clickhouse_api.insert(table_name, records, table_structure=clickhouse_table_structure)
261261
for record in records:
262262
record_primary_key = record[primary_key_index]
263263
if max_primary_key is None:
@@ -460,7 +460,8 @@ def upload_records(self):
460460
records = id_to_records.values()
461461
if not records:
462462
continue
463-
self.clickhouse_api.insert(table_name, records)
463+
_, ch_table_structure = self.state.tables_structure[table_name]
464+
self.clickhouse_api.insert(table_name, records, table_structure=ch_table_structure)
464465

465466
for table_name, keys_to_remove in self.records_to_delete.items():
466467
if not keys_to_remove:

test_mysql_ch_replicator.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,3 +436,58 @@ def test_database_tables_filtering():
436436
assert_wait(lambda: len(ch.select('test_table_2')) == 1)
437437

438438
assert 'test_table_3' not in ch.get_tables()
439+
440+
441+
def test_datetime_exception():
442+
cfg = config.Settings()
443+
cfg.load(CONFIG_FILE)
444+
445+
mysql = mysql_api.MySQLApi(
446+
database=None,
447+
mysql_settings=cfg.mysql,
448+
)
449+
450+
ch = clickhouse_api.ClickhouseApi(
451+
database=TEST_DB_NAME,
452+
clickhouse_settings=cfg.clickhouse,
453+
)
454+
455+
prepare_env(cfg, mysql, ch)
456+
457+
mysql.execute("SET sql_mode = 'ALLOW_INVALID_DATES';")
458+
459+
mysql.execute(f'''
460+
CREATE TABLE {TEST_TABLE_NAME} (
461+
id int NOT NULL AUTO_INCREMENT,
462+
name varchar(255),
463+
modified_date DateTime(3) NOT NULL,
464+
PRIMARY KEY (id)
465+
);
466+
''')
467+
468+
mysql.execute(
469+
f"INSERT INTO {TEST_TABLE_NAME} (name, modified_date) VALUES ('Ivan', '0000-00-00 00:00:00');",
470+
commit=True,
471+
)
472+
473+
binlog_replicator_runner = BinlogReplicatorRunner()
474+
binlog_replicator_runner.run()
475+
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
476+
db_replicator_runner.run()
477+
478+
assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
479+
480+
ch.execute_command(f'USE {TEST_DB_NAME}')
481+
482+
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
483+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)
484+
485+
mysql.execute(
486+
f"INSERT INTO {TEST_TABLE_NAME} (name, modified_date) VALUES ('Alex', '0000-00-00 00:00:00');",
487+
commit=True,
488+
)
489+
mysql.execute(
490+
f"INSERT INTO {TEST_TABLE_NAME} (name, modified_date) VALUES ('Givi', '2023-01-08 03:11:09');",
491+
commit=True,
492+
)
493+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)

0 commit comments

Comments
 (0)