Commit 5be5036

Abraham Lealrozza authored and committed

Add Attunity / Qlik Replicate CDC support
KAFKA-194
1 parent d065e37 · commit 5be5036

File tree

9 files changed: +1264 -0 lines changed

AttunityCdcHandler.java: 123 additions & 0 deletions

@@ -0,0 +1,123 @@

/*
 * Copyright 2008-present MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.mongodb.kafka.connect.sink.cdc.attunity.rdbms.oracle;

import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.cdc.CdcHandler;
import com.mongodb.kafka.connect.sink.cdc.CdcOperation;
import com.mongodb.kafka.connect.sink.cdc.debezium.OperationType;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;

import java.util.HashMap;
import java.util.Map;

public abstract class AttunityCdcHandler extends CdcHandler {
    private static final String OPERATION_TYPE_FIELD_PATH = "operation";
    private static final String OPERATION_TYPE_WRAPPER_PATH = "headers";
    private static final String OPERATION_TYPE_TOPLEVEL_WRAPPER_PATH = "message";

    private final Map<OperationType, CdcOperation> operations = new HashMap<>();

    public AttunityCdcHandler(final MongoSinkTopicConfig config) {
        super(config);
    }

    protected void registerOperations(final Map<OperationType, CdcOperation> operations) {
        this.operations.putAll(operations);
    }

    public CdcOperation getCdcOperation(final BsonDocument doc) {
        // Wrapped envelope: the operation type lives at message.headers.operation.
        if (doc.containsKey(OPERATION_TYPE_TOPLEVEL_WRAPPER_PATH)) {
            try {
                if (!doc.getDocument(OPERATION_TYPE_TOPLEVEL_WRAPPER_PATH)
                        .getDocument(OPERATION_TYPE_WRAPPER_PATH)
                        .containsKey(OPERATION_TYPE_FIELD_PATH)
                        || !doc.getDocument(OPERATION_TYPE_TOPLEVEL_WRAPPER_PATH)
                        .getDocument(OPERATION_TYPE_WRAPPER_PATH)
                        .get(OPERATION_TYPE_FIELD_PATH)
                        .isString()) {
                    throw new DataException("Error: value doc is missing CDC operation type of type string");
                }
                String operation =
                        getAttunityOperation(
                                doc.getDocument(OPERATION_TYPE_TOPLEVEL_WRAPPER_PATH)
                                        .getDocument(OPERATION_TYPE_WRAPPER_PATH)
                                        .get(OPERATION_TYPE_FIELD_PATH)
                                        .asString()
                                        .getValue());
                CdcOperation op = operations.get(OperationType.fromText(operation));
                if (op == null) {
                    throw new DataException(
                            "Error: no CDC operation found in mapping for operation="
                                    + doc.getDocument(OPERATION_TYPE_TOPLEVEL_WRAPPER_PATH)
                                    .getDocument(OPERATION_TYPE_WRAPPER_PATH)
                                    .get(OPERATION_TYPE_FIELD_PATH)
                                    .asString()
                                    .getValue());
                }
                return op;
            } catch (IllegalArgumentException exc) {
                throw new DataException("Error: parsing CDC operation failed", exc);
            }
        } else {
            // Flat envelope: the operation type lives at headers.operation.
            try {
                if (!doc.getDocument(OPERATION_TYPE_WRAPPER_PATH).containsKey(OPERATION_TYPE_FIELD_PATH)
                        || !doc.getDocument(OPERATION_TYPE_WRAPPER_PATH)
                        .get(OPERATION_TYPE_FIELD_PATH)
                        .isString()) {
                    throw new DataException("Error: value doc is missing CDC operation type of type string");
                }
                String operation =
                        getAttunityOperation(
                                doc.getDocument(OPERATION_TYPE_WRAPPER_PATH)
                                        .get(OPERATION_TYPE_FIELD_PATH)
                                        .asString()
                                        .getValue());
                CdcOperation op = operations.get(OperationType.fromText(operation));
                if (op == null) {
                    throw new DataException(
                            "Error: no CDC operation found in mapping for operation="
                                    + doc.getDocument(OPERATION_TYPE_WRAPPER_PATH)
                                    .get(OPERATION_TYPE_FIELD_PATH)
                                    .asString()
                                    .getValue());
                }
                return op;
            } catch (IllegalArgumentException exc) {
                throw new DataException("Error: parsing CDC operation failed", exc);
            }
        }
    }

    // Maps Attunity / Qlik Replicate operation names onto the Debezium-style
    // single-letter codes that OperationType.fromText() understands.
    private String getAttunityOperation(final String fromKey) {
        switch (fromKey) {
            case "INSERT":
            case "REFRESH":
                return "c";
            case "READ":
                return "r";
            case "UPDATE":
                return "u";
            case "DELETE":
                return "d";
            default:
                throw new IllegalArgumentException("Error: unknown operation type " + fromKey);
        }
    }
}
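For reference, a minimal sketch of the two value-document shapes getCdcOperation accepts: one with the top-level message wrapper and one without. The column names and values below are hypothetical; only the envelope fields (message, headers, operation) come from the constants above.

package com.mongodb.kafka.connect.sink.cdc.attunity.rdbms.oracle;

import org.bson.BsonDocument;

final class AttunityEnvelopeSketch {
    public static void main(final String[] args) {
        // Wrapped form: the operation type is read from message.headers.operation.
        BsonDocument wrapped = BsonDocument.parse(
                "{\"message\": {\"headers\": {\"operation\": \"UPDATE\"},"
                        + " \"data\": {\"ID\": 1004}}}");

        // Flat form: the operation type is read from headers.operation.
        BsonDocument flat = BsonDocument.parse(
                "{\"headers\": {\"operation\": \"INSERT\"}, \"data\": {\"ID\": 1004}}");

        // getAttunityOperation maps UPDATE -> "u" and INSERT/REFRESH -> "c",
        // which OperationType.fromText resolves to UPDATE and CREATE respectively.
        System.out.println(wrapped.toJson());
        System.out.println(flat.toJson());
    }
}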
AttunityRdbmsDelete.java: 52 additions & 0 deletions

@@ -0,0 +1,52 @@

/*
 * Copyright 2008-present MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.mongodb.kafka.connect.sink.cdc.attunity.rdbms.oracle;

import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.sink.cdc.CdcOperation;
import com.mongodb.kafka.connect.sink.cdc.debezium.OperationType;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;

public class AttunityRdbmsDelete implements CdcOperation {

    @Override
    public WriteModel<BsonDocument> perform(final SinkDocument doc) {

        BsonDocument keyDoc =
                doc.getKeyDoc()
                        .orElseThrow(
                                () -> new DataException("Error: key doc must not be missing for delete operation"));

        BsonDocument valueDoc =
                doc.getValueDoc()
                        .orElseThrow(
                                () -> new DataException("Error: value doc must not be missing for delete operation"));

        try {
            BsonDocument filterDoc =
                    AttunityRdbmsHandler.generateFilterDoc(keyDoc, valueDoc, OperationType.DELETE);
            return new DeleteOneModel<>(filterDoc);
        } catch (Exception exc) {
            throw new DataException(exc);
        }
    }
}
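A sketch of what the delete path produces, assuming the Kafka record key carries the primary-key columns (the column names and values are hypothetical). generateFilterDoc, defined in AttunityRdbmsHandler below, wraps the key columns under _id:

package com.mongodb.kafka.connect.sink.cdc.attunity.rdbms.oracle;

import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import org.bson.BsonDocument;

final class AttunityDeleteSketch {
    public static void main(final String[] args) {
        SinkDocument doc = new SinkDocument(
                BsonDocument.parse("{\"ID\": 1004}"), // key doc: PK columns
                BsonDocument.parse(
                        "{\"headers\": {\"operation\": \"DELETE\"},"
                                + " \"beforeData\": {\"ID\": 1004, \"NAME\": \"Anne\"}}"));
        // Yields a DeleteOneModel whose filter is {"_id": {"ID": 1004}}.
        WriteModel<BsonDocument> model = new AttunityRdbmsDelete().perform(doc);
        System.out.println(model);
    }
}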
AttunityRdbmsHandler.java: 214 additions & 0 deletions

@@ -0,0 +1,214 @@

/*
 * Copyright 2008-present MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.mongodb.kafka.connect.sink.cdc.attunity.rdbms.oracle;

import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.cdc.CdcOperation;
import com.mongodb.kafka.connect.sink.cdc.debezium.OperationType;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;
import org.bson.BsonInvalidOperationException;
import org.bson.BsonObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class AttunityRdbmsHandler extends AttunityCdcHandler {
    static final String ID_FIELD = "_id";
    private static final String JSON_DOC_BEFORE_FIELD = "beforeData";
    private static final String JSON_DOC_AFTER_FIELD = "data";
    private static final String JSON_DOC_WRAPPER_FIELD = "message";
    private static final Logger LOGGER = LoggerFactory.getLogger(AttunityRdbmsHandler.class);
    private static final Map<OperationType, CdcOperation> DEFAULT_OPERATIONS =
            new HashMap<OperationType, CdcOperation>() {
                {
                    put(OperationType.CREATE, new AttunityRdbmsInsert());
                    put(OperationType.READ, new AttunityRdbmsInsert());
                    put(OperationType.UPDATE, new AttunityRdbmsUpdate());
                    put(OperationType.DELETE, new AttunityRdbmsDelete());
                }
            };

    public AttunityRdbmsHandler(final MongoSinkTopicConfig config) {
        this(config, DEFAULT_OPERATIONS);
    }

    public AttunityRdbmsHandler(
            final MongoSinkTopicConfig config, final Map<OperationType, CdcOperation> operations) {
        super(config);
        registerOperations(operations);
    }

    @Override
    public Optional<WriteModel<BsonDocument>> handle(final SinkDocument doc) {

        BsonDocument keyDoc = doc.getKeyDoc().orElseGet(BsonDocument::new);

        BsonDocument valueDoc = doc.getValueDoc().orElseGet(BsonDocument::new);

        if (valueDoc.isEmpty()) {
            LOGGER.debug("skipping attunity tombstone event for kafka topic compaction");
            return Optional.empty();
        }

        return Optional.ofNullable(
                getCdcOperation(valueDoc).perform(new SinkDocument(keyDoc, valueDoc)));
    }

    static BsonDocument generateFilterDoc(
            final BsonDocument keyDoc, final BsonDocument valueDoc, final OperationType opType) {
        final boolean checkOpType =
                opType.equals(OperationType.CREATE) || opType.equals(OperationType.READ);
        if (valueDoc.containsKey(JSON_DOC_WRAPPER_FIELD)) {
            if (keyDoc.keySet().isEmpty()) {
                if (checkOpType) {
                    // create: no PK info in keyDoc -> generate ObjectId
                    return new BsonDocument(ID_FIELD, new BsonObjectId());
                }
                // update or delete: no PK info in keyDoc -> take everything in 'beforeData' field
                try {
                    BsonDocument filter =
                            valueDoc.getDocument(JSON_DOC_WRAPPER_FIELD).getDocument(JSON_DOC_BEFORE_FIELD);
                    if (filter.isEmpty()) {
                        throw new BsonInvalidOperationException("value doc beforeData field is empty");
                    }
                    return filter;
                } catch (BsonInvalidOperationException exc) {
                    throw new DataException(
                            "Error: value doc 'beforeData' field is empty or has invalid type"
                                    + " for update/delete operation which seems severely wrong -> defensive actions taken!",
                            exc);
                }
            }
        } else {
            if (keyDoc.keySet().isEmpty()) {
                if (checkOpType) {
                    // create: no PK info in keyDoc -> generate ObjectId
                    return new BsonDocument(ID_FIELD, new BsonObjectId());
                }
                // update or delete: no PK info in keyDoc -> take everything in 'beforeData' field
                try {
                    BsonDocument filter = valueDoc.getDocument(JSON_DOC_BEFORE_FIELD);
                    if (filter.isEmpty()) {
                        throw new BsonInvalidOperationException("value doc beforeData field is empty");
                    }
                    return filter;
                } catch (BsonInvalidOperationException exc) {
                    throw new DataException(
                            "Error: value doc 'beforeData' field is empty or has invalid type"
                                    + " for update/delete operation which seems severely wrong -> defensive actions taken!",
                            exc);
                }
            }
        }

        // build filter document composed of all PK columns
        BsonDocument pk = new BsonDocument();
        for (String f : keyDoc.keySet()) {
            pk.put(f, keyDoc.get(f));
        }
        return new BsonDocument(ID_FIELD, pk);
    }

    static BsonDocument generateUpsertOrReplaceDoc(
            final BsonDocument keyDoc, final BsonDocument valueDoc, final BsonDocument filterDoc) {

        BsonDocument afterDoc;
        BsonDocument upsertDoc = new BsonDocument();
        if (valueDoc.containsKey(JSON_DOC_WRAPPER_FIELD)) {
            if (!valueDoc.getDocument(JSON_DOC_WRAPPER_FIELD).containsKey(JSON_DOC_AFTER_FIELD)
                    || valueDoc.getDocument(JSON_DOC_WRAPPER_FIELD).get(JSON_DOC_AFTER_FIELD).isNull()
                    || !valueDoc.getDocument(JSON_DOC_WRAPPER_FIELD).get(JSON_DOC_AFTER_FIELD).isDocument()
                    || valueDoc
                    .getDocument(JSON_DOC_WRAPPER_FIELD)
                    .getDocument(JSON_DOC_AFTER_FIELD)
                    .isEmpty()) {
                throw new DataException(
                        "Error: valueDoc must contain non-empty 'data' field"
                                + " of type document for insert/update operation");
            }

            afterDoc = valueDoc.getDocument(JSON_DOC_WRAPPER_FIELD).getDocument(JSON_DOC_AFTER_FIELD);
        } else {
            if (!valueDoc.containsKey(JSON_DOC_AFTER_FIELD)
                    || valueDoc.get(JSON_DOC_AFTER_FIELD).isNull()
                    || !valueDoc.get(JSON_DOC_AFTER_FIELD).isDocument()
                    || valueDoc.getDocument(JSON_DOC_AFTER_FIELD).isEmpty()) {
                throw new DataException(
                        "Error: valueDoc must contain non-empty 'data' field"
                                + " of type document for insert/update operation");
            }

            afterDoc = valueDoc.getDocument(JSON_DOC_AFTER_FIELD);
        }

        if (filterDoc.containsKey(ID_FIELD)) {
            upsertDoc.put(ID_FIELD, filterDoc.get(ID_FIELD));
        }

        for (String f : afterDoc.keySet()) {
            upsertDoc.put(f, afterDoc.get(f));
        }
        return upsertDoc;
    }

    static BsonDocument generateUpdateDoc(
            final BsonDocument keyDoc, final BsonDocument valueDoc, final BsonDocument filterDoc) {

        BsonDocument updateDoc = new BsonDocument();
        BsonDocument updates = new BsonDocument();
        BsonDocument beforeDoc;
        BsonDocument afterDoc;
        if (valueDoc.containsKey(JSON_DOC_WRAPPER_FIELD)) {
            if (!valueDoc.getDocument(JSON_DOC_WRAPPER_FIELD).containsKey(JSON_DOC_AFTER_FIELD)
                    || valueDoc.getDocument(JSON_DOC_WRAPPER_FIELD).get(JSON_DOC_AFTER_FIELD).isNull()
                    || !valueDoc.getDocument(JSON_DOC_WRAPPER_FIELD).get(JSON_DOC_AFTER_FIELD).isDocument()
                    || valueDoc
                    .getDocument(JSON_DOC_WRAPPER_FIELD)
                    .getDocument(JSON_DOC_AFTER_FIELD)
                    .isEmpty()) {
                throw new DataException(
                        "Error: valueDoc must contain non-empty 'data' field"
                                + " of type document for insert/update operation");
            }

            beforeDoc = valueDoc.getDocument(JSON_DOC_WRAPPER_FIELD).getDocument(JSON_DOC_BEFORE_FIELD);
            afterDoc = valueDoc.getDocument(JSON_DOC_WRAPPER_FIELD).getDocument(JSON_DOC_AFTER_FIELD);

        } else {
            if (!valueDoc.containsKey(JSON_DOC_AFTER_FIELD)
                    || valueDoc.get(JSON_DOC_AFTER_FIELD).isNull()
                    || !valueDoc.get(JSON_DOC_AFTER_FIELD).isDocument()
                    || valueDoc.getDocument(JSON_DOC_AFTER_FIELD).isEmpty()) {
                throw new DataException(
                        "Error: valueDoc must contain non-empty 'data' field"
                                + " of type document for insert/update operation");
            }
            beforeDoc = valueDoc.getDocument(JSON_DOC_BEFORE_FIELD);
            afterDoc = valueDoc.getDocument(JSON_DOC_AFTER_FIELD);
        }

        for (String key : afterDoc.keySet()) {
            if (!afterDoc.get(key).equals(beforeDoc.get(key))) {
                updates.put(key, afterDoc.get(key));
            }
        }

        updateDoc.append("$set", updates);
        return updateDoc;
    }
}
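Finally, a sketch of the filter and update documents that generateFilterDoc and generateUpdateDoc produce for a wrapped UPDATE event. The column names and values are hypothetical, and the snippet sits in the same package because those helpers are package-private; only fields that differ between beforeData and data land in $set:

package com.mongodb.kafka.connect.sink.cdc.attunity.rdbms.oracle;

import com.mongodb.kafka.connect.sink.cdc.debezium.OperationType;
import org.bson.BsonDocument;

final class AttunityUpdateSketch {
    public static void main(final String[] args) {
        BsonDocument keyDoc = BsonDocument.parse("{\"ID\": 1004}");
        BsonDocument valueDoc = BsonDocument.parse(
                "{\"message\": {"
                        + "\"headers\": {\"operation\": \"UPDATE\"},"
                        + "\"beforeData\": {\"ID\": 1004, \"NAME\": \"Anne\"},"
                        + "\"data\": {\"ID\": 1004, \"NAME\": \"Anne Marie\"}}}");

        BsonDocument filterDoc =
                AttunityRdbmsHandler.generateFilterDoc(keyDoc, valueDoc, OperationType.UPDATE);
        BsonDocument updateDoc =
                AttunityRdbmsHandler.generateUpdateDoc(keyDoc, valueDoc, filterDoc);

        System.out.println(filterDoc.toJson()); // {"_id": {"ID": 1004}}
        System.out.println(updateDoc.toJson()); // {"$set": {"NAME": "Anne Marie"}}
    }
}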
