|
26 | 26 | import org.apache.hadoop.mapreduce.InputSplit; |
27 | 27 | import org.apache.hadoop.mapreduce.RecordReader; |
28 | 28 | import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| 29 | +import org.apache.hadoop.mapreduce.lib.db.BigDecimalSplitter; |
29 | 30 | import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; |
30 | 31 | import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; |
| 32 | +import org.apache.hadoop.mapreduce.lib.db.DBSplitter; |
31 | 33 | import org.apache.hadoop.mapreduce.lib.db.DBWritable; |
32 | 34 | import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat; |
33 | 35 | import org.slf4j.Logger; |
34 | 36 | import org.slf4j.LoggerFactory; |
35 | 37 |
|
36 | 38 | import java.io.IOException; |
| 39 | +import java.math.BigDecimal; |
| 40 | +import java.math.RoundingMode; |
37 | 41 | import java.sql.Connection; |
38 | 42 | import java.sql.Driver; |
39 | 43 | import java.sql.DriverManager; |
40 | 44 | import java.sql.SQLException; |
41 | 45 | import java.sql.Statement; |
| 46 | +import java.sql.Types; |
42 | 47 | import java.util.Properties; |
43 | 48 |
|
44 | 49 | /** |
@@ -169,6 +174,26 @@ public void close() throws IOException { |
169 | 174 | }; |
170 | 175 | } |
171 | 176 |
|
| 177 | + @Override |
| 178 | + protected DBSplitter getSplitter(int sqlDataType) { |
| 179 | + if (sqlDataType == Types.NUMERIC || sqlDataType == Types.DECIMAL) { |
| 180 | + return new CustomBigDecimalSplitter(); |
| 181 | + } |
| 182 | + return super.getSplitter(sqlDataType); |
| 183 | + } |
| 184 | + |
| 185 | + static class CustomBigDecimalSplitter extends BigDecimalSplitter { |
| 186 | + @Override |
| 187 | + protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) { |
| 188 | + BigDecimal size = numerator.divide(denominator, RoundingMode.HALF_UP); |
| 189 | + if (size.compareTo(new BigDecimal("0")) <= 0) { |
| 190 | + int effectiveScale = Math.max(numerator.scale(), denominator.scale()) + 5; |
| 191 | + return numerator.divide(denominator, effectiveScale, RoundingMode.HALF_UP); |
| 192 | + } |
| 193 | + return size; |
| 194 | + } |
| 195 | + } |
| 196 | + |
172 | 197 | @Override |
173 | 198 | protected void closeConnection() { |
174 | 199 | super.closeConnection(); |
|
0 commit comments