Skip to content

Commit 2f36900

Browse files
committed
Add support for Date/Date32/DateTime/DateTime64
1 parent 34532e4 commit 2f36900

File tree

4 files changed

+73
-2
lines changed

4 files changed

+73
-2
lines changed

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,11 @@ void SimplePOJODataTest() throws Exception {
215215
"booleanPrimitive Boolean," +
216216
"booleanObject Boolean," +
217217
"str String," +
218-
"fixedStr FixedString(10)" +
218+
"fixedStr FixedString(10)," +
219+
"v_date Date," +
220+
"v_date32 Date32," +
221+
"v_dateTime DateTime," +
222+
"v_dateTime64 DateTime64," +
219223
") " +
220224
"ENGINE = MergeTree " +
221225
"ORDER BY (longPrimitive); ";
@@ -484,7 +488,11 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
484488
"booleanPrimitive Boolean," +
485489
"booleanObject Boolean," +
486490
"str String," +
487-
"fixedStr FixedString(10)" +
491+
"fixedStr FixedString(10)," +
492+
"v_date Date," +
493+
"v_date32 Date32," +
494+
"v_dateTime DateTime," +
495+
"v_dateTime64 DateTime64," +
488496
") " +
489497
"ENGINE = MergeTree " +
490498
"ORDER BY (longPrimitive) " +

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,10 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
3838
Serialize.writeString(out, input.getStr(), false, false, ClickHouseDataType.String, false, "str");
3939
Serialize.writeFixedString(out, input.getFixedStr(), false, false, ClickHouseDataType.FixedString, false, 10, "fixedStr");
4040

41+
Serialize.writeDate(out, input.getDate(), false, false, ClickHouseDataType.Date, false, "v_date");
42+
Serialize.writeDate32(out, input.getDate32(), false, false, ClickHouseDataType.Date32, false, "v_date32");
43+
Serialize.writeTimeDate(out, input.getDateTime(), false, false, ClickHouseDataType.DateTime, false, "v_dateTime");
44+
Serialize.writeTimeDate64(out, input.getDateTime64(), false, false, ClickHouseDataType.DateTime64, false, "v_dateTime64", 1);
45+
4146
}
4247
}

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/SimplePOJO.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package org.apache.flink.connector.clickhouse.sink.pojo;
22

33
import java.math.BigInteger;
4+
import java.time.LocalDate;
5+
import java.time.LocalDateTime;
46

57
public class SimplePOJO {
68

@@ -32,6 +34,11 @@ public class SimplePOJO {
3234
private BigInteger bigInteger128;
3335
private BigInteger bigInteger256;
3436

37+
private LocalDate date;
38+
private LocalDate date32;
39+
private LocalDateTime dateTime;
40+
private LocalDateTime dateTime64;
41+
3542
public SimplePOJO(int index) {
3643
this.bytePrimitive = Byte.MIN_VALUE;
3744
this.byteObject = Byte.MAX_VALUE;
@@ -60,6 +67,10 @@ public SimplePOJO(int index) {
6067
this.bigInteger128 = BigInteger.valueOf(longPrimitive);
6168
this.bigInteger256 = BigInteger.valueOf(longPrimitive);
6269

70+
this.date = LocalDate.ofEpochDay(0);
71+
this.date32 = LocalDate.ofEpochDay(0);
72+
this.dateTime = LocalDateTime.now();
73+
this.dateTime64 = LocalDateTime.now();
6374

6475
}
6576

@@ -122,4 +133,13 @@ public Double getDoubleObject() {
122133
public BigInteger getBigInteger256() { return bigInteger256; }
123134

124135
public String getFixedStr() { return fixedStr; }
136+
137+
public LocalDate getDate() { return date; }
138+
139+
public LocalDate getDate32() { return date32; }
140+
141+
public LocalDateTime getDateTime() { return dateTime; }
142+
143+
public LocalDateTime getDateTime64() { return dateTime64; }
144+
125145
}

flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Serialize.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.lang.reflect.Method;
1313
import java.math.BigInteger;
1414
import java.time.LocalDate;
15+
import java.time.LocalDateTime;
1516
import java.time.ZoneId;
1617
import java.time.ZonedDateTime;
1718
import java.util.HashMap;
@@ -128,6 +129,43 @@ public static void writeDate(OutputStream out, ZonedDateTime value, boolean defa
128129
}
129130
}
130131

132+
public static void writeDate32(OutputStream out, LocalDate value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
133+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
134+
SerializerUtils.writeDate32(out, value, ZoneId.of("UTC")); // TODO: check
135+
}
136+
}
137+
138+
public static void writeDate32(OutputStream out, ZonedDateTime value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
139+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
140+
SerializerUtils.writeDate32(out, value, ZoneId.of("UTC")); // TODO: check
141+
}
142+
}
143+
144+
// Support for DateTime section
145+
public static void writeTimeDate(OutputStream out, LocalDateTime value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
146+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
147+
SerializerUtils.writeDateTime(out, value, ZoneId.of("UTC")); // TODO: check
148+
}
149+
}
150+
151+
public static void writeTimeDate(OutputStream out, ZonedDateTime value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
152+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
153+
SerializerUtils.writeDateTime(out, value, ZoneId.of("UTC")); // TODO: check
154+
}
155+
}
156+
157+
public static void writeTimeDate64(OutputStream out, LocalDateTime value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column, int scale) throws IOException {
158+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
159+
SerializerUtils.writeDateTime64(out, value, scale, ZoneId.of("UTC")); // TODO: check
160+
}
161+
}
162+
163+
public static void writeTimeDate64(OutputStream out, ZonedDateTime value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column, int scale) throws IOException {
164+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
165+
SerializerUtils.writeDateTime64(out, value, scale, ZoneId.of("UTC")); // TODO: check
166+
}
167+
}
168+
131169
// clickhouse type String support
132170
public static void writeString(OutputStream out, String value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
133171
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {

0 commit comments

Comments
 (0)