Skip to content

Commit 9ac3dd1

Browse files
committed
[Flink] debezium-bson format using the id field in the Kafka Key as Update before information
1 parent 6c81807 commit 9ac3dd1

File tree

9 files changed

+224
-57
lines changed

9 files changed

+224
-57
lines changed

docs/content/cdc-ingestion/debezium-bson.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ bson-*.jar
4747
{{< hint info >}}
4848
The debezium bson format requires insert/update/delete event messages include the full document, and include a field that represents the state of the document before the change.
4949
This requires setting debezium's capture.mode to change_streams_update_full_with_pre_image and [capture.mode.full.update.type](https://debezium.io/documentation/reference/stable/connectors/mongodb.html#mongodb-property-capture-mode-full-update-type) to post_image.
50-
The database must be running **MongoDB 6.0 or later** to use this option.
50+
Before MongoDB 6.0, it is not possible to obtain 'Update Before' information. In that case, the id field in the Kafka key is used as the 'Update Before' information.
5151
{{< /hint >}}
5252

5353
Here is a simple example for an update operation captured from a Mongodb customers collection in JSON format:
@@ -145,7 +145,7 @@ Below is a list of top-level field BsonValue conversion examples:
145145
<td>
146146
<ul>
147147
<li>1735934393769</li>
148-
<li>{"$numberLong": 1735934393769}</li>
148+
<li>{"$numberLong": "1735934393769"}</li>
149149
</ul>
150150
</td>
151151
<td>"1735934393769"</td>
@@ -186,7 +186,7 @@ Below is a list of top-level field BsonValue conversion examples:
186186
</tr>
187187
<tr>
188188
<td><h5>BsonArray</h5></td>
189-
<td>[1,2,{"$numberLong": 1735934393769}]</td>
189+
<td>[1,2,{"$numberLong": "1735934393769"}]</td>
190190
<td>"[1,2,1735934393769]"</td>
191191
</tr>
192192
<tr>

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131

3232
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
3333
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
34+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
35+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.TextNode;
3436

3537
import org.bson.BsonDocument;
3638
import org.bson.BsonValue;
@@ -46,13 +48,16 @@
4648
import java.util.Map;
4749
import java.util.Objects;
4850

51+
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_BEFORE;
4952
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_PAYLOAD;
5053
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_SCHEMA;
5154
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_TYPE;
5255
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_DELETE;
5356
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_INSERT;
5457
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_READE;
5558
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_UPDATE;
59+
import static org.apache.paimon.utils.JsonSerdeUtil.fromJson;
60+
import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
5661
import static org.apache.paimon.utils.JsonSerdeUtil.writeValueAsString;
5762

5863
/**
@@ -71,8 +76,11 @@ public class DebeziumBsonRecordParser extends DebeziumJsonRecordParser {
7176

7277
private static final String FIELD_COLLECTION = "collection";
7378
private static final String FIELD_OBJECT_ID = "_id";
79+
private static final String FIELD_KEY_ID = "id";
7480
private static final List<String> PRIMARY_KEYS = Collections.singletonList(FIELD_OBJECT_ID);
7581

82+
private ObjectNode keyRoot;
83+
7684
public DebeziumBsonRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
7785
super(typeMapping, computedColumns);
7886
}
@@ -87,11 +95,11 @@ public List<RichCdcMultiplexRecord> extractRecords() {
8795
processRecord(getData(), RowKind.INSERT, records);
8896
break;
8997
case OP_UPDATE:
90-
processRecord(getBefore(operation), RowKind.DELETE, records);
98+
processDeleteRecord(operation, records);
9199
processRecord(getData(), RowKind.INSERT, records);
92100
break;
93101
case OP_DELETE:
94-
processRecord(getBefore(operation), RowKind.DELETE, records);
102+
processDeleteRecord(operation, records);
95103
break;
96104
default:
97105
throw new UnsupportedOperationException("Unknown record operation: " + operation);
@@ -101,11 +109,14 @@ public List<RichCdcMultiplexRecord> extractRecords() {
101109

102110
@Override
103111
protected void setRoot(CdcSourceRecord record) {
104-
JsonNode node = (JsonNode) record.getValue();
105-
if (node.has(FIELD_SCHEMA)) {
106-
root = node.get(FIELD_PAYLOAD);
107-
} else {
108-
root = node;
112+
root = (JsonNode) record.getValue();
113+
if (root.has(FIELD_SCHEMA)) {
114+
root = root.get(FIELD_PAYLOAD);
115+
}
116+
117+
keyRoot = (ObjectNode) record.getKey();
118+
if (!isNull(keyRoot) && keyRoot.has(FIELD_SCHEMA)) {
119+
keyRoot = (ObjectNode) keyRoot.get(FIELD_PAYLOAD);
109120
}
110121
}
111122

@@ -158,4 +169,37 @@ protected String getTableName() {
158169
protected String format() {
159170
return "debezium-bson";
160171
}
172+
173+
public boolean checkBeforeExists() {
174+
return !isNull(root.get(FIELD_BEFORE));
175+
}
176+
177+
private void processDeleteRecord(String operation, List<RichCdcMultiplexRecord> records) {
178+
if (checkBeforeExists()) {
179+
processRecord(getBefore(operation), RowKind.DELETE, records);
180+
} else {
181+
// Before version 6.0 of MongoDB, it was not possible to obtain 'Update Before'
182+
// information. Therefore, data is first deleted using the key 'id'
183+
JsonNode idNode = null;
184+
Preconditions.checkArgument(
185+
!isNull(keyRoot) && !isNull(idNode = keyRoot.get(FIELD_KEY_ID)),
186+
"Invalid %s format: missing '%s' field in key when '%s' is '%s' for: %s.",
187+
format(),
188+
FIELD_KEY_ID,
189+
FIELD_TYPE,
190+
operation,
191+
keyRoot);
192+
193+
// Deserialize id from json string to JsonNode
194+
Map<String, JsonNode> record =
195+
Collections.singletonMap(
196+
FIELD_OBJECT_ID, fromJson(idNode.asText(), JsonNode.class));
197+
198+
try {
199+
processRecord(new TextNode(writeValueAsString(record)), RowKind.DELETE, records);
200+
} catch (JsonProcessingException e) {
201+
throw new RuntimeException("Failed to deserialize key record.", e);
202+
}
203+
}
204+
}
161205
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,14 @@ public CdcSourceRecord deserialize(ConsumerRecord<byte[], byte[]> message) throw
6464
}
6565

6666
try {
67-
return new CdcSourceRecord(objectMapper.readValue(message.value(), JsonNode.class));
67+
byte[] key = message.key();
68+
JsonNode keyNode = null;
69+
if (key != null) {
70+
keyNode = objectMapper.readValue(key, JsonNode.class);
71+
}
72+
73+
JsonNode valueNode = objectMapper.readValue(message.value(), JsonNode.class);
74+
return new CdcSourceRecord(null, keyNode, valueNode);
6875
} catch (Exception e) {
6976
LOG.error("Invalid Json:\n{}", new String(message.value()));
7077
throw e;

paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java

Lines changed: 113 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,25 @@
2020

2121
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
2222
import org.apache.paimon.flink.action.cdc.TypeMapping;
23-
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
23+
import org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumJsonDeserializationSchema;
2424
import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
2525
import org.apache.paimon.flink.sink.cdc.CdcRecord;
2626
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
2727
import org.apache.paimon.schema.Schema;
2828
import org.apache.paimon.types.RowKind;
29+
import org.apache.paimon.types.RowType;
30+
import org.apache.paimon.utils.JsonSerdeUtil;
31+
import org.apache.paimon.utils.StringUtils;
2932

3033
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
31-
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
34+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
35+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.TextNode;
3236

37+
import org.apache.kafka.clients.consumer.ConsumerRecord;
3338
import org.junit.Assert;
34-
import org.junit.Before;
35-
import org.junit.Test;
39+
import org.junit.jupiter.api.AfterAll;
40+
import org.junit.jupiter.api.BeforeAll;
41+
import org.junit.jupiter.api.Test;
3642
import org.slf4j.Logger;
3743
import org.slf4j.LoggerFactory;
3844

@@ -47,70 +53,99 @@
4753
import java.util.Map;
4854

4955
/** Test for DebeziumBsonRecordParser. */
50-
public class DebeziumBsonRecordParserTest extends KafkaActionITCaseBase {
56+
public class DebeziumBsonRecordParserTest {
5157

5258
private static final Logger log = LoggerFactory.getLogger(DebeziumBsonRecordParserTest.class);
53-
private static List<String> insertList = new ArrayList<>();
54-
private static List<String> updateList = new ArrayList<>();
55-
private static List<String> deleteList = new ArrayList<>();
59+
private static List<CdcSourceRecord> insertList = new ArrayList<>();
60+
private static List<CdcSourceRecord> updateList = new ArrayList<>();
61+
private static List<CdcSourceRecord> deleteList = new ArrayList<>();
5662

57-
private static ObjectMapper objMapper = new ObjectMapper();
63+
private static ArrayList<CdcSourceRecord> bsonRecords = new ArrayList<>();
64+
private static ArrayList<CdcSourceRecord> jsonRecords = new ArrayList<>();
65+
66+
private static Map<String, String> keyEvent = new HashMap<>();
67+
68+
private static KafkaDebeziumJsonDeserializationSchema deserialization =
69+
new KafkaDebeziumJsonDeserializationSchema();
5870

5971
private static Map<String, String> beforeEvent = new HashMap<>();
6072

61-
static {
73+
private static Map<String, String> afterEvent = new HashMap<>();
74+
75+
@BeforeAll
76+
public static void beforeAll() {
77+
keyEvent.put("_id", "67ab25755c0d5ac87eb8c632");
78+
6279
beforeEvent.put("_id", "67ab25755c0d5ac87eb8c632");
6380
beforeEvent.put("created_at", "1736207571013");
6481
beforeEvent.put("created_by", "peter");
6582
beforeEvent.put("tags", "[\"pending\"]");
6683
beforeEvent.put("updated_at", "1739455297970");
67-
}
6884

69-
private static Map<String, String> afterEvent = new HashMap<>();
70-
71-
static {
7285
afterEvent.put("_id", "67ab25755c0d5ac87eb8c632");
7386
afterEvent.put("created_at", "1736207571013");
7487
afterEvent.put("created_by", "peter");
7588
afterEvent.put("tags", "[\"succeed\"]");
7689
afterEvent.put("updated_at", "1739455397970");
77-
}
7890

79-
@Before
80-
public void setup() {
8191
String insertRes = "kafka/debezium-bson/table/event/event-insert.txt";
8292
String updateRes = "kafka/debezium-bson/table/event/event-update.txt";
8393
String deleteRes = "kafka/debezium-bson/table/event/event-delete.txt";
84-
URL url;
94+
String bsonPth = "kafka/debezium-bson/table/event/event-bson.txt";
95+
String jsonPath = "kafka/debezium-bson/table/event/event-json.txt";
96+
8597
try {
86-
url = DebeziumBsonRecordParserTest.class.getClassLoader().getResource(insertRes);
87-
Files.readAllLines(Paths.get(url.toURI())).stream()
88-
.filter(this::isRecordLine)
89-
.forEach(e -> insertList.add(e));
98+
parseCdcSourceRecords(insertRes, insertList);
99+
100+
parseCdcSourceRecords(updateRes, updateList);
90101

91-
url = DebeziumBsonRecordParserTest.class.getClassLoader().getResource(updateRes);
92-
Files.readAllLines(Paths.get(url.toURI())).stream()
93-
.filter(this::isRecordLine)
94-
.forEach(e -> updateList.add(e));
102+
parseCdcSourceRecords(deleteRes, deleteList);
95103

96-
url = DebeziumBsonRecordParserTest.class.getClassLoader().getResource(deleteRes);
97-
Files.readAllLines(Paths.get(url.toURI())).stream()
98-
.filter(this::isRecordLine)
99-
.forEach(e -> deleteList.add(e));
104+
parseCdcSourceRecords(bsonPth, bsonRecords);
100105

106+
parseCdcSourceRecords(jsonPath, jsonRecords);
101107
} catch (Exception e) {
102108
log.error("Fail to init debezium-json cases", e);
103109
}
104110
}
105111

112+
@AfterAll
113+
public static void afterAll() {
114+
insertList.clear();
115+
updateList.clear();
116+
deleteList.clear();
117+
bsonRecords.clear();
118+
jsonRecords.clear();
119+
}
120+
121+
private static void parseCdcSourceRecords(String resourcePath, List<CdcSourceRecord> records)
122+
throws Exception {
123+
URL url = DebeziumBsonRecordParserTest.class.getClassLoader().getResource(resourcePath);
124+
List<String> line = Files.readAllLines(Paths.get(url.toURI()));
125+
String keyNode = null;
126+
for (String json : line) {
127+
if (StringUtils.isNullOrWhitespaceOnly(json) || !json.startsWith("{")) {
128+
continue;
129+
}
130+
if (keyNode == null) {
131+
keyNode = json;
132+
} else {
133+
// test kafka deserialization
134+
records.add(
135+
deserialization.deserialize(
136+
new ConsumerRecord<>(
137+
"topic", 0, 0, keyNode.getBytes(), json.getBytes())));
138+
keyNode = null;
139+
}
140+
}
141+
}
142+
106143
@Test
107144
public void extractInsertRecord() throws Exception {
108145
DebeziumBsonRecordParser parser =
109146
new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
110-
for (String json : insertList) {
111-
// 将json解析为JsonNode对象
112-
JsonNode rootNode = objMapper.readValue(json, JsonNode.class);
113-
CdcSourceRecord cdcRecord = new CdcSourceRecord(rootNode);
147+
Assert.assertFalse(insertList.isEmpty());
148+
for (CdcSourceRecord cdcRecord : insertList) {
114149
Schema schema = parser.buildSchema(cdcRecord);
115150
Assert.assertEquals(schema.primaryKeys(), Arrays.asList("_id"));
116151

@@ -136,10 +171,8 @@ public void extractInsertRecord() throws Exception {
136171
public void extractUpdateRecord() throws Exception {
137172
DebeziumBsonRecordParser parser =
138173
new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
139-
for (String json : updateList) {
140-
// 将json解析为JsonNode对象
141-
JsonNode jsonNode = objMapper.readValue(json, JsonNode.class);
142-
CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode);
174+
Assert.assertFalse(updateList.isEmpty());
175+
for (CdcSourceRecord cdcRecord : updateList) {
143176
Schema schema = parser.buildSchema(cdcRecord);
144177
Assert.assertEquals(schema.primaryKeys(), Arrays.asList("_id"));
145178

@@ -148,7 +181,11 @@ public void extractUpdateRecord() throws Exception {
148181

149182
CdcRecord updateBefore = records.get(0).toRichCdcRecord().toCdcRecord();
150183
Assert.assertEquals(updateBefore.kind(), RowKind.DELETE);
151-
Assert.assertEquals(beforeEvent, updateBefore.data());
184+
if (parser.checkBeforeExists()) {
185+
Assert.assertEquals(beforeEvent, updateBefore.data());
186+
} else {
187+
Assert.assertEquals(keyEvent, updateBefore.data());
188+
}
152189

153190
CdcRecord updateAfter = records.get(1).toRichCdcRecord().toCdcRecord();
154191
Assert.assertEquals(updateAfter.kind(), RowKind.INSERT);
@@ -169,10 +206,8 @@ public void extractUpdateRecord() throws Exception {
169206
public void extractDeleteRecord() throws Exception {
170207
DebeziumBsonRecordParser parser =
171208
new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
172-
for (String json : deleteList) {
173-
// 将json解析为JsonNode对象
174-
JsonNode jsonNode = objMapper.readValue(json, JsonNode.class);
175-
CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode);
209+
Assert.assertFalse(deleteList.isEmpty());
210+
for (CdcSourceRecord cdcRecord : deleteList) {
176211
Schema schema = parser.buildSchema(cdcRecord);
177212
Assert.assertEquals(schema.primaryKeys(), Arrays.asList("_id"));
178213

@@ -181,7 +216,11 @@ public void extractDeleteRecord() throws Exception {
181216

182217
CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord();
183218
Assert.assertEquals(result.kind(), RowKind.DELETE);
184-
Assert.assertEquals(afterEvent, result.data());
219+
if (parser.checkBeforeExists()) {
220+
Assert.assertEquals(beforeEvent, result.data());
221+
} else {
222+
Assert.assertEquals(keyEvent, result.data());
223+
}
185224

186225
String dbName = parser.getDatabaseName();
187226
Assert.assertEquals(dbName, "bigdata_test");
@@ -193,4 +232,34 @@ public void extractDeleteRecord() throws Exception {
193232
Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
194233
}
195234
}
235+
236+
@Test
237+
public void bsonConvertJsonTest() throws Exception {
238+
DebeziumBsonRecordParser parser =
239+
new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
240+
241+
Assert.assertFalse(jsonRecords.isEmpty());
242+
for (int i = 0; i < jsonRecords.size(); i++) {
243+
CdcSourceRecord bsonRecord = bsonRecords.get(i);
244+
CdcSourceRecord jsonRecord = jsonRecords.get(i);
245+
246+
JsonNode bsonTextNode =
247+
new TextNode(JsonSerdeUtil.writeValueAsString(bsonRecord.getValue()));
248+
Map<String, String> resultMap = parser.extractRowData(bsonTextNode, RowType.builder());
249+
250+
ObjectNode expectNode = (ObjectNode) jsonRecord.getValue();
251+
252+
expectNode
253+
.fields()
254+
.forEachRemaining(
255+
entry -> {
256+
String key = entry.getKey();
257+
String expectValue = null;
258+
if (!JsonSerdeUtil.isNull(entry.getValue())) {
259+
expectValue = entry.getValue().asText();
260+
}
261+
Assert.assertEquals(expectValue, resultMap.get(key));
262+
});
263+
}
264+
}
196265
}

0 commit comments

Comments
 (0)