Skip to content

Commit 1d722d7

Browse files
authored
Improved support for Qlik DELETE events (#67)
Support the scenario where 'beforeData' is missing or null. KAFKA-211 KAFKA-194
1 parent 1837a7c commit 1d722d7

File tree

4 files changed

+90
-14
lines changed

4 files changed

+90
-14
lines changed

src/main/java/com/mongodb/kafka/connect/sink/cdc/qlik/rdbms/operations/Delete.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,6 @@ public WriteModel<BsonDocument> perform(final SinkDocument doc) {
4040
() ->
4141
new DataException("Error: value doc must not be missing for delete operation"));
4242

43-
return new DeleteOneModel<>(OperationHelper.createFilterDocument(keyDoc, valueDoc));
43+
return new DeleteOneModel<>(OperationHelper.createDeleteFilterDocument(keyDoc, valueDoc));
4444
}
4545
}

src/main/java/com/mongodb/kafka/connect/sink/cdc/qlik/rdbms/operations/OperationHelper.java

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public final class OperationHelper {
3737

3838
private static final String ID_FIELD = "_id";
3939
private static final String DATA_BEFORE_FIELD = "beforeData";
40-
private static final String DATA_AFTER_FIELD = "data";
40+
private static final String DATA_FIELD = "data";
4141
private static final String MESSAGE_FIELD = "message";
4242
private static final String HEADERS_FIELD = "headers";
4343
private static final String OPERATION_FIELD = "operation";
@@ -95,7 +95,7 @@ static BsonDocument createFilterDocument(final BsonDocument keyDocument) {
9595
.orElseGet(() -> new BsonDocument(ID_FIELD, new BsonObjectId()));
9696
}
9797

98-
static BsonDocument createFilterDocument(
98+
static BsonDocument createUpdateFilterDocument(
9999
final BsonDocument keyDocument, final BsonDocument valueDocument) {
100100
BsonDocument filter =
101101
getFilterFromKeyDocument(keyDocument)
@@ -114,10 +114,31 @@ static BsonDocument createFilterDocument(
114114
return filter;
115115
}
116116

117+
static BsonDocument createDeleteFilterDocument(
118+
final BsonDocument keyDocument, final BsonDocument valueDocument) {
119+
BsonDocument filter =
120+
getFilterFromKeyDocument(keyDocument)
121+
.orElseGet(
122+
() -> {
123+
BsonDocument messageDocument =
124+
getSubDocumentOrOriginal(MESSAGE_FIELD, valueDocument);
125+
return getSubDocumentNotNullOrOriginal(
126+
DATA_BEFORE_FIELD,
127+
getSubDocumentNotNullOrOriginal(DATA_FIELD, messageDocument));
128+
});
129+
if (filter.isEmpty()) {
130+
throw new DataException(
131+
format(
132+
"Error: Value Document does not contain the expected data, cannot create filter: %s.",
133+
valueDocument.toJson()));
134+
}
135+
return filter;
136+
}
137+
117138
static BsonDocument createReplaceDocument(
118139
final BsonDocument filterDocument, final BsonDocument valueDocument) {
119140
BsonDocument messageDocument = getSubDocumentOrOriginal(MESSAGE_FIELD, valueDocument);
120-
BsonDocument afterDocument = getSubDocumentOrOriginal(DATA_AFTER_FIELD, messageDocument);
141+
BsonDocument afterDocument = getSubDocumentOrOriginal(DATA_FIELD, messageDocument);
121142

122143
BsonDocument replaceDocument = new BsonDocument();
123144
if (filterDocument.containsKey(ID_FIELD)) {
@@ -132,7 +153,7 @@ static BsonDocument createReplaceDocument(
132153
static BsonDocument createUpdateDocument(final BsonDocument valueDocument) {
133154
BsonDocument messageDocument = getSubDocumentOrOriginal(MESSAGE_FIELD, valueDocument);
134155
BsonDocument beforeDocument = getSubDocumentOrOriginal(DATA_BEFORE_FIELD, messageDocument);
135-
BsonDocument afterDocument = getSubDocumentOrOriginal(DATA_AFTER_FIELD, messageDocument);
156+
BsonDocument afterDocument = getSubDocumentOrOriginal(DATA_FIELD, messageDocument);
136157

137158
if (afterDocument.isEmpty()) {
138159
throw new DataException(
@@ -164,16 +185,31 @@ private static Optional<BsonDocument> getFilterFromKeyDocument(final BsonDocumen
164185
}
165186

166187
private static BsonDocument getSubDocumentOrOriginal(
167-
final String field, final BsonDocument original) {
168-
if (original.containsKey(field)) {
169-
BsonValue subDocument = original.get(field);
170-
if (!subDocument.isDocument()) {
188+
final String fieldName, final BsonDocument original) {
189+
return getSubDocumentOrOriginal(fieldName, original, false);
190+
}
191+
192+
private static BsonDocument getSubDocumentNotNullOrOriginal(
193+
final String fieldName, final BsonDocument original) {
194+
return getSubDocumentOrOriginal(fieldName, original, true);
195+
}
196+
197+
private static BsonDocument getSubDocumentOrOriginal(
198+
final String fieldName, final BsonDocument original, final boolean ignoreNull) {
199+
if (original.containsKey(fieldName)) {
200+
BsonValue fieldValue = original.get(fieldName);
201+
202+
if (fieldValue.isNull() && ignoreNull) {
203+
return original;
204+
}
205+
206+
if (!fieldValue.isDocument()) {
171207
throw new DataException(
172208
format(
173209
"Error: Value document contains a '%s' that is not a document: %s",
174-
OperationHelper.MESSAGE_FIELD, original));
210+
fieldName, original));
175211
}
176-
return subDocument.asDocument();
212+
return fieldValue.asDocument();
177213
}
178214
return original;
179215
}

src/main/java/com/mongodb/kafka/connect/sink/cdc/qlik/rdbms/operations/Update.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ public WriteModel<BsonDocument> perform(final SinkDocument doc) {
4444
() ->
4545
new DataException("Error: value doc must not be missing for update operation"));
4646

47-
BsonDocument filterDocument = OperationHelper.createFilterDocument(keyDocument, valueDocument);
47+
BsonDocument filterDocument =
48+
OperationHelper.createUpdateFilterDocument(keyDocument, valueDocument);
4849
BsonDocument updateDocument = OperationHelper.createUpdateDocument(valueDocument);
4950
if (updateDocument.isEmpty()) {
5051
return null;

src/test/java/com/mongodb/kafka/connect/sink/cdc/qlik/rdbms/operations/DeleteTest.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,46 @@ void testValidSinkDocumentNoPK() {
7878
BsonDocument keyDoc = new BsonDocument();
7979
BsonDocument valueDoc =
8080
BsonDocument.parse(
81-
"{message: { headers: { operation : 'INSERT' } , beforeData: {text: 'misc', number: 9876, active: true}}}");
81+
"{message: { headers: { operation : 'DELETE' } , beforeData: {text: 'misc', number: 9876, active: true}}}");
82+
83+
WriteModel<BsonDocument> result = RDBMS_DELETE.perform(new SinkDocument(keyDoc, valueDoc));
84+
assertTrue(result instanceof DeleteOneModel, "result expected to be of type DeleteOneModel");
85+
86+
DeleteOneModel<BsonDocument> writeModel = (DeleteOneModel<BsonDocument>) result;
87+
assertTrue(
88+
writeModel.getFilter() instanceof BsonDocument,
89+
"filter expected to be of type BsonDocument");
90+
assertEquals(filterDoc, writeModel.getFilter());
91+
}
92+
93+
@Test
94+
@DisplayName("when valid cdc event without PK and beforeData as null then correct DeleteOneModel")
95+
void testValidSinkDocumentNoPKAndNullBeforeData() {
96+
BsonDocument filterDoc = BsonDocument.parse("{text: 'misc', number: 9876, active: true}");
97+
BsonDocument keyDoc = new BsonDocument();
98+
BsonDocument valueDoc =
99+
BsonDocument.parse(
100+
"{message: { headers: { operation : 'DELETE' } , beforeData: null, "
101+
+ "data: {text: 'misc', number: 9876, active: true}}}");
102+
103+
WriteModel<BsonDocument> result = RDBMS_DELETE.perform(new SinkDocument(keyDoc, valueDoc));
104+
assertTrue(result instanceof DeleteOneModel, "result expected to be of type DeleteOneModel");
105+
106+
DeleteOneModel<BsonDocument> writeModel = (DeleteOneModel<BsonDocument>) result;
107+
assertTrue(
108+
writeModel.getFilter() instanceof BsonDocument,
109+
"filter expected to be of type BsonDocument");
110+
assertEquals(filterDoc, writeModel.getFilter());
111+
}
112+
113+
@Test
114+
@DisplayName("when valid cdc event without PK and no beforeData then correct DeleteOneModel")
115+
void testValidSinkDocumentNoPKAndNoBeforeData() {
116+
BsonDocument filterDoc = BsonDocument.parse("{text: 'misc', number: 9876, active: true}");
117+
BsonDocument keyDoc = new BsonDocument();
118+
BsonDocument valueDoc =
119+
BsonDocument.parse(
120+
"{message: { headers: { operation : 'DELETE' }, data: {text: 'misc', number: 9876, active: true}}}");
82121

83122
WriteModel<BsonDocument> result = RDBMS_DELETE.perform(new SinkDocument(keyDoc, valueDoc));
84123
assertTrue(result instanceof DeleteOneModel, "result expected to be of type DeleteOneModel");
@@ -116,6 +155,6 @@ void testEmptyKeyDocAndEmptyValueBeforeField() {
116155
new SinkDocument(
117156
new BsonDocument(),
118157
BsonDocument.parse(
119-
"{message: { headers: { operation : 'INSERT' } , beforeData: { }}}"))));
158+
"{message: { headers: { operation : 'DELETE' } , beforeData: { }}}"))));
120159
}
121160
}

0 commit comments

Comments
 (0)