|
6 | 6 | from enum import Enum |
7 | 7 | from dataclasses import dataclass |
8 | 8 | from collections import defaultdict |
| 9 | +from datetime import date |
9 | 10 |
|
10 | 11 | from .config import Settings, MysqlSettings, ClickhouseSettings |
11 | 12 | from .mysql_api import MySQLApi |
@@ -268,6 +269,30 @@ def perform_initial_replication(self): |
268 | 269 | self.clickhouse_api.database = self.target_database |
269 | 270 | logger.info(f'initial replication - done') |
270 | 271 |
|
| 272 | + def to_date_if_str(self, value): |
| 273 | + if not isinstance(value, str): |
| 274 | + return value |
| 275 | + |
| 276 | + if len(value) == 10 and value[4] == '-' and value[7] == '-': |
| 277 | + try: |
| 278 | + year = int(value[0:4]) |
| 279 | + month = int(value[5:7]) |
| 280 | + day = int(value[8:10]) |
| 281 | + return date(year, month, day) |
| 282 | + except ValueError: |
| 283 | + return value |
| 284 | + |
| 285 | + if len(value) == 12 and value[5] == '-' and value[8] == '-' and ((value[0] == '\'' and value[11] == '\'') or (value[0] == '"' and value[11] == '"')): |
| 286 | + try: |
| 287 | + year = int(value[1:5]) |
| 288 | + month = int(value[6:8]) |
| 289 | + day = int(value[9:11]) |
| 290 | + return date(year, month, day) |
| 291 | + except ValueError: |
| 292 | + return value |
| 293 | + |
| 294 | + return value |
| 295 | + |
271 | 296 | def perform_initial_replication_table(self, table_name): |
272 | 297 | logger.info(f'running initial replication for table {table_name}') |
273 | 298 |
|
@@ -329,13 +354,11 @@ def perform_initial_replication_table(self, table_name): |
329 | 354 |
|
330 | 355 | if not records: |
331 | 356 | break |
| 357 | + |
332 | 358 | self.clickhouse_api.insert(table_name, records, table_structure=clickhouse_table_structure) |
333 | | - for record in records: |
334 | | - record_primary_key = [record[key_idx] for key_idx in primary_key_ids] |
335 | | - if max_primary_key is None: |
336 | | - max_primary_key = record_primary_key |
337 | | - else: |
338 | | - max_primary_key = max(max_primary_key, record_primary_key) |
| 359 | + |
| 360 | + last_record = records[-1] |
| 361 | + max_primary_key = [self.to_date_if_str(last_record[key_idx]) for key_idx in primary_key_ids] |
339 | 362 |
|
340 | 363 | self.state.initial_replication_max_primary_key = max_primary_key |
341 | 364 | self.save_state_if_required() |
|
0 commit comments