Skip to content

Commit 1837a7c

Browse files
committed
Fixed _id projection.
Previously, `_id` was always projected even if not explicitly allowed or blocked. This undocumented implicit behaviour is surprising and breaks the explicit nature of projections. KAFKA-209
1 parent c41d263 commit 1837a7c

File tree

7 files changed

+63
-98
lines changed

7 files changed

+63
-98
lines changed

CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@
1212
- [KAFKA-207](https://jira.mongodb.org/browse/KAFKA-207) Improved efficiency of heartbeats by making them tombstone messages.
1313
- [KAFKA-174](https://jira.mongodb.org/browse/KAFKA-174) Improved error messages when using invalid pipeline operators.
1414
- [KAFKA-194](https://jira.mongodb.org/browse/KAFKA-194) Added support for Qlik Replicate CDC.
15-
- [KAFKA-209](https://jira.mongodb.org/browse/KAFKA-209) Log a warning message when there the `_id` value and the
16-
id strategy is configured not to overwrite the existing `_id` value.
1715

1816
### Bug Fixes
1917
- [KAFKA-195](https://jira.mongodb.org/browse/KAFKA-195) Fixed topics.regex sink validation issue for synthetic config property
2018
- [KAFKA-203](https://jira.mongodb.org/browse/KAFKA-203) Fixed sink NPE issue when using with confluent connect 6.1.0
19+
- [KAFKA-209](https://jira.mongodb.org/browse/KAFKA-209) Fixed `_id` always being projected even if not explicitly allowed or blocked.
20+
Log a warning message when there is an existing `_id` value and the id strategy is configured not to overwrite the `_id`.
2121

2222
## 1.4.0
2323

src/main/java/com/mongodb/kafka/connect/sink/processor/DocumentIdAdder.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,9 @@ public void process(final SinkDocument doc, final SinkRecord orig) {
5252
vd.append(ID_FIELD, idStrategy.generateId(doc, orig));
5353
} else if (vd.containsKey(ID_FIELD)) {
5454
LOGGER.warn(
55-
"Warning configuration: '{}' is set to false and the document "
56-
+ "contains an '{}' field.",
57-
DOCUMENT_ID_STRATEGY_OVERWRITE_EXISTING_CONFIG,
58-
ID_FIELD);
55+
"Cannot overwrite the existing '{}' value. '{}' is set to false and the document.",
56+
ID_FIELD,
57+
DOCUMENT_ID_STRATEGY_OVERWRITE_EXISTING_CONFIG);
5958
}
6059
});
6160
}

src/main/java/com/mongodb/kafka/connect/sink/processor/field/projection/AllowListProjector.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616

1717
package com.mongodb.kafka.connect.sink.processor.field.projection;
1818

19-
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.FieldProjectionType.ALLOWLIST;
20-
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.ID_FIELD;
21-
2219
import java.util.Arrays;
2320
import java.util.Iterator;
2421
import java.util.Map;
@@ -36,11 +33,15 @@ public AllowListProjector(
3633
final MongoSinkTopicConfig config,
3734
final Set<String> fields,
3835
final SinkDocumentField sinkDocumentField) {
39-
super(config, fields, ALLOWLIST, sinkDocumentField);
36+
super(config, fields, sinkDocumentField);
4037
}
4138

4239
@Override
43-
protected void doProjection(final String field, final BsonDocument doc) {
40+
protected void projectDocument(final BsonDocument doc) {
41+
doProjection("", doc);
42+
}
43+
44+
private void doProjection(final String field, final BsonDocument doc) {
4445
// special case short circuit check for '**' pattern
4546
// this is essentially the same as not using
4647
// whitelisting at all but instead take the full record
@@ -57,8 +58,7 @@ protected void doProjection(final String field, final BsonDocument doc) {
5758
: field + FieldProjector.SUB_FIELD_DOT_SEPARATOR + entry.getKey();
5859
BsonValue value = entry.getValue();
5960

60-
// NOTE: always keep the _id field
61-
if ((!getFields().contains(key) && !key.equals(ID_FIELD)) && !checkForWildcardMatch(key)) {
61+
if (!getFields().contains(key) && !checkForWildcardMatch(key)) {
6262
iter.remove();
6363
}
6464

@@ -73,7 +73,7 @@ protected void doProjection(final String field, final BsonDocument doc) {
7373
}
7474
}
7575
if (value.isArray()) {
76-
BsonArray values = (BsonArray) value;
76+
BsonArray values = value.asArray();
7777
for (BsonValue v : values.getValues()) {
7878
if (v != null && v.isDocument()) {
7979
doProjection(key, (BsonDocument) v);

src/main/java/com/mongodb/kafka/connect/sink/processor/field/projection/BlockListProjector.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@
1515
*/
1616
package com.mongodb.kafka.connect.sink.processor.field.projection;
1717

18-
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.FieldProjectionType.BLOCKLIST;
19-
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.ID_FIELD;
20-
2118
import java.util.Iterator;
2219
import java.util.Map;
2320
import java.util.Set;
@@ -34,22 +31,23 @@ public BlockListProjector(
3431
final MongoSinkTopicConfig config,
3532
final Set<String> fields,
3633
final SinkDocumentField sinkDocumentField) {
37-
super(config, fields, BLOCKLIST, sinkDocumentField);
34+
super(config, fields, sinkDocumentField);
3835
}
3936

4037
@Override
41-
protected void doProjection(final String field, final BsonDocument doc) {
38+
protected void projectDocument(final BsonDocument doc) {
39+
getFields().forEach(f -> doProjection(f, doc));
40+
}
41+
42+
private void doProjection(final String field, final BsonDocument doc) {
4243
if (!field.contains(FieldProjector.SUB_FIELD_DOT_SEPARATOR)) {
4344
if (field.equals(FieldProjector.SINGLE_WILDCARD)
4445
|| field.equals(FieldProjector.DOUBLE_WILDCARD)) {
4546
handleWildcard(field, "", doc);
4647
return;
4748
}
4849

49-
// NOTE: never try to remove the _id field
50-
if (!field.equals(ID_FIELD)) {
51-
doc.remove(field);
52-
}
50+
doc.remove(field);
5351
return;
5452
}
5553

@@ -86,11 +84,6 @@ private void handleWildcard(
8684
Map.Entry<String, BsonValue> entry = iter.next();
8785
BsonValue value = entry.getValue();
8886

89-
// NOTE: never try to remove the _id field
90-
if (entry.getKey().equals(ID_FIELD)) {
91-
continue;
92-
}
93-
9487
if (firstPart.equals(FieldProjector.DOUBLE_WILDCARD)) {
9588
iter.remove();
9689
} else if (firstPart.equals(FieldProjector.SINGLE_WILDCARD)) {

src/main/java/com/mongodb/kafka/connect/sink/processor/field/projection/FieldProjector.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ public abstract class FieldProjector extends PostProcessor {
4040
static final String SUB_FIELD_DOT_SEPARATOR = ".";
4141

4242
private final Set<String> fields;
43-
private final FieldProjectionType fieldProjectionType;
4443
private final SinkDocumentField sinkDocumentField;
4544

4645
public enum SinkDocumentField {
@@ -51,11 +50,9 @@ public enum SinkDocumentField {
5150
public FieldProjector(
5251
final MongoSinkTopicConfig config,
5352
final Set<String> fields,
54-
final FieldProjectionType fieldProjectionType,
5553
final SinkDocumentField sinkDocumentField) {
5654
super(config);
5755
this.fields = fields;
58-
this.fieldProjectionType = fieldProjectionType;
5956
this.sinkDocumentField = sinkDocumentField;
6057
}
6158

@@ -65,16 +62,7 @@ public Set<String> getFields() {
6562

6663
@Override
6764
public void process(final SinkDocument doc, final SinkRecord orig) {
68-
switch (fieldProjectionType) {
69-
case ALLOWLIST:
70-
getDocumentToProcess(doc).ifPresent(vd -> doProjection("", vd));
71-
break;
72-
case BLOCKLIST:
73-
getDocumentToProcess(doc).ifPresent(vd -> getFields().forEach(f -> doProjection(f, vd)));
74-
break;
75-
default:
76-
// Do nothing
77-
}
65+
getDocumentToProcess(doc).ifPresent(this::projectDocument);
7866
}
7967

8068
private Optional<BsonDocument> getDocumentToProcess(final SinkDocument sinkDocument) {
@@ -83,7 +71,7 @@ private Optional<BsonDocument> getDocumentToProcess(final SinkDocument sinkDocum
8371
: sinkDocument.getValueDoc();
8472
}
8573

86-
protected abstract void doProjection(String field, BsonDocument doc);
74+
protected abstract void projectDocument(BsonDocument doc);
8775

8876
public static Set<String> buildProjectionList(
8977
final FieldProjectionType fieldProjectionType, final String fieldList) {

src/test/java/com/mongodb/kafka/connect/sink/processor/field/projection/FieldProjectorTest.java

Lines changed: 25 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -76,31 +76,30 @@ class FieldProjectorTest {
7676

7777
@BeforeAll
7878
static void setupFlatDocMaps() {
79-
// NOTE: FieldProjectors are currently implemented so that
80-
// a) when block listing: already present _id fields are never removed even if specified
81-
// b) when allow listing: already present _id fields are always kept even if not specified
82-
8379
// key projection settings
8480
BsonDocument keyDocument1 =
8581
BsonDocument.parse(
8682
"{_id: 'ABC-123', myBoolean: true, myInt: 42, "
8783
+ "myBytes: {$binary: 'QUJD', $type: '00'}, myArray: []}");
88-
BsonDocument keyDocument2 = BsonDocument.parse("{_id: 'ABC-123'}");
84+
BsonDocument keyDocument2 = BsonDocument.parse("{}");
8985
BsonDocument keyDocument3 =
9086
BsonDocument.parse(
9187
"{_id: 'ABC-123', myBytes: {$binary: 'QUJD', $type: '00'}, myArray: []}");
9288
BsonDocument keyDocument4 =
9389
BsonDocument.parse(
94-
"{_id: 'ABC-123', myBoolean: true, myBytes: {$binary: 'QUJD', $type: '00'}, "
95-
+ "myArray: []}");
90+
"{myBoolean: true, myBytes: {$binary: 'QUJD', $type: '00'}, myArray: []}");
91+
BsonDocument keyDocument5 =
92+
BsonDocument.parse(
93+
"{myBoolean: true, myInt: 42, "
94+
+ "myBytes: {$binary: 'QUJD', $type: '00'}, myArray: []}");
9695

9796
flatKeyFieldsMapBlockList =
9897
new HashMap<String, BsonDocument>() {
9998
{
10099
put("", keyDocument1);
101100
put("*", keyDocument2);
102101
put("**", keyDocument2);
103-
put("_id", keyDocument1);
102+
put("_id", keyDocument5);
104103
put("myBoolean, myInt", keyDocument3);
105104
put("missing1, unknown2", keyDocument1);
106105
}
@@ -123,14 +122,17 @@ static void setupFlatDocMaps() {
123122
"{_id: 'XYZ-789', myLong: {$numberLong: '42'}, "
124123
+ "myDouble: 23.23, myString: 'BSON', "
125124
+ "myBytes: {$binary: 'eHl6', $type: '00'}, myArray: []}");
126-
BsonDocument valueDocument2 = BsonDocument.parse("{_id: 'XYZ-789'}");
125+
BsonDocument valueDocument2 = BsonDocument.parse("{}");
127126
BsonDocument valueDocument3 =
128127
BsonDocument.parse(
129128
"{_id: 'XYZ-789', myString: 'BSON', "
130129
+ "myBytes: {$binary: 'eHl6', $type: '00'}, myArray: []}");
131130
BsonDocument valueDocument4 =
132131
BsonDocument.parse(
133-
"{_id: 'XYZ-789', myDouble: 23.23, "
132+
"{myDouble: 23.23, myBytes: {$binary: 'eHl6', $type: '00'}, myArray: []}");
133+
BsonDocument valueDocument5 =
134+
BsonDocument.parse(
135+
"{ myLong: {$numberLong: '42'}, myDouble: 23.23, myString: 'BSON', "
134136
+ "myBytes: {$binary: 'eHl6', $type: '00'}, myArray: []}");
135137

136138
flatValueFieldsMapBlockList =
@@ -139,7 +141,7 @@ static void setupFlatDocMaps() {
139141
put("", valueDocument1);
140142
put("*", valueDocument2);
141143
put("**", valueDocument2);
142-
put("_id", valueDocument1);
144+
put("_id", valueDocument5);
143145
put("myLong, myDouble", valueDocument3);
144146
put("missing1,unknown2", valueDocument1);
145147
}
@@ -159,26 +161,16 @@ static void setupFlatDocMaps() {
159161

160162
@BeforeAll
161163
static void setupNestedFieldLists() {
162-
163-
// NOTE: FieldProjectors are currently implemented so that
164-
// a) when block listing: already present _id fields are never removed even if specified
165-
// and
166-
// b) when allow listing: already present _id fields are always kept even if not specified
167-
168164
BsonDocument keyDocument1 =
169165
BsonDocument.parse(
170-
"{_id: 'ABC-123', myInt: 42, "
171-
+ "subDoc1: {myBoolean: false}, subDoc2: {myString: 'BSON2'}}");
166+
"{myInt: 42, subDoc1: {myBoolean: false}, subDoc2: {myString: 'BSON2'}}");
172167
BsonDocument keyDocument2 =
173168
BsonDocument.parse(
174-
"{_id: 'ABC-123', "
175-
+ "subDoc1: {myString: 'BSON1', myBoolean: false}, "
176-
+ "subDoc2: {myString: 'BSON2', myBoolean: true}}");
177-
BsonDocument keyDocument3 = BsonDocument.parse("{_id: 'ABC-123'}");
169+
"{subDoc1: {myString: 'BSON1', myBoolean: false}, subDoc2: {myString: 'BSON2', myBoolean: true}}");
170+
BsonDocument keyDocument3 = BsonDocument.parse("{}");
178171
BsonDocument keyDocument4 =
179-
BsonDocument.parse(
180-
"{_id: 'ABC-123', subDoc1: {myBoolean: false}, subDoc2: {myBoolean: true}}");
181-
BsonDocument keyDocument5 = BsonDocument.parse("{_id: 'ABC-123', subDoc1: {}, subDoc2: {}}");
172+
BsonDocument.parse("{subDoc1: {myBoolean: false}, subDoc2: {myBoolean: true}}");
173+
BsonDocument keyDocument5 = BsonDocument.parse("{subDoc1: {}, subDoc2: {}}");
182174
BsonDocument keyDocument6 =
183175
BsonDocument.parse("{_id: 'ABC-123', myInt: 42, subDoc1: {}, subDoc2: {}}");
184176

@@ -202,26 +194,23 @@ static void setupNestedFieldLists() {
202194
};
203195

204196
// Value documents
205-
BsonDocument valueDocument1 = BsonDocument.parse("{_id: 'XYZ-789', myBoolean: true}");
206-
BsonDocument valueDocument2 = BsonDocument.parse("{_id: 'XYZ-789'}");
197+
BsonDocument valueDocument1 = BsonDocument.parse("{myBoolean: true}");
198+
BsonDocument valueDocument2 = BsonDocument.parse("{}");
207199
BsonDocument valueDocument3 =
208200
BsonDocument.parse(
209201
"{_id: 'XYZ-789', myBoolean: true, "
210202
+ "subDoc1: {myFieldA: 'some text', myFieldB: 12.34}}");
211203
BsonDocument valueDocument4 =
212204
BsonDocument.parse(
213-
"{_id: 'XYZ-789', "
214-
+ "subDoc1: {subSubDoc: {myString: 'some text', myInt: 0, myBoolean: false}}, subDoc2: {}}");
205+
"{subDoc1: {subSubDoc: {myString: 'some text', myInt: 0, myBoolean: false}}, subDoc2: {}}");
215206
BsonDocument valueDocument5 =
216207
BsonDocument.parse(
217-
"{_id: 'XYZ-789', "
218-
+ "subDoc1: {subSubDoc: {myString: 'some text', myInt: 0, myBoolean: false}}, "
208+
"{subDoc1: {subSubDoc: {myString: 'some text', myInt: 0, myBoolean: false}}, "
219209
+ "subDoc2: {subSubDoc: {myBytes: {$binary: 'eHl6', $type: '00'}, "
220210
+ " myArray: [{key: 'abc', value: 123}, {key: 'xyz', value: 987}]}}}");
221211
BsonDocument valueDocument6 =
222212
BsonDocument.parse(
223-
"{_id: 'XYZ-789',"
224-
+ "subDoc1: {myFieldA: 'some text', myFieldB: 12.34}, subDoc2: {myFieldA: 'some text', myFieldB: 12.34}}");
213+
"{subDoc1: {myFieldA: 'some text', myFieldB: 12.34}, subDoc2: {myFieldA: 'some text', myFieldB: 12.34}}");
225214
BsonDocument valueDocument7 =
226215
BsonDocument.parse(
227216
"{_id: 'XYZ-789', myBoolean: true,"
@@ -235,8 +224,7 @@ static void setupNestedFieldLists() {
235224
BsonDocument.parse("{_id: 'XYZ-789', myBoolean: true, subDoc1: {}, subDoc2: {}}");
236225
BsonDocument valueDocument10 =
237226
BsonDocument.parse(
238-
"{_id: 'XYZ-789',"
239-
+ "subDoc1: {myFieldA: 'some text', myFieldB: 12.34, "
227+
"{ subDoc1: {myFieldA: 'some text', myFieldB: 12.34, "
240228
+ "subSubDoc: {myString: 'some text', myInt: 0, myBoolean: false}}, "
241229
+ "subDoc2: {subSubDoc: {myArray: [{key: 'abc', value: 123}, {key: 'xyz', value: 987}]}}}");
242230
BsonDocument valueDocument11 =
@@ -248,8 +236,7 @@ static void setupNestedFieldLists() {
248236
+ " subSubDoc: {myBytes: {$binary: 'eHl6', $type: '00'}, "
249237
+ " myArray: [{key: 'abc', value: 123}, {key: 'xyz', value: 987}]}}}");
250238
BsonDocument valueDocument12 =
251-
BsonDocument.parse(
252-
"{_id: 'XYZ-789', " + "subDoc2: {subSubDoc: {myArray: [{key: 'abc'}, {key: 'xyz'}]}}}");
239+
BsonDocument.parse("{subDoc2: {subSubDoc: {myArray: [{key: 'abc'}, {key: 'xyz'}]}}}");
253240

254241
nestedValueFieldsMapBlockList =
255242
new HashMap<String, BsonDocument>() {

0 commit comments

Comments
 (0)