Skip to content

Commit f2173bf

Browse files
committed
Added MongoSinkConnector & MongoSourceConnector validation
KAFKA-84
1 parent eb68e45 commit f2173bf

File tree

7 files changed

+691
-6
lines changed

7 files changed

+691
-6
lines changed

.evergreen/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ functions:
200200
params:
201201
script: |
202202
${PREPARE_SHELL}
203-
MONGODB_VERSION=${VERSION} TOPOLOGY=${TOPOLOGY} AUTH=${AUTH} SSL=${SSL} sh ${DRIVERS_TOOLS}/.evergreen/run-orchestration.sh
203+
MONGODB_VERSION=${VERSION} TOPOLOGY=${TOPOLOGY} AUTH=auth SSL=${SSL} sh ${DRIVERS_TOOLS}/.evergreen/run-orchestration.sh
204204
# run-orchestration generates expansion file with the MONGODB_URI for the cluster
205205
- command: expansions.update
206206
params:

src/integrationTest/java/com/mongodb/kafka/connect/ConnectorValidationTest.java

Lines changed: 402 additions & 0 deletions
Large diffs are not rendered by default.

src/main/java/com/mongodb/kafka/connect/MongoSinkConnector.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,26 @@
1818

1919
package com.mongodb.kafka.connect;
2020

21+
import static com.mongodb.kafka.connect.util.ConnectionValidator.validateCanConnect;
22+
import static com.mongodb.kafka.connect.util.ConnectionValidator.validateUserHasActions;
23+
import static java.util.Arrays.asList;
2124
import static java.util.Collections.singletonList;
2225

2326
import java.util.List;
2427
import java.util.Map;
2528

29+
import org.apache.kafka.common.config.Config;
2630
import org.apache.kafka.common.config.ConfigDef;
2731
import org.apache.kafka.connect.connector.Task;
2832
import org.apache.kafka.connect.sink.SinkConnector;
2933

3034
import com.mongodb.kafka.connect.sink.MongoSinkConfig;
3135
import com.mongodb.kafka.connect.sink.MongoSinkTask;
36+
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
37+
import com.mongodb.kafka.connect.source.MongoSourceConfig;
3238

3339
public class MongoSinkConnector extends SinkConnector {
40+
private static final List<String> REQUIRED_SINK_ACTIONS = asList("insert", "update", "remove");
3441
private Map<String, String> settings;
3542

3643
@Override
@@ -61,4 +68,38 @@ public void stop() {
6168
public ConfigDef config() {
6269
return MongoSinkConfig.CONFIG;
6370
}
71+
72+
@Override
73+
public Config validate(final Map<String, String> connectorConfigs) {
74+
Config config = super.validate(connectorConfigs);
75+
76+
MongoSinkConfig sinkConfig;
77+
try {
78+
sinkConfig = new MongoSinkConfig(connectorConfigs);
79+
} catch (Exception e) {
80+
return config;
81+
}
82+
83+
validateCanConnect(config, MongoSinkConfig.CONNECTION_URI_CONFIG)
84+
.ifPresent(client -> {
85+
try {
86+
sinkConfig.getTopics().forEach(topic -> {
87+
MongoSinkTopicConfig mongoSinkTopicConfig = sinkConfig.getMongoSinkTopicConfig(topic);
88+
validateUserHasActions(client,
89+
sinkConfig.getConnectionString().getCredential(),
90+
REQUIRED_SINK_ACTIONS,
91+
mongoSinkTopicConfig.getString(MongoSourceConfig.DATABASE_CONFIG),
92+
mongoSinkTopicConfig.getString(MongoSourceConfig.COLLECTION_CONFIG),
93+
MongoSourceConfig.CONNECTION_URI_CONFIG, config);
94+
95+
});
96+
} catch (Exception e) {
97+
// Ignore
98+
} finally {
99+
client.close();
100+
}
101+
});
102+
103+
return config;
104+
}
64105
}

src/main/java/com/mongodb/kafka/connect/MongoSourceConnector.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@
1616

1717
package com.mongodb.kafka.connect;
1818

19+
import static com.mongodb.kafka.connect.util.ConnectionValidator.validateCanConnect;
20+
import static com.mongodb.kafka.connect.util.ConnectionValidator.validateUserHasActions;
21+
import static java.util.Arrays.asList;
1922
import static java.util.Collections.singletonList;
2023

2124
import java.util.List;
2225
import java.util.Map;
2326

27+
import org.apache.kafka.common.config.Config;
2428
import org.apache.kafka.common.config.ConfigDef;
2529
import org.apache.kafka.connect.connector.Task;
2630
import org.apache.kafka.connect.source.SourceConnector;
@@ -29,7 +33,7 @@
2933
import com.mongodb.kafka.connect.source.MongoSourceTask;
3034

3135
public class MongoSourceConnector extends SourceConnector {
32-
36+
private static final List<String> REQUIRED_SOURCE_ACTIONS = asList("changeStream", "find");
3337
private Map<String, String> settings;
3438

3539
@Override
@@ -42,6 +46,35 @@ public Class<? extends Task> taskClass() {
4246
return MongoSourceTask.class;
4347
}
4448

49+
@Override
50+
public Config validate(final Map<String, String> connectorConfigs) {
51+
Config config = super.validate(connectorConfigs);
52+
MongoSourceConfig sourceConfig;
53+
try {
54+
sourceConfig = new MongoSourceConfig(connectorConfigs);
55+
} catch (Exception e) {
56+
return config;
57+
}
58+
59+
validateCanConnect(config, MongoSourceConfig.CONNECTION_URI_CONFIG)
60+
.ifPresent(client -> {
61+
try {
62+
validateUserHasActions(client,
63+
sourceConfig.getConnectionString().getCredential(),
64+
REQUIRED_SOURCE_ACTIONS,
65+
sourceConfig.getString(MongoSourceConfig.DATABASE_CONFIG),
66+
sourceConfig.getString(MongoSourceConfig.COLLECTION_CONFIG),
67+
MongoSourceConfig.CONNECTION_URI_CONFIG, config);
68+
} catch (Exception e) {
69+
// Ignore
70+
} finally {
71+
client.close();
72+
}
73+
});
74+
75+
return config;
76+
}
77+
4578
@Override
4679
public List<Map<String, String>> taskConfigs(final int maxTasks) {
4780
return singletonList(settings);

src/main/java/com/mongodb/kafka/connect/sink/MongoSinkConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public class MongoSinkConfig extends AbstractConfig {
6666
private Map<String, MongoSinkTopicConfig> topicSinkConnectorConfigMap;
6767
private ConnectionString connectionString;
6868

69-
MongoSinkConfig(final Map<String, String> originals) {
69+
public MongoSinkConfig(final Map<String, String> originals) {
7070
super(CONFIG, originals, false);
7171
this.originals = unmodifiableMap(originals);
7272
topics = unmodifiableList(getList(TOPICS_CONFIG));
@@ -88,11 +88,11 @@ public ConnectionString getConnectionString() {
8888
return connectionString;
8989
}
9090

91-
List<String> getTopics() {
91+
public List<String> getTopics() {
9292
return topics;
9393
}
9494

95-
MongoSinkTopicConfig getMongoSinkTopicConfig(final String topic) {
95+
public MongoSinkTopicConfig getMongoSinkTopicConfig(final String topic) {
9696
if (!topics.contains(topic)) {
9797
throw new IllegalArgumentException(format("Unknown topic: %s, must be one of: %s", topic, topics));
9898
}

src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTopicConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public String getTopic() {
211211
return topic;
212212
}
213213

214-
MongoNamespace getNamespace() {
214+
public MongoNamespace getNamespace() {
215215
if (namespace == null) {
216216
String database = getString(DATABASE_CONFIG);
217217
if (database.isEmpty()) {
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.kafka.connect.util;
17+
18+
import static java.lang.String.format;
19+
import static java.util.Collections.emptyList;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.Optional;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
28+
import org.apache.kafka.common.config.Config;
29+
import org.apache.kafka.common.config.ConfigValue;
30+
import org.apache.kafka.connect.errors.ConnectException;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
import org.bson.Document;
35+
36+
import com.mongodb.ConnectionString;
37+
import com.mongodb.MongoClientSettings;
38+
import com.mongodb.MongoCredential;
39+
import com.mongodb.MongoSecurityException;
40+
import com.mongodb.ReadPreference;
41+
import com.mongodb.client.MongoClient;
42+
import com.mongodb.client.MongoClients;
43+
import com.mongodb.event.ClusterClosedEvent;
44+
import com.mongodb.event.ClusterDescriptionChangedEvent;
45+
import com.mongodb.event.ClusterListener;
46+
import com.mongodb.event.ClusterOpeningEvent;
47+
48+
49+
public final class ConnectionValidator {
50+
51+
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionValidator.class);
52+
private static final String USERS_INFO = "{usersInfo: '%s', showPrivileges: 1}";
53+
private static final String ROLES_INFO = "{rolesInfo: '%s', showPrivileges: 1, showBuiltinRoles: 1}";
54+
55+
public static Optional<MongoClient> validateCanConnect(final Config config, final String connectionStringConfigName) {
56+
Optional<ConfigValue> optionalConnectionString = getConfigByName(config, connectionStringConfigName);
57+
if (optionalConnectionString.isPresent() && optionalConnectionString.get().errorMessages().isEmpty()) {
58+
ConfigValue configValue = optionalConnectionString.get();
59+
60+
AtomicBoolean connected = new AtomicBoolean();
61+
CountDownLatch latch = new CountDownLatch(1);
62+
ConnectionString connectionString = new ConnectionString((String) configValue.value());
63+
MongoClientSettings mongoClientSettings = MongoClientSettings.builder()
64+
.applyConnectionString(connectionString)
65+
.applyToClusterSettings(b -> b.addClusterListener(new ClusterListener() {
66+
@Override
67+
public void clusterOpening(final ClusterOpeningEvent event) {
68+
}
69+
70+
@Override
71+
public void clusterClosed(final ClusterClosedEvent event) {
72+
}
73+
74+
@Override
75+
public void clusterDescriptionChanged(final ClusterDescriptionChangedEvent event) {
76+
ReadPreference readPreference = connectionString.getReadPreference() != null
77+
? connectionString.getReadPreference() : ReadPreference.primaryPreferred();
78+
if (!connected.get() && event.getNewDescription().hasReadableServer(readPreference)) {
79+
connected.set(true);
80+
latch.countDown();
81+
}
82+
}
83+
}))
84+
.build();
85+
86+
long latchTimeout = mongoClientSettings.getSocketSettings().getConnectTimeout(TimeUnit.MILLISECONDS) + 500;
87+
MongoClient mongoClient = MongoClients.create(mongoClientSettings);
88+
89+
try {
90+
if (!latch.await(latchTimeout, TimeUnit.MILLISECONDS)) {
91+
configValue.addErrorMessage("Unable to connect to the server.");
92+
mongoClient.close();
93+
}
94+
} catch (InterruptedException e) {
95+
mongoClient.close();
96+
throw new ConnectException(e);
97+
}
98+
99+
if (configValue.errorMessages().isEmpty()) {
100+
return Optional.of(mongoClient);
101+
}
102+
}
103+
return Optional.empty();
104+
}
105+
106+
public static void validateUserHasActions(final MongoClient mongoClient, final MongoCredential credential, final List<String> actions,
107+
final String databaseName, final String collectionName, final String configName,
108+
final Config config) {
109+
110+
if (credential == null) {
111+
return;
112+
}
113+
114+
try {
115+
Document usersInfo = mongoClient.getDatabase(credential.getSource())
116+
.runCommand(Document.parse(format(USERS_INFO, credential.getUserName())));
117+
118+
List<String> unsupportedActions = new ArrayList<>(actions);
119+
for (final Document userInfo : usersInfo.getList("users", Document.class)) {
120+
unsupportedActions = removeUserActions(userInfo, credential.getSource(), databaseName, collectionName, actions);
121+
122+
if (!unsupportedActions.isEmpty() && userInfo.getList("inheritedPrivileges", Document.class, emptyList()).isEmpty()) {
123+
for (final Document inheritedRole : userInfo.getList("inheritedRoles", Document.class, emptyList())) {
124+
Document rolesInfo = mongoClient.getDatabase(inheritedRole.getString("db"))
125+
.runCommand(Document.parse(format(ROLES_INFO, inheritedRole.getString("role"))));
126+
for (final Document roleInfo : rolesInfo.getList("roles", Document.class, emptyList())) {
127+
unsupportedActions = removeUserActions(roleInfo, credential.getSource(), databaseName, collectionName,
128+
unsupportedActions);
129+
}
130+
131+
if (unsupportedActions.isEmpty()) {
132+
return;
133+
}
134+
}
135+
}
136+
if (unsupportedActions.isEmpty()) {
137+
return;
138+
}
139+
}
140+
141+
String missingPermissions = String.join(", ", unsupportedActions);
142+
getConfigByName(config, configName).ifPresent(c ->
143+
c.addErrorMessage(format("Invalid user permissions. Missing the following action permissions: %s", missingPermissions))
144+
);
145+
} catch (MongoSecurityException e) {
146+
getConfigByName(config, configName).ifPresent(c -> c.addErrorMessage("Invalid user permissions authentication failed.")
147+
);
148+
} catch (Exception e) {
149+
LOGGER.warn("Permission validation failed due to: {}", e.getMessage(), e);
150+
}
151+
}
152+
153+
/**
154+
* Checks the roles info document for matching actions and removes them from the provided list
155+
*
156+
* See: https://docs.mongodb.com/manual/reference/command/rolesInfo
157+
* See: https://docs.mongodb.com/manual/reference/resource-document/
158+
*/
159+
private static List<String> removeUserActions(final Document rolesInfo, final String authDatabase, final String databaseName,
160+
final String collectionName, final List<String> userActions) {
161+
List<Document> privileges = rolesInfo.getList("inheritedPrivileges", Document.class, emptyList());
162+
if (privileges.isEmpty() || userActions.isEmpty()) {
163+
return userActions;
164+
}
165+
166+
List<String> unsupportedUserActions = new ArrayList<>(userActions);
167+
for (final Document privilege : privileges) {
168+
Document resource = privilege.get("resource", new Document());
169+
if (resource.containsKey("cluster") && resource.getBoolean("cluster")) {
170+
unsupportedUserActions.removeAll(privilege.getList("actions", String.class, emptyList()));
171+
} else if (resource.containsKey("db") && resource.containsKey("collection")) {
172+
String database = resource.getString("db");
173+
String collection = resource.getString("collection");
174+
175+
boolean resourceMatches = false;
176+
boolean collectionMatches = collection.isEmpty() || collection.equals(collectionName);
177+
if (database.isEmpty() && collectionMatches) {
178+
resourceMatches = true;
179+
} else if (database.equals(authDatabase) && collection.isEmpty()) {
180+
resourceMatches = true;
181+
} else if (database.equals(databaseName) && collectionMatches) {
182+
resourceMatches = true;
183+
}
184+
185+
if (resourceMatches) {
186+
unsupportedUserActions.removeAll(privilege.getList("actions", String.class, emptyList()));
187+
}
188+
}
189+
190+
if (unsupportedUserActions.isEmpty()) {
191+
break;
192+
}
193+
}
194+
195+
return unsupportedUserActions;
196+
}
197+
198+
private static Optional<ConfigValue> getConfigByName(final Config config, final String name) {
199+
for (final ConfigValue configValue : config.configValues()) {
200+
if (configValue.name().equals(name)) {
201+
return Optional.of(configValue);
202+
}
203+
}
204+
return Optional.empty();
205+
}
206+
207+
private ConnectionValidator() {
208+
}
209+
}

0 commit comments

Comments
 (0)