Commit 6c81807

[cdc] Add debezium-bson format document and bugfix bson value convert to java value
1 parent 6a1e477 commit 6c81807

9 files changed

+632
-31
lines changed

Lines changed: 329 additions & 0 deletions
@@ -0,0 +1,329 @@
1+
---
2+
title: "Debezium BSON"
3+
weight: 6
4+
type: docs
5+
aliases:
6+
- /cdc-ingestion/debezium-bson.html
7+
---
8+
<!--
9+
Licensed to the Apache Software Foundation (ASF) under one
10+
or more contributor license agreements. See the NOTICE file
11+
distributed with this work for additional information
12+
regarding copyright ownership. The ASF licenses this file
13+
to you under the Apache License, Version 2.0 (the
14+
"License"); you may not use this file except in compliance
15+
with the License. You may obtain a copy of the License at
16+
17+
http://www.apache.org/licenses/LICENSE-2.0
18+
19+
Unless required by applicable law or agreed to in writing,
20+
software distributed under the License is distributed on an
21+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
22+
KIND, either express or implied. See the License for the
23+
specific language governing permissions and limitations
24+
under the License.
25+
-->
26+
27+
# Debezium BSON Format
28+
29+
30+
The debezium-bson format is one of the formats supported by <a href="{{< ref "/cdc-ingestion/kafka-cdc" >}}">Kafka CDC</a>.
31+
It is the format produced when Debezium captures change events from MongoDB, and it is similar to the
32+
<a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/formats/debezium/">debezium-json</a> format.
33+
However, MongoDB has no fixed schema and field types may differ from one document to the next, so the before/after fields
in the JSON are all strings, whereas the debezium-json format requires them to be JSON objects.
35+
36+
37+
## Prepare MongoDB BSON Jar
38+
39+
The BSON jar can be downloaded from the [Maven repository](https://mvnrepository.com/artifact/org.mongodb/bson):
40+
41+
```
42+
bson-*.jar
43+
```
44+
45+
## Introduction
46+
47+
{{< hint info >}}
48+
The debezium-bson format requires that insert, update, and delete event messages include the full document, together with a field that represents the state of the document before the change.
To get this, set Debezium's capture.mode to change_streams_update_full_with_pre_image and [capture.mode.full.update.type](https://debezium.io/documentation/reference/stable/connectors/mongodb.html#mongodb-property-capture-mode-full-update-type) to post_image.
The database must be running **MongoDB 6.0 or later** to use this option.
51+
{{< /hint >}}
52+
53+
Here is a simple example of an update operation captured from a MongoDB customers collection, in JSON format:
54+
55+
```json
56+
{
57+
"schema": {
58+
"type": "struct",
59+
"fields": [
60+
{
61+
"type": "string",
62+
"optional": true,
63+
"name": "io.debezium.data.Json",
64+
"version": 1,
65+
"field": "before"
66+
},
67+
{
68+
"type": "string",
69+
"optional": true,
70+
"name": "io.debezium.data.Json",
71+
"version": 1,
72+
"field": "after"
73+
},
74+
...
75+
]
76+
},
77+
"payload": {
78+
"before": "{\"_id\": {\"$oid\" : \"596e275826f08b2730779e1f\"}, \"name\" : \"Anne\", \"create_time\" : {\"$numberLong\" : \"1558965506000\"}, \"tags\":[\"success\"]}",
79+
"after": "{\"_id\": {\"$oid\" : \"596e275826f08b2730779e1f\"}, \"name\" : \"Anne\", \"create_time\" : {\"$numberLong\" : \"1558965506000\"}, \"tags\":[\"passion\",\"success\"]}",
80+
"source": {
81+
"db": "inventory",
82+
"rs": "rs0",
83+
"collection": "customers",
84+
...
85+
},
86+
"op": "u",
87+
"ts_ms": 1558965515240,
88+
"ts_us": 1558965515240142,
89+
"ts_ns": 1558965515240142879
90+
}
91+
}
92+
```
93+
94+
This document from the MongoDB collection customers has four fields: _id is a BSON ObjectId, name is a string,
create_time is a long, and tags is an array of strings. The debezium-bson format processes the event into the following result:
96+
97+
Document Schema:
98+
99+
| Field Name | Field Type | Key |
100+
|------------|------------|-------------|
101+
| _id | STRING | Primary Key |
102+
| name | STRING | |
103+
| create_time| STRING | |
104+
| tags | STRING | |
105+
106+
Records:
107+
108+
| RowKind | _id | name | create_time | tags |
109+
|---------|--------------------------|-------|----------------------------|-----------------------|
110+
| -U | 596e275826f08b2730779e1f | Anne | 1558965506000 | ["success"] |
111+
| +U | 596e275826f08b2730779e1f | Anne | 1558965506000 | ["passion","success"] |
112+
113+
114+
### How it works
115+
Because the schema field of the event message carries no information about the document's fields, the debezium-bson format does not require event messages to include schema information. Each message is processed as follows:
116+
117+
- Parse the before/after fields of the event message into a BsonDocument.
- Recursively traverse all fields of the BsonDocument and convert each BsonValue to a Java object.
- Convert every top-level field of before/after to the string type; _id is always the primary key.
- If a top-level field of before/after is a basic type (such as Integer or Long), convert it directly to a string; otherwise, convert it to a JSON string.
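
The last two steps can be illustrated with a minimal sketch. It uses only the JDK and assumes the BsonValues have already been converted to plain Java objects (String, Long, Boolean, List, Map). The class `TopLevelFieldSketch` and its methods are hypothetical names for illustration, not Paimon APIs, and the JSON writer is deliberately simplified.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper for illustration; not part of Paimon. */
final class TopLevelFieldSketch {

    /** Converts one top-level field: basic values to plain strings, lists and maps to JSON strings. */
    static String toColumnValue(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Map || value instanceof List) {
            return toJson(value);
        }
        return String.valueOf(value);
    }

    /** Simplified JSON writer; it only escapes backslashes and double quotes. */
    static String toJson(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof Map) {
            StringJoiner object = new StringJoiner(",", "{", "}");
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                object.add(quote(String.valueOf(entry.getKey())) + ":" + toJson(entry.getValue()));
            }
            return object.toString();
        }
        if (value instanceof List) {
            StringJoiner array = new StringJoiner(",", "[", "]");
            for (Object item : (List<?>) value) {
                array.add(toJson(item));
            }
            return array.toString();
        }
        // NaN and infinities are not valid JSON numbers, so they are written as quoted strings.
        if (value instanceof Double && !Double.isFinite((Double) value)) {
            return quote(String.valueOf(value));
        }
        if (value instanceof Number || value instanceof Boolean) {
            return String.valueOf(value);
        }
        return quote(String.valueOf(value));
    }

    private static String quote(String text) {
        return "\"" + text.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        // The "after" document from the example above, already converted to Java objects.
        Map<String, Object> after = new LinkedHashMap<>();
        after.put("_id", "596e275826f08b2730779e1f");
        after.put("name", "Anne");
        after.put("create_time", 1558965506000L);
        after.put("tags", Arrays.asList("passion", "success"));
        after.forEach((name, value) -> System.out.println(name + " = " + toColumnValue(value)));
    }
}
```

With the example document, every column ends up as a string: create_time becomes 1558965506000 and tags becomes the JSON text ["passion","success"].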
121+
122+
Below are conversion examples for top-level BsonValue fields:
123+
124+
<table class="configuration table table-bordered">
125+
<thead>
126+
<tr>
127+
<th class="text-left" style="width: 20%">BsonValue Type</th>
128+
<th class="text-left" style="width: 40%">Json Value</th>
129+
<th class="text-left" style="width: 40%">Conversion Result String</th>
130+
</tr>
131+
</thead>
132+
<tbody>
133+
<tr>
134+
<td><h5>BsonString</h5></td>
135+
<td>"hello"</td>
136+
<td>"hello"</td>
137+
</tr>
138+
<tr>
139+
<td><h5>BsonInt32</h5></td>
140+
<td>123</td>
141+
<td>"123"</td>
142+
</tr>
143+
<tr>
144+
<td><h5>BsonInt64</h5></td>
145+
<td>
146+
<ul>
147+
<li>1735934393769</li>
148+
<li>{"$numberLong": 1735934393769}</li>
149+
</ul>
150+
</td>
151+
<td>"1735934393769"</td>
152+
</tr>
153+
<tr>
154+
<td><h5>BsonDouble</h5></td>
155+
<td>
156+
<ul>
157+
<li>{"$numberDouble": "3.14"}</li>
158+
<li>{"$numberDouble": "NaN"}</li>
159+
<li>{"$numberDouble": "Infinity"}</li>
160+
<li>{"$numberDouble": "-Infinity"}</li>
161+
</ul>
162+
</td>
163+
<td>
164+
<ul>
165+
<li>"3.14"</li>
166+
<li>"NaN"</li>
167+
<li>"Infinity"</li>
168+
<li>"-Infinity"</li>
169+
</ul>
170+
</td>
171+
</tr>
172+
<tr>
173+
<td><h5>BsonBoolean</h5></td>
174+
<td>
175+
<ul>
176+
<li>true</li>
177+
<li>false</li>
178+
</ul>
179+
</td>
180+
<td>
181+
<ul>
182+
<li>"true"</li>
183+
<li>"false"</li>
184+
</ul>
185+
</td>
186+
</tr>
187+
<tr>
188+
<td><h5>BsonArray</h5></td>
189+
<td>[1,2,{"$numberLong": 1735934393769}]</td>
190+
<td>"[1,2,1735934393769]"</td>
191+
</tr>
192+
<tr>
193+
<td><h5>BsonObjectId</h5></td>
194+
<td>{"$oid": "596e275826f08b2730779e1f"}</td>
195+
<td>"596e275826f08b2730779e1f"</td>
196+
</tr>
197+
<tr>
198+
<td><h5>BsonDateTime</h5></td>
199+
<td>{"$date": 1735934393769 }</td>
200+
<td>"1735934393769"</td>
201+
</tr>
202+
<tr>
203+
<td><h5>BsonNull</h5></td>
204+
<td>null</td>
205+
<td>null</td>
206+
</tr>
207+
<tr>
208+
<td><h5>BsonUndefined</h5></td>
209+
<td>{"$undefined": true}</td>
210+
<td>null</td>
211+
</tr>
212+
<tr>
213+
<td><h5>BsonBinary</h5></td>
214+
<td>{"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "0"}</td>
215+
<td>"uE2/4v5MSVOiJZkOo3APKQ=="</td>
216+
</tr>
217+
<tr>
218+
<td><h5>BsonBinary(type=UUID)</h5></td>
219+
<td>{"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "4"}</td>
220+
<td>"b84dbfe2-fe4c-4953-a225-990ea3700f29"</td>
221+
</tr>
222+
<tr>
223+
<td><h5>BsonDecimal128</h5></td>
224+
<td>
225+
<ul>
226+
<li>{"$numberDecimal": "3.14"}</li>
227+
<li>{"$numberDecimal": "NaN"}</li>
228+
</ul>
229+
</td>
230+
<td>
231+
<ul>
232+
<li>"3.14"</li>
233+
<li>"NaN"</li>
234+
</ul>
235+
</td>
236+
</tr>
237+
<tr>
238+
<td><h5>BsonRegularExpression</h5></td>
239+
<td>{"$regularExpression": {"pattern": "^pass$", "options": "i"}}</td>
240+
<td>"/^pass$/i"</td>
241+
</tr>
242+
<tr>
243+
<td><h5>BsonSymbol</h5></td>
244+
<td>{"$symbol": "symbol"}</td>
245+
<td>"symbol"</td>
246+
</tr>
247+
<tr>
248+
<td><h5>BsonTimestamp</h5></td>
249+
<td>{"$timestamp": {"t": 1736997330, "i": 2}}</td>
250+
<td>"1736997330"</td>
251+
</tr>
252+
<tr>
253+
<td><h5>BsonMinKey</h5></td>
254+
<td>{"$minKey": 1}</td>
255+
<td>"BsonMinKey"</td>
256+
</tr>
257+
<tr>
258+
<td><h5>BsonMaxKey</h5></td>
259+
<td>{"$maxKey": 1}</td>
260+
<td>"BsonMaxKey"</td>
261+
</tr>
262+
<tr>
263+
<td><h5>BsonJavaScript</h5></td>
264+
<td>{"$code": "function(){}"}</td>
265+
<td>"function(){}"</td>
266+
</tr>
267+
<tr>
268+
<td><h5>BsonJavaScriptWithScope</h5></td>
269+
<td>{"$code": "function(){}", "$scope": {"name": "Anne"}}</td>
270+
<td>'{"$code": "function(){}", "$scope": {"name": "Anne"}}'</td>
271+
</tr>
272+
<tr>
273+
<td><h5>BsonDocument</h5></td>
274+
<td>
275+
<pre>
276+
{
277+
"decimalPi": {"$numberDecimal": "3.14"},
278+
"doublePi": {"$numberDouble": "3.14"},
279+
"doubleNaN": {"$numberDouble": "NaN"},
280+
"decimalNaN": {"$numberDecimal": "NaN"},
281+
"long": {"$numberLong": "100"},
282+
"bool": true,
283+
"array": [
284+
{"$numberInt": "1"},
285+
{"$numberLong": "2"}
286+
]
287+
}
288+
</pre>
289+
</td>
290+
<td>
291+
<pre>
292+
'{
293+
"decimalPi":3.14,
294+
"doublePi":3.14,
295+
"doubleNaN":"NaN",
296+
"decimalNaN":"NaN",
297+
"long":100,
298+
"bool":true,
299+
"array":[1,2]
300+
}'
301+
</pre>
302+
</td>
303+
</tr>
304+
</tbody>
305+
</table>
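
Two rows of the table can be reproduced with the JDK alone. The sketch below decodes the BsonBinary(type=UUID) payload into its UUID string and builds the BsonRegularExpression string. The class and method names are hypothetical, and the code only illustrates the expected results; it is not the converter used by Paimon. It assumes the binary payload uses the standard big-endian UUID layout of BSON subtype 4.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

/** Hypothetical helper for illustration; not part of Paimon. */
final class BsonStringFormSketch {

    /** Decodes a Base64 payload of exactly 16 bytes into the canonical UUID string. */
    static String binaryAsUuid(String base64) {
        byte[] bytes = Base64.getDecoder().decode(base64);
        if (bytes.length != 16) {
            throw new IllegalArgumentException("A UUID needs 16 bytes, got " + bytes.length);
        }
        // ByteBuffer reads big-endian by default, which matches the assumed layout.
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long mostSignificant = buffer.getLong();
        long leastSignificant = buffer.getLong();
        return new UUID(mostSignificant, leastSignificant).toString();
    }

    /** Renders a regular expression as /pattern/options, as in the table row. */
    static String regexAsString(String pattern, String options) {
        return "/" + pattern + "/" + options;
    }

    public static void main(String[] args) {
        System.out.println(binaryAsUuid("uE2/4v5MSVOiJZkOo3APKQ=="));
        System.out.println(regexAsString("^pass$", "i"));
    }
}
```

For the payload shown in the table, the decoded value is b84dbfe2-fe4c-4953-a225-990ea3700f29, and the regular expression is rendered as /^pass$/i.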
306+
307+
308+
### How to use
309+
To use debezium-bson, add the kafka_conf parameter **value.format=debezium-bson**. The following example uses table synchronization:
310+
311+
```bash
312+
<FLINK_HOME>/bin/flink run \
313+
/path/to/paimon-flink-action-{{< version >}}.jar \
314+
kafka_sync_table \
315+
--warehouse hdfs:///path/to/warehouse \
316+
--database test_db \
317+
--table ods_mongodb_customers \
318+
--primary_keys _id \
319+
--kafka_conf properties.bootstrap.servers=127.0.0.1:9020 \
320+
--kafka_conf topic=customers \
321+
--kafka_conf properties.group.id=123456 \
322+
--kafka_conf value.format=debezium-bson \
323+
--catalog_conf metastore=filesystem \
324+
--table_conf bucket=4 \
325+
--table_conf changelog-producer=input \
326+
--table_conf sink.parallelism=4
327+
```
328+
329+

docs/content/cdc-ingestion/kafka-cdc.md

Lines changed: 4 additions & 0 deletions
@@ -67,6 +67,10 @@ If a message in a Kafka topic is a change event captured from another database u
6767
<td><a href="https://docs.aws.amazon.com/dms/latest/userguide/Welcome.html">aws-dms-json</a></td>
6868
<td>True</td>
6969
</tr>
70+
<tr>
71+
<td><a href="{{< ref "/cdc-ingestion/debezium-bson" >}}">debezium-bson</a></td>
72+
<td>True</td>
73+
</tr>
7074
</tbody>
7175
</table>
7276

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java

Lines changed: 30 additions & 0 deletions
@@ -22,7 +22,9 @@
2222
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2323
import org.apache.paimon.flink.action.cdc.TypeMapping;
2424
import org.apache.paimon.flink.action.cdc.mongodb.BsonValueConvertor;
25+
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
2526
import org.apache.paimon.types.DataTypes;
27+
import org.apache.paimon.types.RowKind;
2628
import org.apache.paimon.types.RowType;
2729
import org.apache.paimon.utils.Preconditions;
2830
import org.apache.paimon.utils.TypeUtils;
@@ -37,6 +39,7 @@
3739

3840
import javax.annotation.Nullable;
3941

42+
import java.util.ArrayList;
4043
import java.util.Collections;
4144
import java.util.LinkedHashMap;
4245
import java.util.List;
@@ -45,6 +48,11 @@
4548

4649
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_PAYLOAD;
4750
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_SCHEMA;
51+
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_TYPE;
52+
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_DELETE;
53+
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_INSERT;
54+
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_READE;
55+
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_UPDATE;
4856
import static org.apache.paimon.utils.JsonSerdeUtil.writeValueAsString;
4957

5058
/**
@@ -69,6 +77,28 @@ public DebeziumBsonRecordParser(TypeMapping typeMapping, List<ComputedColumn> co
6977
super(typeMapping, computedColumns);
7078
}
7179

80+
@Override
81+
public List<RichCdcMultiplexRecord> extractRecords() {
82+
String operation = getAndCheck(FIELD_TYPE).asText();
83+
List<RichCdcMultiplexRecord> records = new ArrayList<>();
84+
switch (operation) {
85+
case OP_INSERT:
86+
case OP_READE:
87+
processRecord(getData(), RowKind.INSERT, records);
88+
break;
89+
case OP_UPDATE:
90+
processRecord(getBefore(operation), RowKind.DELETE, records);
91+
processRecord(getData(), RowKind.INSERT, records);
92+
break;
93+
case OP_DELETE:
94+
processRecord(getBefore(operation), RowKind.DELETE, records);
95+
break;
96+
default:
97+
throw new UnsupportedOperationException("Unknown record operation: " + operation);
98+
}
99+
return records;
100+
}
101+
72102
@Override
73103
protected void setRoot(CdcSourceRecord record) {
74104
JsonNode node = (JsonNode) record.getValue();
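
The dispatch in `extractRecords` can be sketched in isolation. The diff does not show the values of the `OP_*` constants, so the sketch assumes they hold the standard Debezium operation codes (`c` for create, `r` for snapshot read, `u` for update, `d` for delete). The `Kind` enum and the class name are hypothetical stand-ins for `RowKind` and the parser.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the parser's switch; not part of Paimon. */
final class OpMappingSketch {

    /** Stand-in for the two RowKind values used by the switch. */
    enum Kind { INSERT, DELETE }

    /** Returns the row kinds emitted for one Debezium operation code, in emission order. */
    static List<Kind> rowKindsFor(String op) {
        List<Kind> kinds = new ArrayList<>();
        switch (op) {
            case "c": // assumed value of OP_INSERT
            case "r": // assumed value of OP_READE
                kinds.add(Kind.INSERT);
                break;
            case "u": // assumed value of OP_UPDATE: the old image is deleted, then the new one inserted
                kinds.add(Kind.DELETE);
                kinds.add(Kind.INSERT);
                break;
            case "d": // assumed value of OP_DELETE
                kinds.add(Kind.DELETE);
                break;
            default:
                throw new UnsupportedOperationException("Unknown record operation: " + op);
        }
        return kinds;
    }

    public static void main(String[] args) {
        for (String op : new String[] {"c", "r", "u", "d"}) {
            System.out.println(op + " -> " + rowKindsFor(op));
        }
    }
}
```

An update is the only operation that yields two records, which is why the parser needs both the before and the after image of the document.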
