Skip to content

Commit 1f021f7

Browse files
committed
Added AllowList and BlockList field projectors
Deprecated: `Blacklist[Key|Value]Projector`, `Whitelist[Key|Value]Projector`. Use: `BlockListKeyProjector`, `BlockListValueProjector`, `AllowListKeyProjector` or `AllowListValueProjector` instead. KAFKA-112
1 parent 268f7e7 commit 1f021f7

20 files changed

+630
-302
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
- [KAFKA-118](https://jira.mongodb.org/browse/KAFKA-118) Made UuidStrategy configurable so can output BsonBinary Uuid values
1212
- [KAFKA-101](https://jira.mongodb.org/browse/KAFKA-101) Added `UuidProvidedInKeyStrategy` & `UuidProvidedInValueStrategy`
1313
- [KAFKA-114](https://jira.mongodb.org/browse/KAFKA-114) Added `UpdateOneBusinessKeyTimestampStrategy` write model strategy`
14+
- [KAFKA-112](https://jira.mongodb.org/browse/KAFKA-112) Added `BlockList` and `AllowList` field projector type configurations and
15+
`BlockListKeyProjector`, `BlockListValueProjector`, `AllowListKeyProjector`and `AllowListValueProjector` Post processors.
16+
Deprecated: `BlacklistKeyProjector`, `BlacklistValueProjector`, `WhitelistKeyProjector` and `WhitelistValueProjector`.
1417

1518
## 1.1.0
1619
- [KAFKA-45](https://jira.mongodb.org/browse/KAFKA-45) Allow the Sink connector to ignore unused source record key or value fields.

src/main/java/com/mongodb/kafka/connect/sink/Configurable.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,10 @@
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
15-
*
1615
*/
1716

1817
package com.mongodb.kafka.connect.sink;
1918

20-
import org.apache.kafka.common.config.AbstractConfig;
21-
2219
public interface Configurable {
23-
void configure(AbstractConfig configuration);
20+
void configure(MongoSinkTopicConfig configuration);
2421
}

src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTopicConfig.java

Lines changed: 21 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import static java.util.Collections.singletonList;
2929
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
3030

31-
import java.util.Arrays;
3231
import java.util.HashMap;
3332
import java.util.List;
3433
import java.util.Map;
@@ -45,16 +44,10 @@
4544
import com.mongodb.MongoNamespace;
4645

4746
import com.mongodb.kafka.connect.sink.cdc.CdcHandler;
48-
import com.mongodb.kafka.connect.sink.processor.BlacklistKeyProjector;
49-
import com.mongodb.kafka.connect.sink.processor.BlacklistValueProjector;
5047
import com.mongodb.kafka.connect.sink.processor.PostProcessors;
51-
import com.mongodb.kafka.connect.sink.processor.WhitelistKeyProjector;
52-
import com.mongodb.kafka.connect.sink.processor.WhitelistValueProjector;
53-
import com.mongodb.kafka.connect.sink.processor.field.projection.FieldProjector;
5448
import com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy;
5549
import com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategy;
5650
import com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy;
57-
import com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy;
5851
import com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy;
5952
import com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneDefaultStrategy;
6053
import com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategy;
@@ -67,7 +60,9 @@ public class MongoSinkTopicConfig extends AbstractConfig {
6760
public enum FieldProjectionType {
6861
NONE,
6962
BLACKLIST,
70-
WHITELIST
63+
WHITELIST,
64+
ALLOWLIST,
65+
BLOCKLIST
7166
}
7267

7368
public enum UuidBsonFormat {
@@ -121,7 +116,7 @@ public enum UuidBsonFormat {
121116
private static final String DOCUMENT_ID_STRATEGY_UUID_FORMAT_DISPLAY =
122117
"The document id strategy uuid format";
123118
private static final String DOCUMENT_ID_STRATEGY_UUID_FORMAT_DOC =
124-
"The bson output format for UuidStrategy.";
119+
"The bson output format when using the `UuidStrategy`.";
125120
private static final String DOCUMENT_ID_STRATEGY_UUID_FORMAT_DEFAULT = "string";
126121

127122
public static final String KEY_PROJECTION_TYPE_CONFIG = "key.projection.type";
@@ -278,32 +273,21 @@ public MongoNamespace getNamespace() {
278273
return namespace;
279274
}
280275

281-
@SuppressWarnings("unchecked")
276+
private <T> T configureInstance(final T instance) {
277+
if (instance instanceof Configurable) {
278+
((Configurable) instance).configure(this);
279+
}
280+
return instance;
281+
}
282+
282283
public IdStrategy getIdStrategy() {
283284
if (idStrategy == null) {
284-
String className = getString(DOCUMENT_ID_STRATEGY_CONFIG);
285-
Optional<FieldProjector> fieldProjector = getFieldProjector(className);
286285
idStrategy =
287-
createInstance(
288-
DOCUMENT_ID_STRATEGY_CONFIG,
289-
className,
290-
IdStrategy.class,
291-
() -> {
292-
Class<IdStrategy> clazz = (Class<IdStrategy>) Class.forName(className);
293-
boolean hasFieldProjectorConstructor =
294-
fieldProjector.isPresent()
295-
&& Arrays.stream(clazz.getConstructors())
296-
.anyMatch(
297-
i ->
298-
i.getParameterTypes().length == 1
299-
&& i.getParameterTypes()[0].equals(FieldProjector.class));
300-
if (hasFieldProjectorConstructor) {
301-
return clazz
302-
.getConstructor(FieldProjector.class)
303-
.newInstance(fieldProjector.get());
304-
}
305-
return clazz.getConstructor().newInstance();
306-
});
286+
configureInstance(
287+
createInstance(
288+
DOCUMENT_ID_STRATEGY_CONFIG,
289+
getString(DOCUMENT_ID_STRATEGY_CONFIG),
290+
IdStrategy.class));
307291
}
308292
return idStrategy;
309293
}
@@ -318,10 +302,11 @@ PostProcessors getPostProcessors() {
318302
WriteModelStrategy getWriteModelStrategy() {
319303
if (writeModelStrategy == null) {
320304
writeModelStrategy =
321-
createInstance(
322-
WRITEMODEL_STRATEGY_CONFIG,
323-
getString(WRITEMODEL_STRATEGY_CONFIG),
324-
WriteModelStrategy.class);
305+
configureInstance(
306+
createInstance(
307+
WRITEMODEL_STRATEGY_CONFIG,
308+
getString(WRITEMODEL_STRATEGY_CONFIG),
309+
WriteModelStrategy.class));
325310
}
326311
return writeModelStrategy;
327312
}
@@ -490,41 +475,6 @@ private static Map<String, String> createSinkTopicOriginals(final Map<String, St
490475
return topicConfig;
491476
}
492477

493-
private Optional<FieldProjector> getFieldProjector(final String strategyClassName) {
494-
FieldProjectionType keyProjectionType =
495-
FieldProjectionType.valueOf(getString(KEY_PROJECTION_TYPE_CONFIG).toUpperCase());
496-
FieldProjectionType valueProjectionType =
497-
FieldProjectionType.valueOf(getString(VALUE_PROJECTION_TYPE_CONFIG).toUpperCase());
498-
499-
if (keyProjectionType.equals(FieldProjectionType.NONE)
500-
&& strategyClassName.equals(PartialKeyStrategy.class.getName())) {
501-
throw new ConnectConfigException(
502-
DOCUMENT_ID_STRATEGY_CONFIG,
503-
strategyClassName,
504-
format("Invalid %s value", KEY_PROJECTION_TYPE_CONFIG));
505-
} else if (valueProjectionType.equals(FieldProjectionType.NONE)
506-
&& strategyClassName.equals(PartialValueStrategy.class.getName())) {
507-
throw new ConnectConfigException(
508-
DOCUMENT_ID_STRATEGY_CONFIG,
509-
strategyClassName,
510-
format("Invalid %s value", VALUE_PROJECTION_TYPE_CONFIG));
511-
}
512-
513-
if (keyProjectionType.equals(FieldProjectionType.BLACKLIST)) {
514-
return Optional.of(new BlacklistKeyProjector(this));
515-
} else if (keyProjectionType.equals(FieldProjectionType.WHITELIST)) {
516-
return Optional.of(new WhitelistKeyProjector(this));
517-
}
518-
519-
if (valueProjectionType.equals(FieldProjectionType.BLACKLIST)) {
520-
return Optional.of(new BlacklistValueProjector(this));
521-
} else if (valueProjectionType.equals(FieldProjectionType.WHITELIST)) {
522-
return Optional.of(new WhitelistValueProjector(this));
523-
}
524-
525-
return Optional.empty();
526-
}
527-
528478
private static ConfigDef createConfigDef() {
529479

530480
ConfigDef configDef = new ConfigDef();
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* Original Work: Apache License, Version 2.0, Copyright 2017 Hans-Peter Grahsl.
17+
*/
18+
19+
package com.mongodb.kafka.connect.sink.processor;
20+
21+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.FieldProjectionType.ALLOWLIST;
22+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.KEY_PROJECTION_LIST_CONFIG;
23+
24+
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
25+
import com.mongodb.kafka.connect.sink.processor.field.projection.AllowListProjector;
26+
27+
public class AllowListKeyProjector extends AllowListProjector {
28+
29+
public AllowListKeyProjector(final MongoSinkTopicConfig config) {
30+
this(config, config.getString(KEY_PROJECTION_LIST_CONFIG));
31+
}
32+
33+
public AllowListKeyProjector(final MongoSinkTopicConfig config, final String fieldList) {
34+
super(config, buildProjectionList(ALLOWLIST, fieldList), SinkDocumentField.KEY);
35+
}
36+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* Original Work: Apache License, Version 2.0, Copyright 2017 Hans-Peter Grahsl.
17+
*/
18+
19+
package com.mongodb.kafka.connect.sink.processor;
20+
21+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.FieldProjectionType.ALLOWLIST;
22+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.VALUE_PROJECTION_LIST_CONFIG;
23+
24+
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
25+
import com.mongodb.kafka.connect.sink.processor.field.projection.AllowListProjector;
26+
27+
public class AllowListValueProjector extends AllowListProjector {
28+
29+
public AllowListValueProjector(final MongoSinkTopicConfig config) {
30+
this(config, config.getString(VALUE_PROJECTION_LIST_CONFIG));
31+
}
32+
33+
public AllowListValueProjector(final MongoSinkTopicConfig config, final String fieldList) {
34+
super(config, buildProjectionList(ALLOWLIST, fieldList), SinkDocumentField.VALUE);
35+
}
36+
}

src/main/java/com/mongodb/kafka/connect/sink/processor/BlacklistKeyProjector.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,13 @@
1818

1919
package com.mongodb.kafka.connect.sink.processor;
2020

21-
import org.apache.kafka.connect.sink.SinkRecord;
22-
2321
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
24-
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
25-
import com.mongodb.kafka.connect.sink.processor.field.projection.BlacklistProjector;
2622

27-
public class BlacklistKeyProjector extends BlacklistProjector {
23+
/** @deprecated Use {@link BlockListKeyProjector} instead */
24+
@Deprecated
25+
public class BlacklistKeyProjector extends BlockListKeyProjector {
2826

2927
public BlacklistKeyProjector(final MongoSinkTopicConfig config) {
30-
super(config, getKeyFields(config));
31-
}
32-
33-
@Override
34-
public void process(final SinkDocument doc, final SinkRecord orig) {
35-
if (isUsingBlacklistKeyProjection()) {
36-
doc.getKeyDoc().ifPresent(kd -> getFields().forEach(f -> doProjection(f, kd)));
37-
}
28+
super(config);
3829
}
3930
}

src/main/java/com/mongodb/kafka/connect/sink/processor/BlacklistValueProjector.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,13 @@
1818

1919
package com.mongodb.kafka.connect.sink.processor;
2020

21-
import org.apache.kafka.connect.sink.SinkRecord;
22-
2321
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
24-
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
25-
import com.mongodb.kafka.connect.sink.processor.field.projection.BlacklistProjector;
2622

27-
public class BlacklistValueProjector extends BlacklistProjector {
23+
/** @deprecated Use {@link BlockListValueProjector} instead */
24+
@Deprecated
25+
public class BlacklistValueProjector extends BlockListValueProjector {
2826

2927
public BlacklistValueProjector(final MongoSinkTopicConfig config) {
30-
super(config, getValueFields(config));
31-
}
32-
33-
@Override
34-
public void process(final SinkDocument doc, final SinkRecord orig) {
35-
if (isUsingBlacklistValueProjection()) {
36-
doc.getValueDoc().ifPresent(vd -> getFields().forEach(f -> doProjection(f, vd)));
37-
}
28+
super(config);
3829
}
3930
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* Original Work: Apache License, Version 2.0, Copyright 2017 Hans-Peter Grahsl.
17+
*/
18+
19+
package com.mongodb.kafka.connect.sink.processor;
20+
21+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.FieldProjectionType.BLOCKLIST;
22+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.KEY_PROJECTION_LIST_CONFIG;
23+
24+
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
25+
import com.mongodb.kafka.connect.sink.processor.field.projection.BlockListProjector;
26+
27+
public class BlockListKeyProjector extends BlockListProjector {
28+
29+
public BlockListKeyProjector(final MongoSinkTopicConfig config) {
30+
this(config, config.getString(KEY_PROJECTION_LIST_CONFIG));
31+
}
32+
33+
public BlockListKeyProjector(final MongoSinkTopicConfig config, final String fieldList) {
34+
super(config, buildProjectionList(BLOCKLIST, fieldList), SinkDocumentField.KEY);
35+
}
36+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* Original Work: Apache License, Version 2.0, Copyright 2017 Hans-Peter Grahsl.
17+
*/
18+
19+
package com.mongodb.kafka.connect.sink.processor;
20+
21+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.FieldProjectionType.BLOCKLIST;
22+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.VALUE_PROJECTION_LIST_CONFIG;
23+
24+
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
25+
import com.mongodb.kafka.connect.sink.processor.field.projection.BlockListProjector;
26+
27+
public class BlockListValueProjector extends BlockListProjector {
28+
29+
public BlockListValueProjector(final MongoSinkTopicConfig config) {
30+
this(config, config.getString(VALUE_PROJECTION_LIST_CONFIG));
31+
}
32+
33+
public BlockListValueProjector(final MongoSinkTopicConfig config, final String fieldList) {
34+
super(config, buildProjectionList(BLOCKLIST, fieldList), SinkDocumentField.VALUE);
35+
}
36+
}

0 commit comments

Comments
 (0)