Skip to content

Commit e9d7b27

Browse files
committed
[DOP-21813] Fix partitioning_mode=hash and cover with tests
1 parent e3c6bfa commit e9d7b27

File tree

18 files changed

+1556
-77
lines changed

18 files changed

+1556
-77
lines changed

docs/changelog/0.12.5.rst

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
0.12.5 (2024-12-02)
2+
===================
3+
4+
Improvements
5+
------------
6+
7+
- Use ``sipHash64`` instead of ``md5`` in Clickhouse for reading data with ``{"partitioning_mode": "hash"}``, as it is 5 times faster.
8+
- Use ``hashtext`` instead of ``md5`` in Postgres for reading data with ``{"partitioning_mode": "hash"}``, as it is 3-5 times faster.
9+
- Use ``BINARY_CHECKSUM`` instead of ``HASHBYTES`` in MSSQL for reading data with ``{"partitioning_mode": "hash"}``, as it is 5 times faster.
10+
11+
Bug fixes
12+
---------
13+
14+
- In JDBC sources wrap ``MOD(partitionColumn, numPartitions)`` with ``ABS(...)`` to make all returned values positive. This prevents data skew.
15+
- Fix reading table data from MSSQL using ``{"partitioning_mode": "hash"}`` with ``partitionColumn`` of integer type.

docs/changelog/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
:caption: Changelog
44

55
DRAFT
6+
0.12.5
67
0.12.4
78
0.12.3
89
0.12.2

onetl/connection/db_connection/clickhouse/dialect.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,15 @@
1010

1111
class ClickhouseDialect(JDBCDialect):
1212
def get_partition_column_hash(self, partition_column: str, num_partitions: int) -> str:
13-
return f"halfMD5({partition_column}) % {num_partitions}"
13+
# SipHash is 3 times faster than MD5
14+
# https://clickhouse.com/docs/en/sql-reference/functions/hash-functions#siphash64
15+
return f"sipHash64({partition_column}) % {num_partitions}"
1416

1517
def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str:
16-
return f"{partition_column} % {num_partitions}"
18+
# Return positive value even for negative input.
19+
# Don't use positiveModulo as it is 4-5 times slower:
20+
# https://clickhouse.com/docs/en/sql-reference/functions/arithmetic-functions#positivemoduloa-b
21+
return f"abs({partition_column} % {num_partitions})"
1722

1823
def get_max_value(self, value: Any) -> str:
1924
# Max function in Clickhouse returns 0 instead of NULL for empty table

onetl/connection/db_connection/jdbc_connection/options.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,10 @@ class Config:
155155
.. note::
156156
Column type depends on :obj:`~partitioning_mode`.
157157
158-
* ``partitioning_mode="range"`` requires column to be an integer or date (can be NULL, but not recommended).
159-
* ``partitioning_mode="hash"`` requires column to be an string (NOT NULL).
158+
* ``partitioning_mode="range"`` requires column to be an integer, date or timestamp (can be NULL, but not recommended).
159+
* ``partitioning_mode="hash"`` accepts any column type (NOT NULL).
160160
* ``partitioning_mode="mod"`` requires column to be an integer (NOT NULL).
161161
162-
163162
See documentation for :obj:`~partitioning_mode` for more details"""
164163

165164
num_partitions: PositiveInt = Field(default=1, alias="numPartitions")
@@ -256,6 +255,10 @@ class Config:
256255
257256
Where ``stride=(upper_bound - lower_bound) / num_partitions``.
258257
258+
.. note::
259+
260+
Can be used only with columns of integer, date or timestamp types.
261+
259262
.. note::
260263
261264
:obj:`~lower_bound`, :obj:`~upper_bound` and :obj:`~num_partitions` are used just to
@@ -297,7 +300,7 @@ class Config:
297300
.. note::
298301
299302
The hash function implementation depends on RDBMS. It can be ``MD5`` or any other fast hash function,
300-
or expression based on this function call.
303+
or expression based on this function call. Usually such functions accept any column type as an input.
301304
302305
* ``mod``
303306
Allocate each executor a set of values based on modulus of the :obj:`~partition_column` column.
@@ -325,6 +328,10 @@ class Config:
325328
SELECT ... FROM table
326329
WHERE (partition_column mod num_partitions) = num_partitions-1 -- upper_bound
327330
331+
.. note::
332+
333+
Can be used only with columns of integer type.
334+
328335
.. versionadded:: 0.5.0
329336
330337
Examples

onetl/connection/db_connection/mssql/dialect.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,15 @@
88

99

1010
class MSSQLDialect(JDBCDialect):
11-
# https://docs.microsoft.com/ru-ru/sql/t-sql/functions/hashbytes-transact-sql?view=sql-server-ver16
1211
def get_partition_column_hash(self, partition_column: str, num_partitions: int) -> str:
13-
return f"CONVERT(BIGINT, HASHBYTES ('SHA', {partition_column})) % {num_partitions}"
12+
# CHECKSUM/BINARY_CHECKSUM are 5 times faster than MD5:
13+
# https://stackoverflow.com/a/4691861/23601543
14+
# https://learn.microsoft.com/en-us/sql/t-sql/functions/checksum-transact-sql?view=sql-server-ver16
15+
return f"ABS(BINARY_CHECKSUM({partition_column})) % {num_partitions}"
1416

1517
def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str:
16-
return f"{partition_column} % {num_partitions}"
18+
# Return positive value even for negative input
19+
return f"ABS({partition_column} % {num_partitions})"
1720

1821
def get_sql_query(
1922
self,

onetl/connection/db_connection/mysql/dialect.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,13 @@
99

1010
class MySQLDialect(JDBCDialect):
1111
def get_partition_column_hash(self, partition_column: str, num_partitions: int) -> str:
12-
return f"MOD(CONV(CONV(RIGHT(MD5({partition_column}), 16), 16, 2), 2, 10), {num_partitions})"
12+
# MD5 is the fastest hash function https://stackoverflow.com/a/3118889/23601543
13+
# But it returns 32 char string (128 bit), which we need to convert to integer
14+
return f"CAST(CONV(RIGHT(MD5({partition_column}), 16), 16, 10) AS UNSIGNED) % {num_partitions}"
1315

1416
def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str:
15-
return f"MOD({partition_column}, {num_partitions})"
17+
# Return positive value even for negative input
18+
return f"ABS({partition_column} % {num_partitions})"
1619

1720
def escape_column(self, value: str) -> str:
1821
return f"`{value}`"

onetl/connection/db_connection/oracle/dialect.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ def get_partition_column_hash(self, partition_column: str, num_partitions: int)
4848
return f"ora_hash({partition_column}, {num_partitions - 1})"
4949

5050
def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str:
51-
return f"MOD({partition_column}, {num_partitions})"
51+
# Return positive value even for negative input
52+
return f"ABS(MOD({partition_column}, {num_partitions}))"
5253

5354
def _serialize_datetime(self, value: datetime) -> str:
5455
result = value.strftime("%Y-%m-%d %H:%M:%S")

onetl/connection/db_connection/postgres/dialect.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@
99

1010

1111
class PostgresDialect(NotSupportHint, JDBCDialect):
12-
# https://stackoverflow.com/a/9812029
1312
def get_partition_column_hash(self, partition_column: str, num_partitions: int) -> str:
14-
return f"('x'||right(md5('{partition_column}'), 16))::bit(32)::bigint % {num_partitions}"
13+
# hashtext is about 3-5 times faster than MD5 (tested locally)
14+
# https://postgrespro.com/list/thread-id/1506406
15+
return f"abs(hashtext({partition_column}::text)) % {num_partitions}"
1516

1617
def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str:
17-
return f"{partition_column} % {num_partitions}"
18+
# Return positive value even for negative input
19+
return f"abs({partition_column} % {num_partitions})"
1820

1921
def _serialize_datetime(self, value: datetime) -> str:
2022
result = value.isoformat()

onetl/connection/db_connection/teradata/dialect.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ def get_partition_column_hash(self, partition_column: str, num_partitions: int)
1313
return f"HASHAMP(HASHBUCKET(HASHROW({partition_column}))) mod {num_partitions}"
1414

1515
def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str:
16-
return f"{partition_column} mod {num_partitions}"
16+
# Return positive value even for negative input
17+
return f"ABS({partition_column} mod {num_partitions})"
1718

1819
def _serialize_datetime(self, value: datetime) -> str:
1920
result = value.isoformat()

tests/fixtures/processing/base_processing.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import secrets
34
from abc import ABC, abstractmethod
45
from collections import defaultdict
56
from datetime import date, datetime, timedelta
@@ -137,7 +138,7 @@ def create_pandas_df(
137138
elif "float" in column_name:
138139
values[column].append(float(f"{i}.{i}"))
139140
elif "text" in column_name:
140-
values[column].append("This line is made to test the work")
141+
values[column].append(secrets.token_hex(16))
141142
elif "datetime" in column_name:
142143
rand_second = randint(0, i * time_multiplier) # noqa: S311
143144
values[column].append(self.current_datetime() + timedelta(seconds=rand_second))

0 commit comments

Comments
 (0)