@@ -21,30 +21,44 @@
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.connectors.base.options.StartupOptions;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
 import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
 import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
 import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
 import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
 import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
 import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;
+import org.apache.flink.cdc.connectors.postgres.utils.PostgresTypeUtils;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;

 import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.data.Envelope;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;

 import java.sql.SQLException;
-import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Set;

+import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
 import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent;
-import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getTableId;
 import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
 import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;

@@ -60,7 +74,7 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter
     private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
     private boolean isBounded = false;

-    private final List<CreateTableEvent> createTableEventCache = new ArrayList<>();
+    private final Map<TableId, CreateTableEvent> createTableEventCache;

     public PostgresPipelineRecordEmitter(
             DebeziumDeserializationSchema debeziumDeserializationSchema,
@@ -76,19 +90,40 @@ public PostgresPipelineRecordEmitter(
         this.sourceConfig = sourceConfig;
         this.postgresDialect = postgresDialect;
         this.alreadySendCreateTableTables = new HashSet<>();
+        this.createTableEventCache =
+                ((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
+                        .getCreateTableEventCache();
         generateCreateTableEvent(sourceConfig);
         this.isBounded = StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());
     }

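+    // Applies the table schemas carried by an incoming split. In bounded (snapshot) mode with an
+    // empty cache, CreateTableEvents are regenerated for all captured tables; otherwise each table
+    // schema in the split is forwarded to the deserializer as a CreateTableEvent.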
+    @Override
+    public void applySplit(SourceSplitBase split) {
+        if (isBounded && createTableEventCache.isEmpty() && split instanceof SnapshotSplit) {
+            // A SnapshotSplit carries the schema of a single table only, so regenerate the
+            // CreateTableEvents for all captured tables here.
+            createTableEventCache.putAll(generateCreateTableEvent(sourceConfig));
+        } else {
+            for (TableChanges.TableChange tableChange : split.getTableSchemas().values()) {
+                CreateTableEvent createTableEvent =
+                        new CreateTableEvent(
+                                toCdcTableId(tableChange.getId()),
+                                buildSchemaFromTable(tableChange.getTable()));
+                ((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
+                        .applyChangeEvent(createTableEvent);
+            }
+        }
+    }
+
     @Override
     protected void processElement(
             SourceRecord element, SourceOutput<T> output, SourceSplitState splitState)
             throws Exception {
         if (shouldEmitAllCreateTableEventsInSnapshotMode && isBounded) {
             // In snapshot mode, we simply emit all schemas at once.
-            for (CreateTableEvent createTableEvent : createTableEventCache) {
-                output.collect((T) createTableEvent);
-            }
+            createTableEventCache.forEach(
+                    (tableId, createTableEvent) -> {
+                        output.collect((T) createTableEvent);
+                    });
             shouldEmitAllCreateTableEventsInSnapshotMode = false;
         } else if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
             TableId tableId = splitState.asSnapshotSplitState().toSourceSplit().getTableId();
@@ -99,21 +134,61 @@ protected void processElement(
                 }
             }
         } else {
-            if (isDataChangeRecord(element) || isSchemaChangeEvent(element)) {
+            boolean isDataChangeRecord = isDataChangeRecord(element);
+            if (isDataChangeRecord || isSchemaChangeEvent(element)) {
                 TableId tableId = getTableId(element);
                 if (!alreadySendCreateTableTables.contains(tableId)) {
-                    for (CreateTableEvent createTableEvent : createTableEventCache) {
-                        if (createTableEvent != null) {
-                            output.collect((T) createTableEvent);
-                        }
+                    CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
+                    if (createTableEvent != null) {
+                        output.collect((T) createTableEvent);
                     }
                     alreadySendCreateTableTables.add(tableId);
                 }
+                // In rare cases, a CreateTableEvent may be missed before its DataChangeEvents;
+                // fetch and emit it lazily here. SchemaChangeEvents need no CreateTableEvent,
+                // since they already carry the latest schema.
+                if (isDataChangeRecord && !createTableEventCache.containsKey(tableId)) {
+                    CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
+                    output.collect((T) createTableEvent);
+                    createTableEventCache.put(tableId, createTableEvent);
+                }
             }
         }
         super.processElement(element, output, splitState);
     }

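+    // Converts a Debezium Table definition into a CDC Schema, mapping each column's type,
+    // nullability, comment, and default value, plus the table comment and primary key.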
+    private Schema buildSchemaFromTable(Table table) {
+        List<Column> columns = table.columns();
+        Schema.Builder tableBuilder = Schema.newBuilder();
+        for (int i = 0; i < columns.size(); i++) {
+            Column column = columns.get(i);
+
+            String colName = column.name();
+            DataType dataType;
+            try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
+                dataType =
+                        PostgresTypeUtils.fromDbzColumn(
+                                column,
+                                this.sourceConfig.getDbzConnectorConfig(),
+                                jdbc.getTypeRegistry());
+            } catch (SQLException e) {
+                throw new RuntimeException(
+                        "Cannot resolve the data type of column " + colName, e);
+            }
+            if (!column.isOptional()) {
+                dataType = dataType.notNull();
+            }
+            tableBuilder.physicalColumn(
+                    colName,
+                    dataType,
+                    column.comment(),
+                    column.defaultValueExpression().orElse(null));
+        }
+        tableBuilder.comment(table.comment());
+
+        List<String> primaryKey = table.primaryKeyColumnNames();
+        if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
+            tableBuilder.primaryKey(primaryKey);
+        }
+        return tableBuilder.build();
+    }
+
     private void sendCreateTableEvent(
             PostgresConnection jdbc, TableId tableId, SourceOutput<Event> output) {
         Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
@@ -124,8 +199,40 @@ private void sendCreateTableEvent(
                         schema));
     }

-    private void generateCreateTableEvent(PostgresSourceConfig sourceConfig) {
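+    // Converts a Debezium TableId into the CDC TableId form (schema name + table name); the
+    // schema may arrive in either the catalog or the schema field, so prefer catalog when set.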
+    private org.apache.flink.cdc.common.event.TableId toCdcTableId(
+            io.debezium.relational.TableId dbzTableId) {
+        String schemaName =
+                dbzTableId.catalog() == null ? dbzTableId.schema() : dbzTableId.catalog();
+        return org.apache.flink.cdc.common.event.TableId.tableId(schemaName, dbzTableId.table());
+    }
+
+    private CreateTableEvent getCreateTableEvent(
+            PostgresSourceConfig sourceConfig, TableId tableId) {
+        try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
+            Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
+            return new CreateTableEvent(
+                    org.apache.flink.cdc.common.event.TableId.tableId(
+                            tableId.schema(), tableId.table()),
+                    schema);
+        } catch (SQLException e) {
+            throw new RuntimeException("Cannot fetch the schema of table " + tableId, e);
+        }
+    }
+
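+    // Extracts the TableId from a change record's source metadata struct, using the schema and
+    // table name fields and leaving the catalog unset.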
+    private TableId getTableId(SourceRecord dataRecord) {
+        Struct value = (Struct) dataRecord.value();
+        Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+        Field field = source.schema().field(SCHEMA_NAME_KEY);
+        String schemaName = null;
+        if (field != null) {
+            schemaName = source.getString(SCHEMA_NAME_KEY);
+        }
+        String tableName = source.getString(TABLE_NAME_KEY);
+        return new TableId(null, schemaName, tableName);
+    }
+
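+    // Discovers all captured tables and builds one CreateTableEvent per table.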
+    private Map<TableId, CreateTableEvent> generateCreateTableEvent(
+            PostgresSourceConfig sourceConfig) {
         try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
+            Map<TableId, CreateTableEvent> createTableEventCache = new HashMap<>();
             List<TableId> capturedTableIds =
                     TableDiscoveryUtils.listTables(
                             sourceConfig.getDatabaseList().get(0),
@@ -134,12 +241,14 @@ private void generateCreateTableEvent(PostgresSourceConfig sourceConfig) {
                             sourceConfig.includePartitionedTables());
             for (TableId tableId : capturedTableIds) {
                 Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
-                createTableEventCache.add(
+                createTableEventCache.put(
+                        tableId,
                         new CreateTableEvent(
                                 org.apache.flink.cdc.common.event.TableId.tableId(
                                         tableId.schema(), tableId.table()),
                                 schema));
             }
+            return createTableEventCache;
         } catch (SQLException e) {
             throw new RuntimeException("Cannot start emitter to fetch table schema.", e);
         }