Skip to content

Commit d2a304f

Browse files
authored
Added support for serverApi (#76)
KAFKA-181
1 parent 3b87559 commit d2a304f

File tree

11 files changed

+248
-8
lines changed

11 files changed

+248
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
## 1.6.0
66

77
### Improvements
8+
- [KAFKA-181](https://jira.mongodb.org/browse/KAFKA-181) Added support for serverApi.
89
- [KAFKA-228](https://jira.mongodb.org/browse/KAFKA-228) Added support for automatic timeseries collection creation for 5.0
910
- [KAFKA-215](https://jira.mongodb.org/browse/KAFKA-215) Added mongo specific override options for error handling properties
1011
- [KAFKA-222](https://jira.mongodb.org/browse/KAFKA-222) Added a new jar `mongo-kafka-connect-<version>-confluent.jar` which just contains

src/integrationTest/java/com/mongodb/kafka/connect/ConnectorValidationIntegrationTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb.kafka.connect;
1818

19+
import static com.mongodb.kafka.connect.sink.MongoSinkConfig.CONNECTION_URI_CONFIG;
1920
import static com.mongodb.kafka.connect.sink.MongoSinkConfig.TOPIC_OVERRIDE_CONFIG;
2021
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_EXPIRE_AFTER_SECONDS_CONFIG;
2122
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_GRANULARITY_CONFIG;
@@ -57,6 +58,7 @@
5758
import com.mongodb.kafka.connect.sink.MongoSinkConfig;
5859
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
5960
import com.mongodb.kafka.connect.source.MongoSourceConfig;
61+
import com.mongodb.kafka.connect.util.ServerApiConfig;
6062

6163
public final class ConnectorValidationIntegrationTest {
6264

@@ -101,6 +103,15 @@ void testSinkConfigValidationInvalidConnection() {
101103
MongoSinkConfig.CONNECTION_URI_CONFIG);
102104
}
103105

106+
@Test
107+
@DisplayName("Ensure sink configuration validation works with serverApi")
108+
void testSinkConfigValidationWithServerApi() {
109+
assumeTrue(isAtleastFiveDotZero(getMongoClient()));
110+
Map<String, String> sinkProperties = createSinkProperties();
111+
sinkProperties.put(ServerApiConfig.SERVER_API_VERSION_CONFIG, "1");
112+
assertValidSink(sinkProperties);
113+
}
114+
104115
@Test
105116
@DisplayName("Ensure sink configuration validation handles invalid user")
106117
void testSinkConfigValidationInvalidUser() {
@@ -118,6 +129,15 @@ void testSinkConfigValidationInvalidUser() {
118129
MongoSinkConfig.CONNECTION_URI_CONFIG);
119130
}
120131

132+
@Test
133+
@DisplayName("Ensure sink configuration validation works with invalid serverApi")
134+
void testSinkConfigValidationWithInvalidServerApi() {
135+
assumeFalse(isAtleastFiveDotZero(getMongoClient()));
136+
Map<String, String> sinkProperties = createSinkProperties();
137+
sinkProperties.put(ServerApiConfig.SERVER_API_VERSION_CONFIG, "1");
138+
assertInvalidSink(sinkProperties, CONNECTION_URI_CONFIG);
139+
}
140+
121141
@Test
122142
@DisplayName("Ensure sink validation fails with read user")
123143
void testSinkConfigValidationReadUser() {
@@ -364,12 +384,30 @@ void testSourceConfigValidation() {
364384
assertValidSource(createSourceProperties());
365385
}
366386

387+
@Test
388+
@DisplayName("Ensure source configuration validation works with serverApi")
389+
void testSourceConfigValidationWithValidServerApi() {
390+
assumeTrue(isAtleastFiveDotZero(getMongoClient()));
391+
Map<String, String> sourceProperties = createSourceProperties();
392+
sourceProperties.put(ServerApiConfig.SERVER_API_VERSION_CONFIG, "1");
393+
assertValidSource(sourceProperties);
394+
}
395+
367396
@Test
368397
@DisplayName("Ensure source configuration validation handles invalid connections")
369398
void testSourceConfigValidationInvalidConnection() {
370399
assertInvalidSource(createSourceProperties("mongodb://192.0.2.0:27017/?connectTimeoutMS=1000"));
371400
}
372401

402+
@Test
403+
@DisplayName("Ensure source configuration validation works with invalid serverApi")
404+
void testSourceConfigValidationWithInvalidServerApi() {
405+
assumeFalse(isAtleastFiveDotZero(getMongoClient()));
406+
Map<String, String> sourceProperties = createSourceProperties();
407+
sourceProperties.put(ServerApiConfig.SERVER_API_VERSION_CONFIG, "1");
408+
assertInvalidSource(sourceProperties, CONNECTION_URI_CONFIG);
409+
}
410+
373411
@Test
374412
@DisplayName("Ensure source configuration validation handles invalid user")
375413
void testSourceConfigValidationInvalidUser() {
@@ -433,6 +471,14 @@ private void assertInvalidSource(final Map<String, String> properties) {
433471
assertFalse(getSourceErrors(properties).isEmpty(), "Source had valid configuration");
434472
}
435473

474+
private void assertInvalidSource(final Map<String, String> properties, final String configName) {
475+
Optional<ConfigValue> configValue =
476+
getSourceErrors(properties).stream().filter(cv -> cv.name().equals(configName)).findFirst();
477+
assertTrue(configValue.isPresent());
478+
assertFalse(
479+
configValue.get().errorMessages().isEmpty(), format("No error for '%s'", configName));
480+
}
481+
436482
private void assertValidSource(final Map<String, String> properties) {
437483
assumeTrue(isReplicaSetOrSharded());
438484
List<ConfigValue> sourceErrors = getSourceErrors(properties);

src/main/java/com/mongodb/kafka/connect/MongoSinkConnector.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
package com.mongodb.kafka.connect;
2020

21+
import static com.mongodb.kafka.connect.sink.MongoSinkConfig.CONNECTION_URI_CONFIG;
2122
import static com.mongodb.kafka.connect.util.ConfigHelper.getConfigByName;
2223
import static com.mongodb.kafka.connect.util.ConnectionValidator.validateCanConnect;
2324
import static com.mongodb.kafka.connect.util.ConnectionValidator.validateUserHasActions;
25+
import static com.mongodb.kafka.connect.util.ServerApiConfig.validateServerApi;
2426
import static com.mongodb.kafka.connect.util.TimeseriesValidation.validTopicRegexConfigAndCollection;
2527
import static com.mongodb.kafka.connect.util.TimeseriesValidation.validateConfigAndCollection;
2628
import static java.util.Arrays.asList;
@@ -87,10 +89,11 @@ public Config validate(final Map<String, String> connectorConfigs) {
8789
return config;
8890
}
8991

90-
validateCanConnect(config, MongoSinkConfig.CONNECTION_URI_CONFIG)
92+
validateCanConnect(config, CONNECTION_URI_CONFIG)
9193
.ifPresent(
9294
client -> {
9395
try {
96+
validateServerApi(client, config);
9497
sinkConfig
9598
.getTopics()
9699
.ifPresent(
@@ -109,7 +112,7 @@ public Config validate(final Map<String, String> connectorConfigs) {
109112
MongoSinkTopicConfig.DATABASE_CONFIG),
110113
mongoSinkTopicConfig.getString(
111114
MongoSinkTopicConfig.COLLECTION_CONFIG),
112-
MongoSinkConfig.CONNECTION_URI_CONFIG,
115+
CONNECTION_URI_CONFIG,
113116
config);
114117
validateConfigAndCollection(client, mongoSinkTopicConfig, config);
115118
}));
@@ -127,7 +130,7 @@ public Config validate(final Map<String, String> connectorConfigs) {
127130
getConfigByName(config, MongoSinkTopicConfig.COLLECTION_CONFIG)
128131
.map(c -> (String) c.value())
129132
.orElse(""),
130-
MongoSinkConfig.CONNECTION_URI_CONFIG,
133+
CONNECTION_URI_CONFIG,
131134
config);
132135
validTopicRegexConfigAndCollection(client, sinkConfig, config);
133136
});

src/main/java/com/mongodb/kafka/connect/MongoSourceConnector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.mongodb.kafka.connect.util.ConnectionValidator.validateCanConnect;
2020
import static com.mongodb.kafka.connect.util.ConnectionValidator.validateUserHasActions;
21+
import static com.mongodb.kafka.connect.util.ServerApiConfig.validateServerApi;
2122
import static java.util.Arrays.asList;
2223
import static java.util.Collections.singletonList;
2324

@@ -60,6 +61,7 @@ public Config validate(final Map<String, String> connectorConfigs) {
6061
.ifPresent(
6162
client -> {
6263
try {
64+
validateServerApi(client, config);
6365
validateUserHasActions(
6466
client,
6567
sourceConfig.getConnectionString().getCredential(),

src/main/java/com/mongodb/kafka/connect/sink/MongoSinkConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.mongodb.kafka.connect.sink;
2020

2121
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TOPIC_OVERRIDE_PREFIX;
22+
import static com.mongodb.kafka.connect.util.ServerApiConfig.addServerApiConfig;
2223
import static com.mongodb.kafka.connect.util.Validators.errorCheckingValueValidator;
2324
import static java.lang.String.format;
2425
import static java.util.Collections.emptyList;
@@ -276,6 +277,8 @@ public Map<String, ConfigValue> validateAll(final Map<String, String> props) {
276277
Width.MEDIUM,
277278
CONNECTION_URI_DISPLAY);
278279

280+
addServerApiConfig(configDef);
281+
279282
group = "Overrides";
280283
orderInGroup = 0;
281284
configDef.define(

src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTask.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.MAX_NUM_RETRIES_CONFIG;
2323
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.RETRIES_DEFER_TIMEOUT_CONFIG;
2424
import static com.mongodb.kafka.connect.util.ConfigHelper.getMongoDriverInformation;
25+
import static com.mongodb.kafka.connect.util.ServerApiConfig.setServerApi;
2526
import static com.mongodb.kafka.connect.util.TimeseriesValidation.validateCollection;
2627
import static java.util.Collections.emptyList;
2728

@@ -50,6 +51,7 @@
5051
import org.bson.BsonDocument;
5152

5253
import com.mongodb.MongoBulkWriteException;
54+
import com.mongodb.MongoClientSettings;
5355
import com.mongodb.MongoException;
5456
import com.mongodb.MongoNamespace;
5557
import com.mongodb.bulk.BulkWriteResult;
@@ -182,9 +184,12 @@ private Consumer<MongoProcessedSinkRecordData> createErrorReporter() {
182184

183185
private MongoClient getMongoClient() {
184186
if (mongoClient == null) {
187+
MongoClientSettings.Builder builder =
188+
MongoClientSettings.builder().applyConnectionString(sinkConfig.getConnectionString());
189+
setServerApi(builder, sinkConfig);
185190
mongoClient =
186191
MongoClients.create(
187-
sinkConfig.getConnectionString(),
192+
builder.build(),
188193
getMongoDriverInformation(CONNECTOR_TYPE, sinkConfig.getString(PROVIDER_CONFIG)));
189194
}
190195
return mongoClient;

src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static com.mongodb.kafka.connect.util.ConfigHelper.collationFromJson;
2323
import static com.mongodb.kafka.connect.util.ConfigHelper.fullDocumentFromString;
2424
import static com.mongodb.kafka.connect.util.ConfigHelper.jsonArrayFromString;
25+
import static com.mongodb.kafka.connect.util.ServerApiConfig.addServerApiConfig;
2526
import static com.mongodb.kafka.connect.util.Validators.emptyString;
2627
import static com.mongodb.kafka.connect.util.Validators.errorCheckingValueValidator;
2728
import static java.lang.String.format;
@@ -499,7 +500,8 @@ public Map<String, ConfigValue> validateAll(final Map<String, String> props) {
499500
return results;
500501
}
501502
};
502-
String group = "ChangeStream";
503+
504+
String group = "Connection";
503505
int orderInGroup = 0;
504506
configDef.define(
505507
CONNECTION_URI_CONFIG,
@@ -535,6 +537,10 @@ public Map<String, ConfigValue> validateAll(final Map<String, String> props) {
535537
Width.MEDIUM,
536538
COLLECTION_DISPLAY);
537539

540+
addServerApiConfig(configDef);
541+
542+
group = "Change stream";
543+
orderInGroup = 0;
538544
configDef.define(
539545
PIPELINE_CONFIG,
540546
Type.STRING,

src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static com.mongodb.kafka.connect.source.producer.SchemaAndValueProducers.createKeySchemaAndValueProvider;
3131
import static com.mongodb.kafka.connect.source.producer.SchemaAndValueProducers.createValueSchemaAndValueProvider;
3232
import static com.mongodb.kafka.connect.util.ConfigHelper.getMongoDriverInformation;
33+
import static com.mongodb.kafka.connect.util.ServerApiConfig.setServerApi;
3334
import static java.lang.String.format;
3435
import static java.util.Collections.singletonMap;
3536

@@ -167,10 +168,14 @@ public void start(final Map<String, String> props) {
167168
partitionMap = null;
168169
createPartitionMap(sourceConfig);
169170

171+
MongoClientSettings.Builder builder =
172+
MongoClientSettings.builder().applyConnectionString(sourceConfig.getConnectionString());
173+
setServerApi(builder, sourceConfig);
170174
mongoClient =
171175
MongoClients.create(
172-
sourceConfig.getConnectionString(),
176+
builder.build(),
173177
getMongoDriverInformation(CONNECTOR_TYPE, sourceConfig.getString(PROVIDER_CONFIG)));
178+
174179
if (shouldCopyData()) {
175180
setCachedResultAndResumeToken();
176181
copyDataManager = new MongoCopyDataManager(sourceConfig, mongoClient);

src/main/java/com/mongodb/kafka/connect/util/ConfigHelper.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,4 +170,13 @@ public static <T> T getOverrideOrFallback(
170170
public static Optional<ConfigValue> getConfigByName(final Config config, final String name) {
171171
return config.configValues().stream().filter(cv -> cv.name().equals(name)).findFirst();
172172
}
173+
174+
public static Optional<ConfigValue> getConfigByNameWithoutErrors(
175+
final Config config, final String name) {
176+
Optional<ConfigValue> configByName = getConfigByName(config, name);
177+
if (configByName.isPresent() && configByName.get().errorMessages().isEmpty()) {
178+
return configByName;
179+
}
180+
return Optional.empty();
181+
}
173182
}

src/main/java/com/mongodb/kafka/connect/util/ConnectionValidator.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.mongodb.kafka.connect.util;
1717

18+
import static com.mongodb.kafka.connect.util.ServerApiConfig.setServerApi;
1819
import static java.lang.String.format;
1920
import static java.util.Collections.emptyList;
2021

@@ -63,9 +64,12 @@ public static Optional<MongoClient> validateCanConnect(
6364
AtomicBoolean connected = new AtomicBoolean();
6465
CountDownLatch latch = new CountDownLatch(1);
6566
ConnectionString connectionString = new ConnectionString((String) configValue.value());
67+
MongoClientSettings.Builder mongoClientSettingsBuilder =
68+
MongoClientSettings.builder().applyConnectionString(connectionString);
69+
setServerApi(mongoClientSettingsBuilder, config);
70+
6671
MongoClientSettings mongoClientSettings =
67-
MongoClientSettings.builder()
68-
.applyConnectionString(connectionString)
72+
mongoClientSettingsBuilder
6973
.applyToClusterSettings(
7074
b ->
7175
b.addClusterListener(

0 commit comments

Comments
 (0)