Skip to content

Commit e43943d

Browse files
committed
Adding suppport in Decimal
1 parent d625225 commit e43943d

File tree

5 files changed

+53
-7
lines changed

5 files changed

+53
-7
lines changed

README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,11 @@ Planned for a future release — a complete end-to-end example will be added onc
146146
| long/Long | UInt64 || Serialize.writeUInt64 |
147147
| BigInteger | UInt128 || Serialize.writeUInt128 |
148148
| BigInteger | UInt256 || Serialize.writeUInt256 |
149-
| BigDecimal | Decimal | | N/A |
150-
| BigDecimal | Decimal32 | | N/A |
151-
| BigDecimal | Decimal64 | | N/A |
152-
| BigDecimal | Decimal128 | | N/A |
153-
| BigDecimal | Decimal256 | | N/A |
149+
| BigDecimal | Decimal | | Serialize.writeDecimal |
150+
| BigDecimal | Decimal32 | | Serialize.writeDecimal |
151+
| BigDecimal | Decimal64 | | Serialize.writeDecimal |
152+
| BigDecimal | Decimal128 | | Serialize.writeDecimal |
153+
| BigDecimal | Decimal256 | | Serialize.writeDecimal |
154154
| float/Float | Float || Serialize.writeFloat32 |
155155
| double/Double | Double || Serialize.writeFloat64 |
156156
| boolean/Boolean | Boolean || Serialize.writeBoolean |

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,11 @@ void SimplePOJODataTest() throws Exception {
218218
"uint64Object UInt64," +
219219
"uint128Object UInt128," +
220220
"uint256Object UInt256," +
221+
"decimal Decimal," +
222+
"decimal32 Decimal32(9)," +
223+
"decimal64 Decimal64(18)," +
224+
"decimal128 Decimal128(38)," +
225+
"decimal256 Decimal256(76)," +
221226
"floatPrimitive Float," +
222227
"floatObject Float," +
223228
"doublePrimitive Double," +
@@ -501,6 +506,11 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
501506
"uint64Object UInt64," +
502507
"uint128Object UInt128," +
503508
"uint256Object UInt256," +
509+
"decimal Decimal," +
510+
"decimal32 Decimal32(9)," +
511+
"decimal64 Decimal64(18)," +
512+
"decimal128 Decimal128(38)," +
513+
"decimal256 Decimal256(76)," +
504514
"floatPrimitive Float," +
505515
"floatObject Float," +
506516
"doublePrimitive Double," +

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
4242
Serialize.writeUInt128(out, input.getUint128Object(), false, false, ClickHouseDataType.UInt128, false, "bigInteger128");
4343
Serialize.writeUInt256(out, input.getUint256Object(), false, false, ClickHouseDataType.UInt256, false, "bigInteger256");
4444

45+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal, false, "decimal", 10, 5);
46+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal32, false, "decimal32", 9, 1);
47+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal64, false, "decimal64", 18, 10);
48+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal128, false, "decimal128", 38, 19);
49+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal256, false, "decimal256", 76, 39);
50+
4551
Serialize.writeFloat32(out, input.getFloatPrimitive(), false, false, ClickHouseDataType.Float32, false, "floatPrimitive");
4652
Serialize.writeFloat32(out, input.getFloatObject(), false, false, ClickHouseDataType.Float32, false, "floatObject");
4753

@@ -52,7 +58,7 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
5258
Serialize.writeBoolean(out, input.getBooleanObject(), false, false, ClickHouseDataType.Bool, false, "booleanObject");
5359

5460
Serialize.writeString(out, input.getStr(), false, false, ClickHouseDataType.String, false, "str");
55-
Serialize.writeFixedString(out, input.getFixedStr(), false, false, ClickHouseDataType.FixedString, false, 10, "fixedStr");
61+
Serialize.writeFixedString(out, input.getFixedStr(), false, false, ClickHouseDataType.FixedString, false, "fixedStr", 10);
5662

5763
Serialize.writeDate(out, input.getDate(), false, false, ClickHouseDataType.Date, false, "v_date");
5864
Serialize.writeDate32(out, input.getDate32(), false, false, ClickHouseDataType.Date32, false, "v_date32");

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/SimplePOJO.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.apache.flink.connector.clickhouse.sink.pojo;
22

3+
import java.math.BigDecimal;
34
import java.math.BigInteger;
45
import java.time.LocalDate;
56
import java.time.LocalDateTime;
@@ -33,6 +34,12 @@ public class SimplePOJO {
3334
private BigInteger uint128Object;
3435
private BigInteger uint256Object;
3536

37+
private BigDecimal bigDecimal;
38+
private BigDecimal bigDecimal32;
39+
private BigDecimal bigDecimal64;
40+
private BigDecimal bigDecimal128;
41+
private BigDecimal bigDecimal256;
42+
3643
private float floatPrimitive;
3744
private Float floatObject;
3845

@@ -82,6 +89,12 @@ public SimplePOJO(int index) {
8289
this.uint128Object = BigInteger.valueOf(index);
8390
this.uint256Object = BigInteger.valueOf(index);
8491

92+
this.bigDecimal = new BigDecimal(index);
93+
this.bigDecimal32 = new BigDecimal(index);
94+
this.bigDecimal64 = new BigDecimal(index);
95+
this.bigDecimal128 = new BigDecimal(index);
96+
this.bigDecimal256 = new BigDecimal(index);
97+
8598
this.floatPrimitive = Float.MIN_VALUE;
8699
this.floatObject = Float.MAX_VALUE;
87100

@@ -156,6 +169,16 @@ public Long getLongObject() {
156169

157170
public BigInteger getUint256Object() { return uint256Object; }
158171

172+
public BigDecimal getBigDecimal() { return bigDecimal; }
173+
174+
public BigDecimal getBigDecimal32() { return bigDecimal32; }
175+
176+
public BigDecimal getBigDecimal64() { return bigDecimal64; }
177+
178+
public BigDecimal getBigDecimal128() { return bigDecimal128; }
179+
180+
public BigDecimal getBigDecimal256() { return bigDecimal256; }
181+
159182
public float getFloatPrimitive() {
160183
return floatPrimitive;
161184
}

flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Serialize.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.io.IOException;
1111
import java.io.OutputStream;
1212
import java.lang.reflect.Method;
13+
import java.math.BigDecimal;
1314
import java.math.BigInteger;
1415
import java.time.LocalDate;
1516
import java.time.LocalDateTime;
@@ -174,7 +175,7 @@ public static void writeString(OutputStream out, String value, boolean defaultsS
174175
}
175176

176177
// Add a boundary check before inserting
177-
public static void writeFixedString(OutputStream out, String value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, int size, String column) throws IOException {
178+
public static void writeFixedString(OutputStream out, String value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column, int size) throws IOException {
178179
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
179180
BinaryStreamUtils.writeFixedString(out, convertToString(value), size);
180181
}
@@ -257,6 +258,12 @@ public static void writeUInt256(OutputStream out, BigInteger value, boolean defa
257258
BinaryStreamUtils.writeUnsignedInt256(out, value);
258259
}
259260
}
261+
// Decimal
262+
public static void writeDecimal(OutputStream out, BigDecimal value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column, int precision, int scale) throws IOException {
263+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
264+
BinaryStreamUtils.writeDecimal(out, value, precision, scale);
265+
}
266+
}
260267

261268
// Float32
262269
public static void writeFloat32(OutputStream out, Float value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {

0 commit comments

Comments
 (0)