Skip to content

Commit dde4650

Browse files
authored
Optimize MySQL Bootstrap Chunking by Switching from MD5 to CRC32 (#987)
The existing MD5-based partitioning strategy caused chunking and bootstrapping of large tables to be extremely slow and prone to connection timeouts. The partitioning strategy has been updated to use CRC32 instead of MD5 for MySQL bootstrapping.
1 parent 3076293 commit dde4650

File tree

2 files changed

+22
-24
lines changed

2 files changed

+22
-24
lines changed

datastream-common/src/main/java/com/linkedin/datastream/common/databases/dbreader/MySqlChunkedQueryManager.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class MySqlChunkedQueryManager implements ChunkedQueryManager {
1919
private static final String SELECT_FROM = "SELECT * FROM ( ";
2020

2121
/** Generate base predicate for sharding keys to given number of partitions.
22-
* Ex: MOD ( CONV ( MD5 ( CONCAT ( K1, K2, K3 ) ) , 16, 10 ) , 10 ) for a table with 3 keys {K1, K2, K3} and 10 partitions */
22+
* Ex: MOD ( CRC32 ( CONCAT ( K1, K2, K3 ) ), 10 ) for a table with 3 keys {K1, K2, K3} and 10 partitions */
2323
private static String generatePerPartitionHashPredicate(List<String> keys, int partitionCount) {
2424
StringBuilder query = new StringBuilder();
2525
int keyCount = keys.size();
@@ -31,17 +31,15 @@ private static String generatePerPartitionHashPredicate(List<String> keys, int p
3131
}
3232
query.append(" )");
3333

34-
// Wrap that with MOD, CONV, and MD5 to generate a hash for sharding
35-
// MOD ( CONV ( MD5 ( CONCAT ( A, B, C ) ) , 16, 10 ) , 10 )
36-
query.insert(0, "MD5 ( ").append(" )");
37-
// 16, 10 converts from HEX to DEC
38-
query.insert(0, "CONV ( ").append(" , 16, 10 )");
34+
// Wrap that with MOD, CRC32 to generate a hash for sharding
35+
// MOD ( CRC32 ( CONCAT ( A, B, C ) ) , 10 )
36+
query.insert(0, "CRC32 ( ").append(" )");
3937
query.insert(0, "MOD ( ").append(" , ").append(partitionCount).append(" )");
4038
return query.toString();
4139
}
4240

4341
/** Generate predicate for filtering rows hashing to the assigned partitions :
44-
* Ex: WHERE ( MOD ( CONV ( MD5 ( CONCAT ( K1, K2, K3 ) ) , 16, 10 ) , 10 ) IN (1 , 6 ) )
42+
* Ex: WHERE ( MOD ( CRC32 ( CONCAT ( K1, K2, K3 ) ), 10 ) IN (1 , 6 ) )
4543
* where 1 and 6 are the assigned partitions, 10 the partition count and, {K1, K2, K3} the keys of the table
4644
*/
4745
private static String generateFullPartitionHashPredicate(String perPartitionPredicate, List<Integer> partitions) {
@@ -121,7 +119,7 @@ public void prepareChunkedQuery(PreparedStatement stmt, List<Object> values) thr
121119
// SELECT * FROM
122120
// (
123121
// SELECT * FROM TABLE
124-
// ) nestedTab1 WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) 16, 10 ) , 10 ) IN ( 2 , 5 ) )
122+
// ) nestedTab1 WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 2 , 5 ) )
125123
// AND ( ( KEY1 > ? ) OR ( KEY1 = ? AND KEY2 > ? ) )
126124
// ORDER BY KEY1 , KEY2
127125
// ) as nestedTab2 LIMIT 10;

datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestMysqlChunkedQueryManager.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ public void testSimpleKeySinglePartition() {
2727
* (
2828
* SELECT * FROM TABLE
2929
* ) nestedTab1
30-
* WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 ) ) , 16, 10 ) , 10 ) IN ( 3 ) )
30+
* WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 ) ) , 10 ) IN ( 3 ) )
3131
* ORDER BY KEY1
3232
* ) as nestedTab2 LIMIT 10;
3333
*/
3434
String firstExpected =
3535
"SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1 "
36-
+ "WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 ) ) , 16, 10 ) , 10 ) IN ( 3 ) ) ORDER BY KEY1 ) as nestedTab2 LIMIT 10";
36+
+ "WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 ) ) , 10 ) IN ( 3 ) ) ORDER BY KEY1 ) as nestedTab2 LIMIT 10";
3737

3838
/**
3939
* SELECT * FROM
@@ -42,12 +42,12 @@ public void testSimpleKeySinglePartition() {
4242
* (
4343
* SELECT * FROM TABLE
4444
* ) nestedTab1
45-
* WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 ) ) , 16, 10 ) , 10 ) IN ( 3 ) ) AND ( ( KEY1 > ? ) )
45+
* WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 ) ) , 10 ) IN ( 3 ) ) AND ( ( KEY1 > ? ) )
4646
* ORDER BY KEY1
4747
* ) as nestedTab2 LIMIT 10;
4848
*/
4949
String chunkedExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1 "
50-
+ "WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 ) ) , 16, 10 ) , 10 ) IN ( 3 ) ) AND ( ( KEY1 > ? ) ) ORDER BY KEY1 ) as nestedTab2 LIMIT 10";
50+
+ "WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 ) ) , 10 ) IN ( 3 ) ) AND ( ( KEY1 > ? ) ) ORDER BY KEY1 ) as nestedTab2 LIMIT 10";
5151
testQueryString(MANAGER, firstExpected, chunkedExpected, NESTED_QUERY, KEY, CHUNK_SIZE, PARTITION_COUNT, PARTITION);
5252
}
5353

@@ -64,10 +64,10 @@ public void testSimpleKeyMultiPartition() {
6464
* (
6565
* SELECT * FROM TABLE
6666
* ) nestedTab1
67-
* WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 ) ) , 16, 10 ) , 10 ) IN ( 2 , 5 ) ) ORDER BY KEY1
67+
* WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 ) ) , 10 ) IN ( 2 , 5 ) ) ORDER BY KEY1
6868
* ) as nestedTab2 LIMIT 10;
6969
*/
70-
String firstExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1 WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 ) ) , 16, 10 ) , 10 ) "
70+
String firstExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1 WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 ) ) , 10 ) "
7171
+ "IN ( 2 , 5 ) ) ORDER BY KEY1 ) as nestedTab2 LIMIT 10";
7272

7373
/**
@@ -77,11 +77,11 @@ public void testSimpleKeyMultiPartition() {
7777
* (
7878
* SELECT * FROM TABLE
7979
* ) nestedTab1
80-
* WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 ) ) , 16, 10 ) , 10 ) IN ( 2 , 5 ) ) AND ( ( KEY1 > ? ) ) ORDER BY KEY1
80+
* WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 ) ) , 10 ) IN ( 2 , 5 ) ) AND ( ( KEY1 > ? ) ) ORDER BY KEY1
8181
* ) as nestedTab2 LIMIT 10;
8282
*/
8383
String chunkedExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1 "
84-
+ "WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 ) ) , 16, 10 ) , 10 ) IN ( 2 , 5 ) ) "
84+
+ "WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 ) ) , 10 ) IN ( 2 , 5 ) ) "
8585
+ "AND ( ( KEY1 > ? ) ) ORDER BY KEY1 ) as nestedTab2 LIMIT 10";
8686
testQueryString(MANAGER, firstExpected, chunkedExpected, NESTED_QUERY, KEY, CHUNK_SIZE, PARTITION_COUNT,
8787
PARTITIONS);
@@ -96,12 +96,12 @@ public void testCompositeKeySinglePartition() {
9696
* SELECT * FROM
9797
* (
9898
* SELECT * FROM TABLE
99-
* ) nestedTab1 WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) , 16, 10 ) , 10 ) IN ( 3 ) )
99+
* ) nestedTab1 WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 3 ) )
100100
* ORDER BY KEY1 , KEY2
101101
* ) as nestedTab2 LIMIT 10;
102102
*/
103103
String firstExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1"
104-
+ " WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) , 16, 10 ) , 10 ) IN ( 3 ) ) ORDER BY KEY1 , KEY2 ) as nestedTab2 LIMIT 10";
104+
+ " WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 3 ) ) ORDER BY KEY1 , KEY2 ) as nestedTab2 LIMIT 10";
105105

106106
/**
107107
* SELECT * FROM
@@ -110,12 +110,12 @@ public void testCompositeKeySinglePartition() {
110110
* (
111111
* SELECT * FROM TABLE
112112
* ) nestedTab1
113-
* WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) , 16, 10 ) , 10 ) IN ( 3 ) ) AND ( ( KEY1 > ? ) OR ( KEY1 = ? AND KEY2 > ? ) )
113+
* WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 3 ) ) AND ( ( KEY1 > ? ) OR ( KEY1 = ? AND KEY2 > ? ) )
114114
* ORDER BY KEY1 , KEY2
115115
* ) as nestedTab2 LIMIT 10;
116116
*/
117117
String chunkedExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1 "
118-
+ "WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) , 16, 10 ) , 10 ) IN ( 3 ) ) AND ( ( KEY1 > ? ) OR ( KEY1 = ? AND KEY2 > ? ) ) "
118+
+ "WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 3 ) ) AND ( ( KEY1 > ? ) OR ( KEY1 = ? AND KEY2 > ? ) ) "
119119
+ "ORDER BY KEY1 , KEY2 ) as nestedTab2 LIMIT 10";
120120
testQueryString(MANAGER, firstExpected, chunkedExpected, NESTED_QUERY, KEYS, CHUNK_SIZE, PARTITION_COUNT,
121121
PARTITION);
@@ -130,12 +130,12 @@ public void testCompositeKeyMultiPartition() {
130130
* SELECT * FROM
131131
* (
132132
* SELECT * FROM TABLE
133-
* ) nestedTab1 WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) , 16, 10 ) , 10 ) IN ( 2 , 5 ) )
133+
* ) nestedTab1 WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 2 , 5 ) )
134134
* ORDER BY KEY1 , KEY2
135135
* ) as nestedTab2 LIMIT 10;
136136
*/
137137
String firstExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1 "
138-
+ "WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) , 16, 10 ) , 10 ) IN ( 2 , 5 ) ) "
138+
+ "WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 2 , 5 ) ) "
139139
+ "ORDER BY KEY1 , KEY2 ) as nestedTab2 LIMIT 10";
140140

141141
/**
@@ -145,13 +145,13 @@ public void testCompositeKeyMultiPartition() {
145145
* SELECT * FROM
146146
* (
147147
* SELECT * FROM TABLE
148-
* ) nestedTab1 WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) , 16, 10 ) , 10 ) IN ( 2 , 5 ) )
148+
* ) nestedTab1 WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 2 , 5 ) )
149149
* AND ( ( KEY1 > ? ) OR ( KEY1 = ? AND KEY2 > ? ) )
150150
* ORDER BY KEY1 , KEY2
151151
* ) as nestedTab2 LIMIT 10;
152152
*/
153153
String chunkedExpected = "SELECT * FROM ( SELECT * FROM ( SELECT * FROM TABLE ) nestedTab1 "
154-
+ "WHERE ( MOD ( CONV ( MD5 ( CONCAT ( KEY1 , KEY2 ) ) , 16, 10 ) , 10 ) IN ( 2 , 5 ) ) "
154+
+ "WHERE ( MOD ( CRC32 ( CONCAT ( KEY1 , KEY2 ) ) , 10 ) IN ( 2 , 5 ) ) "
155155
+ "AND ( ( KEY1 > ? ) OR ( KEY1 = ? AND KEY2 > ? ) ) ORDER BY KEY1 , KEY2 ) as nestedTab2 LIMIT 10";
156156
testQueryString(MANAGER, firstExpected, chunkedExpected, NESTED_QUERY, KEYS, CHUNK_SIZE, PARTITION_COUNT,
157157
PARTITIONS);

0 commit comments

Comments
 (0)