Skip to content

Commit d6c7f89

Browse files
committed
[feat-#893][jdbc]range split strategy support all numeric type
1 parent c47f133 commit d6c7f89

File tree

6 files changed

+118
-49
lines changed

6 files changed

+118
-49
lines changed

chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/sink/HBase14SinkFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public DataStreamSink<RowData> createSink(DataStream<RowData> dataSet) {
9595
rowConverter = new HbaseRowConverter(hbaseSchema, nullStringLiteral);
9696
}
9797

98-
builder.setRowConverter(rowConverter,useAbstractBaseColumn);
98+
builder.setRowConverter(rowConverter, useAbstractBaseColumn);
9999
return createOutput(dataSet, builder.finish());
100100
}
101101

chunjun-connectors/chunjun-connector-hbase-1.4/src/main/java/com/dtstack/chunjun/connector/hbase14/source/HBase14SourceFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public DataStream<RowData> createSource() {
9797
rowConverter = new HbaseRowConverter(hbaseSchema, nullStringLiteral);
9898
}
9999

100-
builder.setRowConverter(rowConverter,useAbstractBaseColumn);
100+
builder.setRowConverter(rowConverter, useAbstractBaseColumn);
101101
return createInput(builder.finish());
102102
}
103103
}

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java

Lines changed: 43 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@
4242
import org.apache.flink.table.types.logical.RowType;
4343

4444
import org.apache.commons.lang3.StringUtils;
45-
import org.apache.commons.lang3.math.NumberUtils;
4645
import org.apache.commons.lang3.tuple.Pair;
4746

47+
import java.math.BigDecimal;
4848
import java.math.BigInteger;
4949
import java.sql.Connection;
5050
import java.sql.Date;
@@ -716,52 +716,53 @@ protected String buildQuerySqlBySplit(JdbcInputSplit jdbcInputSplit, List<String
716716
protected JdbcInputSplit[] createSplitsInternalBySplitRange(int minNumSplits) {
717717
JdbcInputSplit[] splits;
718718
Pair<String, String> splitRangeFromDb = getSplitRangeFromDb();
719-
BigInteger left = NumberUtils.createBigInteger(splitRangeFromDb.getLeft());
720-
BigInteger right = NumberUtils.createBigInteger(splitRangeFromDb.getRight());
719+
if (StringUtils.isBlank(splitRangeFromDb.getLeft())
720+
|| "null".equalsIgnoreCase(splitRangeFromDb.getLeft())) {
721+
// 没有数据,返回空数组
722+
return new JdbcInputSplit[minNumSplits];
723+
}
724+
BigDecimal left = new BigDecimal(splitRangeFromDb.getLeft());
725+
BigDecimal right = new BigDecimal(splitRangeFromDb.getRight());
721726
LOG.info("create splitsInternal,the splitKey range is {} --> {}", left, right);
722-
// 没有数据 返回空数组
723-
if (left == null || right == null) {
724-
splits = new JdbcInputSplit[minNumSplits];
725-
} else {
726-
BigInteger endAndStartGap = right.subtract(left);
727-
728-
BigInteger step = endAndStartGap.divide(BigInteger.valueOf(minNumSplits));
729-
BigInteger remainder = endAndStartGap.remainder(BigInteger.valueOf(minNumSplits));
730-
if (step.compareTo(BigInteger.ZERO) == 0) {
731-
// left = right时,step和remainder都为0
732-
if (remainder.compareTo(BigInteger.ZERO) == 0) {
733-
minNumSplits = 1;
734-
} else {
735-
minNumSplits = remainder.intValue();
736-
}
727+
BigDecimal endAndStartGap = right.subtract(left);
728+
BigDecimal remainder = endAndStartGap.remainder(new BigDecimal(minNumSplits));
729+
endAndStartGap = endAndStartGap.subtract(remainder);
730+
BigDecimal step = endAndStartGap.divide(new BigDecimal(minNumSplits));
731+
732+
if (step.compareTo(BigDecimal.ZERO) == 0) {
733+
// left = right时,step和remainder都为0
734+
if (remainder.compareTo(BigDecimal.ZERO) == 0) {
735+
minNumSplits = 1;
736+
} else {
737+
minNumSplits = remainder.intValue();
737738
}
739+
}
738740

739-
splits = new JdbcInputSplit[minNumSplits];
740-
BigInteger start;
741-
BigInteger end = left;
742-
for (int i = 0; i < minNumSplits; i++) {
743-
start = end;
744-
end = start.add(step);
745-
end =
746-
end.add(
747-
(remainder.compareTo(BigInteger.valueOf(i)) > 0)
748-
? BigInteger.ONE
749-
: BigInteger.ZERO);
750-
// 分片范围是 splitPk >=start and splitPk < end 最后一个分片范围是splitPk >= start
751-
if (i == minNumSplits - 1) {
752-
end = null;
753-
}
754-
splits[i] =
755-
new JdbcInputSplit(
756-
i,
757-
minNumSplits,
758-
i,
759-
jdbcConf.getStartLocation(),
760-
null,
761-
start.toString(),
762-
Objects.isNull(end) ? null : end.toString());
741+
splits = new JdbcInputSplit[minNumSplits];
742+
BigDecimal start;
743+
BigDecimal end = left;
744+
for (int i = 0; i < minNumSplits; i++) {
745+
start = end;
746+
end = start.add(step);
747+
if (remainder.compareTo(BigDecimal.ZERO) > 0) {
748+
end = end.add(BigDecimal.ONE);
749+
remainder = remainder.subtract(BigDecimal.ONE);
763750
}
751+
// 分片范围是 splitPk >=start and splitPk < end 最后一个分片范围是splitPk >= start
752+
if (i == minNumSplits - 1) {
753+
end = null;
754+
}
755+
splits[i] =
756+
new JdbcInputSplit(
757+
i,
758+
minNumSplits,
759+
i,
760+
jdbcConf.getStartLocation(),
761+
null,
762+
start.toString(),
763+
Objects.isNull(end) ? null : end.toString());
764764
}
765+
765766
return splits;
766767
}
767768

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/SqlUtil.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public static String buildQuerySplitRangeSql(JdbcConf jdbcConf, JdbcDialect jdbc
5757

5858
} else {
5959
// rowNum字段作为splitKey
60-
if (addRowNumColumn(jdbcConf.getSplitPk())) {
60+
if (isRowNumSplitKey(jdbcConf.getSplitPk())) {
6161
StringBuilder customTableBuilder =
6262
new StringBuilder(128)
6363
.append("SELECT ")
@@ -98,7 +98,7 @@ public static String buildQuerySqlBySplit(
9898
// customSql为空 且 splitPk是ROW_NUMBER()
9999
boolean flag =
100100
StringUtils.isBlank(jdbcConf.getCustomSql())
101-
&& SqlUtil.addRowNumColumn(jdbcConf.getSplitPk());
101+
&& SqlUtil.isRowNumSplitKey(jdbcConf.getSplitPk());
102102

103103
String splitFilter = null;
104104
if (jdbcInputSplit.getTotalNumberOfSplits() > 1) {
@@ -227,7 +227,7 @@ public static String buildOrderSql(
227227
}
228228

229229
/* 是否添加自定义函数column 作为分片key ***/
230-
public static boolean addRowNumColumn(String splitKey) {
230+
public static boolean isRowNumSplitKey(String splitKey) {
231231
return StringUtils.isNotBlank(splitKey)
232232
&& splitKey.contains(ConstantValue.LEFT_PARENTHESIS_SYMBOL);
233233
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.chunjun.connector.jdbc.source;
20+
21+
import com.dtstack.chunjun.connector.jdbc.conf.JdbcConf;
22+
23+
import org.apache.commons.lang3.tuple.Pair;
24+
import org.junit.Before;
25+
import org.junit.Test;
26+
import org.junit.runner.RunWith;
27+
import org.mockito.Mockito;
28+
import org.powermock.api.mockito.PowerMockito;
29+
import org.powermock.core.classloader.annotations.PrepareForTest;
30+
import org.powermock.modules.junit4.PowerMockRunner;
31+
import org.powermock.reflect.Whitebox;
32+
import org.slf4j.Logger;
33+
34+
import java.lang.reflect.InvocationTargetException;
35+
import java.lang.reflect.Method;
36+
import java.util.Arrays;
37+
38+
/** @author liuliu 2022/4/15 */
39+
@RunWith(PowerMockRunner.class)
40+
@PrepareForTest({JdbcInputFormat.class})
41+
public class JdbcInputFormatTest {
42+
43+
JdbcInputFormat jdbcInputFormat;
44+
45+
@Before
46+
public void setup() {
47+
jdbcInputFormat = PowerMockito.mock(JdbcInputFormat.class);
48+
Logger LOG = PowerMockito.mock(Logger.class);
49+
Whitebox.setInternalState(jdbcInputFormat, "LOG", LOG);
50+
JdbcConf jdbcConf = PowerMockito.mock(JdbcConf.class);
51+
Whitebox.setInternalState(jdbcInputFormat, "jdbcConf", jdbcConf);
52+
PowerMockito.when(jdbcConf.getStartLocation()).thenReturn("10");
53+
}
54+
55+
@Test
56+
public void createSplitsInternalBySplitRangeTest()
57+
throws InvocationTargetException, IllegalAccessException {
58+
PowerMockito.when(jdbcInputFormat.createSplitsInternalBySplitRange(Mockito.anyInt()))
59+
.thenCallRealMethod();
60+
Method getSplitRangeFromDb =
61+
PowerMockito.method(JdbcInputFormat.class, "getSplitRangeFromDb");
62+
Mockito.when(getSplitRangeFromDb.invoke(jdbcInputFormat))
63+
.thenReturn(Pair.of("12.123", "345534.12"));
64+
JdbcInputSplit[] splitsInternalBySplitRange =
65+
jdbcInputFormat.createSplitsInternalBySplitRange(3);
66+
Arrays.stream(splitsInternalBySplitRange).forEach(split -> System.out.println(split));
67+
assert splitsInternalBySplitRange.length == 3;
68+
}
69+
}

chunjun-core/src/main/java/com/dtstack/chunjun/element/column/ArrayColumn.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ public class ArrayColumn extends AbstractBaseColumn {
4242
protected Array data;
4343

4444
public ArrayColumn(final Array data) {
45-
super(data);
46-
this.data = data;
45+
super(data, data.toString().length());
4746
}
4847

4948
@Override

0 commit comments

Comments
 (0)