Skip to content

Commit ef9e214

Browse files
authored
[cdc] Support computed columns when sync_database (#5816)
1 parent ffff4e0 commit ef9e214

File tree

9 files changed

+345
-44
lines changed

9 files changed

+345
-44
lines changed

docs/content/cdc-ingestion/kafka-cdc.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ To use this feature through `flink run`, run the following shell command.
214214
[--type_mapping to-string] \
215215
[--partition_keys <partition_keys>] \
216216
[--primary_keys <primary-keys>] \
217+
[--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
217218
[--kafka_conf <kafka-source-conf> [--kafka_conf <kafka-source-conf> ...]] \
218219
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
219220
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
@@ -244,7 +245,8 @@ Synchronization from one Kafka topic to Paimon database.
244245
--catalog_conf uri=thrift://hive-metastore:9083 \
245246
--table_conf bucket=4 \
246247
--table_conf changelog-producer=input \
247-
--table_conf sink.parallelism=4
248+
--table_conf sink.parallelism=4 \
249+
--computed_column 'pt=date_format(event_tm, yyyyMMdd)'
248250
```
249251
250252
Synchronization from multiple Kafka topics to Paimon database.

docs/layouts/shortcodes/generated/kafka_sync_database.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@
9292
</tr>
9393
<tr>
9494
<td><h5>--computed_column</h5></td>
95-
<td>The definitions of computed columns. The argument field is from Kafka topic's table field name. See <a href="../overview/#computed-functions">here</a> for a complete list of configurations. </td>
95+
<td>The definitions of computed columns. The argument field is from Kafka topic's table field name. See <a href="../overview/#computed-functions">here</a> for a complete list of configurations. NOTICE: It returns null if the referenced column does not exist in the source table.</td>
9696
</tr>
9797
<tr>
9898
<td><h5>--eager_init</h5></td>

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ public String fieldReference() {
5353
return expression.fieldReference();
5454
}
5555

56+
@Nullable
57+
public DataType fieldReferenceType() {
58+
return expression.fieldReferenceType();
59+
}
60+
5661
/** Compute column's value from given argument. Return null if input is null. */
5762
@Nullable
5863
public String eval(@Nullable String input) {
@@ -61,4 +66,13 @@ public String eval(@Nullable String input) {
6166
}
6267
return expression.eval(input);
6368
}
69+
70+
/** Compute column's value from given argument. Return null if input is null. */
71+
@Nullable
72+
public String eval(@Nullable String input, DataType inputType) {
73+
if (fieldReference() != null && input == null) {
74+
return null;
75+
}
76+
return expression.eval(input, inputType);
77+
}
6478
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,18 @@
1818

1919
package org.apache.paimon.flink.action.cdc;
2020

21+
import org.apache.paimon.annotation.VisibleForTesting;
2122
import org.apache.paimon.types.DataField;
2223
import org.apache.paimon.types.DataType;
2324
import org.apache.paimon.utils.Preconditions;
2425

2526
import java.util.ArrayList;
27+
import java.util.Collections;
28+
import java.util.HashMap;
29+
import java.util.HashSet;
2630
import java.util.List;
2731
import java.util.Map;
32+
import java.util.Set;
2833
import java.util.stream.Collectors;
2934

3035
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -75,6 +80,90 @@ public static List<ComputedColumn> buildComputedColumns(
7580
Expression.create(typeMapping, caseSensitive, exprName, args)));
7681
}
7782

78-
return computedColumns;
83+
return sortComputedColumns(computedColumns);
84+
}
85+
86+
@VisibleForTesting
87+
public static List<ComputedColumn> sortComputedColumns(List<ComputedColumn> columns) {
88+
Set<String> columnNames = new HashSet<>();
89+
for (ComputedColumn col : columns) {
90+
columnNames.add(col.columnName());
91+
}
92+
93+
// For simple processing, no reference or referring to another computed column, means
94+
// independent
95+
List<ComputedColumn> independent = new ArrayList<>();
96+
List<ComputedColumn> dependent = new ArrayList<>();
97+
98+
for (ComputedColumn col : columns) {
99+
if (col.fieldReference() == null || !columnNames.contains(col.fieldReference())) {
100+
independent.add(col);
101+
} else {
102+
dependent.add(col);
103+
}
104+
}
105+
106+
// Sort dependent columns with topological sort
107+
Map<String, ComputedColumn> columnMap = new HashMap<>();
108+
Map<String, Set<String>> reverseDependencies = new HashMap<>();
109+
110+
for (ComputedColumn col : dependent) {
111+
columnMap.put(col.columnName(), col);
112+
reverseDependencies
113+
.computeIfAbsent(col.fieldReference(), k -> new HashSet<>())
114+
.add(col.columnName());
115+
}
116+
117+
List<ComputedColumn> sortedDependent = new ArrayList<>();
118+
Set<String> visited = new HashSet<>();
119+
Set<String> tempMark = new HashSet<>(); // For cycle detection
120+
121+
for (ComputedColumn col : dependent) {
122+
if (!visited.contains(col.columnName())) {
123+
dfs(
124+
col.columnName(),
125+
reverseDependencies,
126+
columnMap,
127+
sortedDependent,
128+
visited,
129+
tempMark);
130+
}
131+
}
132+
133+
Collections.reverse(sortedDependent);
134+
135+
// Independent should precede dependent
136+
List<ComputedColumn> result = new ArrayList<>();
137+
result.addAll(independent);
138+
result.addAll(sortedDependent);
139+
140+
return result;
141+
}
142+
143+
private static void dfs(
144+
String node,
145+
Map<String, Set<String>> reverseDependencies,
146+
Map<String, ComputedColumn> columnMap,
147+
List<ComputedColumn> sorted,
148+
Set<String> visited,
149+
Set<String> tempMark) {
150+
if (tempMark.contains(node)) {
151+
throw new IllegalArgumentException("Cycle detected: " + node);
152+
}
153+
if (visited.contains(node)) {
154+
return;
155+
}
156+
157+
tempMark.add(node);
158+
ComputedColumn current = columnMap.get(node);
159+
160+
// Process the dependencies
161+
for (String dependent : reverseDependencies.getOrDefault(node, Collections.emptySet())) {
162+
dfs(dependent, reverseDependencies, columnMap, sorted, visited, tempMark);
163+
}
164+
165+
tempMark.remove(node);
166+
visited.add(node);
167+
sorted.add(current);
79168
}
80169
}

0 commit comments

Comments
 (0)