diff --git a/mysql_ch_replicator/clickhouse_api.py b/mysql_ch_replicator/clickhouse_api.py index f555394..24a9d65 100644 --- a/mysql_ch_replicator/clickhouse_api.py +++ b/mysql_ch_replicator/clickhouse_api.py @@ -117,18 +117,25 @@ def create_table(self, structure: TableStructure): }) self.execute_command(query) - def insert(self, table_name, records): + def insert(self, table_name, records, table_structure: TableStructure = None): current_version = self.get_last_used_version(table_name) + 1 records_to_insert = [] for record in records: new_record = [] - for e in record: + for i, e in enumerate(record): if isinstance(e, datetime.datetime): try: e.timestamp() except ValueError: e = 0 + if table_structure is not None: + field: TableField = table_structure.fields[i] + if 'DateTime' in field.field_type and 'Nullable' not in field.field_type: + try: + e.timestamp() + except (ValueError, AttributeError): + e = datetime.datetime(1970, 1, 1) new_record.append(e) record = new_record diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py index 7cada51..b7936be 100644 --- a/mysql_ch_replicator/db_replicator.py +++ b/mysql_ch_replicator/db_replicator.py @@ -257,7 +257,7 @@ def perform_initial_replication_table(self, table_name): if not records: break - self.clickhouse_api.insert(table_name, records) + self.clickhouse_api.insert(table_name, records, table_structure=clickhouse_table_structure) for record in records: record_primary_key = record[primary_key_index] if max_primary_key is None: @@ -460,7 +460,8 @@ def upload_records(self): records = id_to_records.values() if not records: continue - self.clickhouse_api.insert(table_name, records) + _, ch_table_structure = self.state.tables_structure[table_name] + self.clickhouse_api.insert(table_name, records, table_structure=ch_table_structure) for table_name, keys_to_remove in self.records_to_delete.items(): if not keys_to_remove: diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 109bae1..11d7f88 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -436,3 +436,58 @@ def test_database_tables_filtering(): assert_wait(lambda: len(ch.select('test_table_2')) == 1) assert 'test_table_3' not in ch.get_tables() + + +def test_datetime_exception(): + cfg = config.Settings() + cfg.load(CONFIG_FILE) + + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=cfg.mysql, + ) + + ch = clickhouse_api.ClickhouseApi( + database=TEST_DB_NAME, + clickhouse_settings=cfg.clickhouse, + ) + + prepare_env(cfg, mysql, ch) + + mysql.execute("SET sql_mode = 'ALLOW_INVALID_DATES';") + + mysql.execute(f''' +CREATE TABLE {TEST_TABLE_NAME} ( + id int NOT NULL AUTO_INCREMENT, + name varchar(255), + modified_date DateTime(3) NOT NULL, + PRIMARY KEY (id) +); + ''') + + mysql.execute( + f"INSERT INTO {TEST_TABLE_NAME} (name, modified_date) VALUES ('Ivan', '0000-00-00 00:00:00');", + commit=True, + ) + + binlog_replicator_runner = BinlogReplicatorRunner() + binlog_replicator_runner.run() + db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME) + db_replicator_runner.run() + + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) + + ch.execute_command(f'USE {TEST_DB_NAME}') + + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) + + mysql.execute( + f"INSERT INTO {TEST_TABLE_NAME} (name, modified_date) VALUES ('Alex', '0000-00-00 00:00:00');", + commit=True, + ) + mysql.execute( + f"INSERT INTO {TEST_TABLE_NAME} (name, modified_date) VALUES ('Givi', '2023-01-08 03:11:09');", + commit=True, + ) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)