Skip to content

Commit 3cac016

Browse files
authored
Optimize datetime fields check
1 parent 20fee80 commit 3cac016

File tree

1 file changed

+33
-27
lines changed

1 file changed

+33
-27
lines changed

mysql_ch_replicator/clickhouse_api.py

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -179,46 +179,52 @@ def create_table(self, structure: TableStructure, additional_indexes: list | Non
179179
def insert(self, table_name, records, table_structure: TableStructure = None):
180180
current_version = self.get_last_used_version(table_name) + 1
181181

182-
records_to_insert = []
183-
for record in records:
184-
new_record = []
185-
for i, e in enumerate(record):
182+
if not records:
183+
return
184+
185+
process_indices = []
186+
if table_structure:
187+
for i, field in enumerate(table_structure.fields):
188+
if (("DateTime" in field.field_type or "Date32" in field.field_type) and "Nullable" not in field.field_type):
189+
process_indices.append(i)
190+
else:
191+
first_record = records[0]
192+
for i, value in enumerate(first_record):
193+
if isinstance(value, (datetime.date, datetime.datetime)):
194+
process_indices.append(i)
195+
196+
for j, record in enumerate(records):
197+
if isinstance(record, tuple):
198+
record = list(record)
199+
records[j] = record
200+
201+
for i in process_indices:
202+
e = record[i]
203+
186204
if isinstance(e, datetime.date) and not isinstance(e, datetime.datetime):
187205
try:
188-
e = datetime.datetime.combine(e, datetime.time())
206+
record[i] = datetime.datetime.combine(e, datetime.time())
189207
except ValueError:
190-
e = datetime.datetime(1970, 1, 1)
191-
if isinstance(e, datetime.datetime):
208+
record[i] = datetime.datetime(1970, 1, 1)
209+
elif isinstance(e, datetime.datetime):
192210
try:
193211
e.timestamp()
194-
except ValueError:
195-
e = datetime.datetime(1970, 1, 1)
196-
if table_structure is not None:
197-
field: TableField = table_structure.fields[i]
198-
is_datetime = (
199-
('DateTime' in field.field_type) or
200-
('Date32' in field.field_type)
201-
)
202-
if is_datetime and 'Nullable' not in field.field_type:
203-
try:
204-
e.timestamp()
205-
except (ValueError, AttributeError):
206-
e = datetime.datetime(1970, 1, 1)
207-
new_record.append(e)
208-
record = new_record
209-
210-
records_to_insert.append(tuple(record) + (current_version,))
212+
except (ValueError, AttributeError):
213+
record[i] = datetime.datetime(1970, 1, 1)
214+
215+
record.append(current_version)
211216
current_version += 1
212217

213-
full_table_name = f'`table_name`'
214-
if '.' not in full_table_name:
218+
if '.' in table_name:
219+
full_table_name = f'`{table_name}`'
220+
else:
215221
full_table_name = f'`{self.database}`.`{table_name}`'
216222

217223
duration = 0.0
218224
for attempt in range(ClickhouseApi.MAX_RETRIES):
219225
try:
220226
t1 = time.time()
221-
self.client.insert(table=full_table_name, data=records_to_insert)
227+
self.client.insert(table=full_table_name, data=records)
222228
t2 = time.time()
223229
duration += (t2 - t1)
224230
break

0 commit comments

Comments
 (0)