Commit 0b6c2e8

[FLINK-38398][pipeline-connector][postgresql] Improve PostgresSQL temporal field type supported with debezium.time.precision.mode=connect config (#4132)
1 parent 9ea8224 commit 0b6c2e8

6 files changed, with 278 additions and 15 deletions

docs/content.zh/docs/connectors/pipeline-connectors/postgres.md

Lines changed: 69 additions & 3 deletions
@@ -417,11 +417,11 @@ pipeline:
 - debezium.time.precision.mode=adaptive_time_microseconds
 - debezium.time.precision.mode=connect

-Note: Due to current CDC limitations in supporting time types, when <code>debezium.time.precision.mode</code> is adaptive, adaptive_time_microseconds, or connect, the Time type is always converted to the Integer type with a precision of 3. This will be improved later.
+Note: Because CDC currently limits the precision of the Time type to 3, when <code>debezium.time.precision.mode</code> is adaptive, adaptive_time_microseconds, or connect, the Time type is always converted to Time(3).

 <u>debezium.time.precision.mode=adaptive</u>

-When the <code>debezium.time.precision.mode</code> property is set to adaptive, the default, the connector determines the literal type and semantic type based on the column's data type definition. This ensures that events exactly represent the values in the database.
+When the <code>debezium.time.precision.mode</code> property is set to adaptive, the default, the precision of TIME is 3 and the precision of TIMESTAMP is 6.
 <div class="wy-table-responsive">
 <table class="colwidths-auto docutils">
 <thead>
@@ -440,7 +440,7 @@ pipeline:
 <td>
 TIME([P])
 </td>
-<td>INTEGER</td>
+<td>TIME(3)</td>
 </tr>
 <tr>
 <td>
@@ -452,6 +452,72 @@ pipeline:
 </table>
 </div>

+<u>debezium.time.precision.mode=adaptive_time_microseconds</u>
+
+When the <code>debezium.time.precision.mode</code> property is set to adaptive_time_microseconds, the precision of TIME is 3 and the precision of TIMESTAMP is 6.
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+<thead>
+<tr>
+<th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
+<th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td>
+DATE</td>
+<td>DATE</td>
+</tr>
+<tr>
+<td>
+TIME([P])
+</td>
+<td>TIME(3)</td>
+</tr>
+<tr>
+<td>
+TIMESTAMP([P])
+</td>
+<td>TIMESTAMP([P])</td>
+</tr>
+</tbody>
+</table>
+</div>
+
+<u>debezium.time.precision.mode=connect</u>
+
+When the <code>debezium.time.precision.mode</code> property is set to connect, both TIME and TIMESTAMP have a precision of 3.
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+<thead>
+<tr>
+<th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
+<th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td>
+DATE</td>
+<td>DATE</td>
+</tr>
+<tr>
+<td>
+TIME([P])
+</td>
+<td>TIME(3)</td>
+</tr>
+<tr>
+<td>
+TIMESTAMP([P])
+</td>
+<td>TIMESTAMP(3)</td>
+</tr>
+</tbody>
+</table>
+</div>
+
 ### Decimal types Mapping
 The setting of the PostgreSQL connector configuration property <code>debezium.decimal.handling.mode</code> determines how the connector maps decimal types.

docs/content/docs/connectors/pipeline-connectors/postgres.md

Lines changed: 69 additions & 3 deletions
@@ -412,11 +412,11 @@ Other than PostgreSQL’s TIMESTAMPTZ data types, which contain time zone inform
 - debezium.time.precision.mode=adaptive_time_microseconds
 - debezium.time.precision.mode=connect

-Note: Due to current CDC limitations in supporting time types, when <code>debezium.time.precision.mode</code> is set to "adaptive", "adaptive_time_microseconds", or when using Connect time types, all time values are converted to the Integer type with a precision of 3. This will be improved in future updates.
+Note: Due to the current CDC limitation, the precision for the TIME type is fixed at 3. Regardless of whether <code>debezium.time.precision.mode</code> is set to adaptive, adaptive_time_microseconds, or connect, the TIME type will be converted to TIME(3).

 <u>debezium.time.precision.mode=adaptive</u>

-When the <code>debezium.time.precision.mode</code> property is set to adaptive, the default, the connector determines the literal type and semantic type based on the column’s data type definition. This ensures that events exactly represent the values in the database.
+When the <code>debezium.time.precision.mode</code> property is set to the default value `adaptive`, the precision of `TIME` is 3, and the precision of `TIMESTAMP` is 6.
 <div class="wy-table-responsive">
 <table class="colwidths-auto docutils">
 <thead>
@@ -435,7 +435,7 @@ When the <code>debezium.time.precision.mode</code> property is set to adaptive,
 <td>
 TIME([P])
 </td>
-<td>INTEGER</td>
+<td>TIME(3)</td>
 </tr>
 <tr>
 <td>
@@ -447,6 +447,72 @@ When the <code>debezium.time.precision.mode</code> property is set to adaptive,
 </table>
 </div>

+<u>debezium.time.precision.mode=adaptive_time_microseconds</u>
+
+When the `debezium.time.precision.mode` property is set to the value `adaptive_time_microseconds`, the precision of `TIME` is 3, and the precision of `TIMESTAMP` is 6.
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+<thead>
+<tr>
+<th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
+<th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td>
+DATE</td>
+<td>DATE</td>
+</tr>
+<tr>
+<td>
+TIME([P])
+</td>
+<td>TIME(3)</td>
+</tr>
+<tr>
+<td>
+TIMESTAMP([P])
+</td>
+<td>TIMESTAMP([P])</td>
+</tr>
+</tbody>
+</table>
+</div>
+
+<u>debezium.time.precision.mode=connect</u>
+
+When the <code>debezium.time.precision.mode</code> property is set to connect, both TIME and TIMESTAMP have a precision of 3.
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+<thead>
+<tr>
+<th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
+<th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td>
+DATE</td>
+<td>DATE</td>
+</tr>
+<tr>
+<td>
+TIME([P])
+</td>
+<td>TIME(3)</td>
+</tr>
+<tr>
+<td>
+TIMESTAMP([P])
+</td>
+<td>TIMESTAMP(3)</td>
+</tr>
+</tbody>
+</table>
+</div>
+
 ### Decimal types Mapping
 The setting of the PostgreSQL connector configuration property <code>debezium.decimal.handling.mode</code> determines how the connector maps decimal types.

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java

Lines changed: 1 addition & 1 deletion
@@ -277,7 +277,7 @@ public static DataType handleTimeWithTemporalMode(TemporalPrecisionMode mode, in
             case ADAPTIVE:
             case ADAPTIVE_TIME_MICROSECONDS:
             case CONNECT:
-                return DataTypes.INT();
+                return DataTypes.TIME(scale);
             default:
                 throw new IllegalArgumentException("Unknown temporal precision mode: " + mode);
         }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java

Lines changed: 118 additions & 6 deletions
@@ -67,6 +67,7 @@
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -289,9 +290,9 @@ public void testTimeTypesWithTemporalModeAdaptive() throws Exception {
                 new Object[] {
                     2,
                     DateData.fromEpochDay(18460),
-                    64822000,
-                    64822123,
-                    64822123,
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22")),
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22.123")),
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22.123456")),
                     TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
                     TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
                     TimestampData.fromLocalDateTime(
@@ -306,6 +307,117 @@ public void testTimeTypesWithTemporalModeAdaptive() throws Exception {
                 .isEqualTo(expectedSnapshot);
     }

+    @Test
+    public void testTimeTypesWithTemporalModeMicroSeconds() throws Exception {
+        initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+        Properties debeziumProps = new Properties();
+        debeziumProps.setProperty("time.precision.mode", "adaptive_time_microseconds");
+
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGIS_CONTAINER.getHost())
+                                .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+                                .tableList("inventory.time_types")
+                                .startupOptions(StartupOptions.initial())
+                                .debeziumProperties(debeziumProps)
+                                .serverTimeZone("UTC");
+        configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+        configFactory.slotName(slotName);
+        configFactory.decodingPluginName("pgoutput");
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider)
+                        new PostgresDataSource(configFactory).getEventSourceProvider();
+
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                PostgresDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+
+        Object[] expectedSnapshot =
+                new Object[] {
+                    2,
+                    DateData.fromEpochDay(18460),
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22")),
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22.123")),
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22.123456")),
+                    TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
+                    TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
+                    TimestampData.fromLocalDateTime(
+                            LocalDateTime.parse("2020-07-17T18:00:22.123456")),
+                    TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
+                    LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
+                };
+
+        List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+        RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+        Assertions.assertThat(recordFields(snapshotRecord, TIME_TYPES_WITH_ADAPTIVE))
+                .isEqualTo(expectedSnapshot);
+    }
+
+    @Test
+    public void testTimeTypesWithTemporalModeConnect() throws Exception {
+        initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+        Properties debeziumProps = new Properties();
+        debeziumProps.setProperty("time.precision.mode", "connect");
+
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGIS_CONTAINER.getHost())
+                                .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+                                .tableList("inventory.time_types")
+                                .startupOptions(StartupOptions.initial())
+                                .debeziumProperties(debeziumProps)
+                                .serverTimeZone("UTC");
+        configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+        configFactory.slotName(slotName);
+        configFactory.decodingPluginName("pgoutput");
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider)
+                        new PostgresDataSource(configFactory).getEventSourceProvider();
+
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                PostgresDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+
+        Object[] expectedSnapshot =
+                new Object[] {
+                    2,
+                    DateData.fromEpochDay(18460),
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22")),
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22.123")),
+                    TimeData.fromLocalTime(LocalTime.parse("18:00:22.123")),
+                    TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
+                    TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
+                    TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
+                    TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
+                    LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
+                };
+
+        List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+        RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+        Assertions.assertThat(recordFields(snapshotRecord, TIME_TYPES_WITH_ADAPTIVE))
+                .isEqualTo(expectedSnapshot);
+    }
+
     @Test
     public void testHandlingDecimalModePrecise() throws Exception {
         initializePostgresTable(POSTGIS_CONTAINER, "decimal_mode_test");
@@ -923,9 +1035,9 @@ private Instant toInstant(String ts) {
                 RowType.of(
                         DataTypes.INT(),
                         DataTypes.DATE(),
-                        DataTypes.INT(),
-                        DataTypes.INT(),
-                        DataTypes.INT(),
+                        DataTypes.TIME(0),
+                        DataTypes.TIME(3),
+                        DataTypes.TIME(6),
                         DataTypes.TIMESTAMP(0),
                         DataTypes.TIMESTAMP(3),
                         DataTypes.TIMESTAMP(6),
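
The integer expectations removed from the adaptive test (64822000, 64822123, 64822123) can be read as milliseconds since midnight for the same three times of day that the new `LocalTime` expectations spell out. The sketch below only checks that arithmetic with the JDK; `TimeOfDayMillis` and its methods are hypothetical names for illustration and are not part of Flink CDC. The millisecond truncation it shows also seems consistent with the connect-mode test, which expects `18:00:22.123` for the microsecond-precision column.

```java
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper for illustration only; not a Flink CDC class.
final class TimeOfDayMillis {

    // Milliseconds since midnight; digits below one millisecond are dropped.
    static int millisOfDay(String isoTime) {
        LocalTime time = LocalTime.parse(isoTime);
        return (int) (time.toNanoOfDay() / 1_000_000L);
    }

    // The same time of day truncated to millisecond precision.
    static LocalTime truncateToMillis(String isoTime) {
        return LocalTime.parse(isoTime).truncatedTo(ChronoUnit.MILLIS);
    }

    public static void main(String[] args) {
        System.out.println(millisOfDay("18:00:22"));             // 64822000
        System.out.println(millisOfDay("18:00:22.123"));         // 64822123
        System.out.println(millisOfDay("18:00:22.123456"));      // 64822123
        System.out.println(truncateToMillis("18:00:22.123456")); // 18:00:22.123
    }
}
```

18 hours and 22 seconds is 18 * 3600 + 22 = 64822 seconds, which gives 64822000 ms; the fractional columns add 123 ms once anything below a millisecond is discarded.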

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java

Lines changed: 14 additions & 0 deletions
@@ -66,6 +66,7 @@
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -313,6 +314,10 @@ protected Object convertToFloat(Object dbzObj, Schema schema) {
     }

     protected Object convertToDate(Object dbzObj, Schema schema) {
+        if (dbzObj instanceof Date) {
+            Instant instant = ((Date) dbzObj).toInstant();
+            return DateData.fromLocalDate(instant.atZone(java.time.ZoneOffset.UTC).toLocalDate());
+        }
         return DateData.fromLocalDate(TemporalConversions.toLocalDate(dbzObj));
     }

@@ -326,6 +331,9 @@ protected Object convertToTime(Object dbzObj, Schema schema) {
             }
         } else if (dbzObj instanceof Integer) {
             return TimeData.fromMillisOfDay((int) dbzObj);
+        } else if (dbzObj instanceof Date) {
+            long millisOfDay = ((Date) dbzObj).getTime() % (24 * 60 * 60 * 1000);
+            return TimeData.fromMillisOfDay((int) millisOfDay);
         }
         // get number of milliseconds of the day
         return TimeData.fromLocalTime(TemporalConversions.toLocalTime(dbzObj));
@@ -346,6 +354,12 @@ protected Object convertToTimestamp(Object dbzObj, Schema schema) {
                         Math.floorDiv(nano, 1000_000), (int) (Math.floorMod(nano, 1000_000)));
             }
         }
+        if (dbzObj instanceof Date) {
+            if (schema.name().equals(org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME)) {
+                Instant instant = ((Date) dbzObj).toInstant();
+                return TimestampData.fromMillis(instant.toEpochMilli());
+            }
+        }
         throw new IllegalArgumentException(
                 "Unable to convert to TIMESTAMP from unexpected value '"
                         + dbzObj
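
The three `instanceof Date` branches added above all start from a `java.util.Date`, presumably because the Kafka Connect logical temporal types arrive in that form when `time.precision.mode` is `connect`. The following sketch reproduces only the JDK arithmetic from those branches, interpreting the value in UTC. `ConnectDateConversions` and its method names are hypothetical, and the Flink CDC wrappers (`DateData`, `TimeData`, `TimestampData`) are deliberately left out.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Date;

// Hypothetical helper for illustration only; mirrors the JDK-level
// arithmetic of the new branches, not the connector's actual classes.
final class ConnectDateConversions {

    static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

    // Calendar date of the instant, read in UTC (as in convertToDate).
    static LocalDate toLocalDateUtc(Date value) {
        return value.toInstant().atZone(ZoneOffset.UTC).toLocalDate();
    }

    // Milliseconds since midnight UTC (as in convertToTime).
    static int toMillisOfDay(Date value) {
        return (int) (value.getTime() % MILLIS_PER_DAY);
    }

    // Milliseconds since the epoch (as in convertToTimestamp).
    static long toEpochMillis(Date value) {
        return value.toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        Date value = Date.from(Instant.parse("2020-07-17T18:00:22.123Z"));
        System.out.println(toLocalDateUtc(value)); // 2020-07-17
        System.out.println(toMillisOfDay(value));  // 64822123
        System.out.println(toEpochMillis(value));  // 1595008822123
    }
}
```

One design point worth noting: the `%` operator returns a negative remainder for a `Date` before 1970-01-01, so a caller that might see such values could consider `Math.floorMod` instead. Whether that case can occur for Connect time values is not stated in the commit.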
