Skip to content

Commit 1b0f002

Browse files
authored
Merge pull request #64 from marklogic-community/feature/58-password
Changing password fields to be of type PASSWORD
2 parents 8b4e678 + 0ff798e commit 1b0f002

File tree

5 files changed

+44
-16
lines changed

5 files changed

+44
-16
lines changed

CONTRIBUTING.md

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,18 +105,31 @@ You can also manually configure an instance of the MarkLogic Kafka connector as
105105
In the list of connectors in Control Center, the connector will initially have a status of "Failed" while it starts up.
106106
After it starts successfully, it will have a status of "Running".
107107

108+
## Debugging the MarkLogic Kafka connector
109+
110+
The main mechanism for debugging an instance of the MarkLogic Kafka connector is by examining logs from the
111+
connector. You can access those, along with logging from Kafka Connect and all other connectors, by running the
112+
following:
113+
114+
confluent local services connect log -f
115+
116+
See [the log command docs](https://docs.confluent.io/confluent-cli/current/command-reference/local/services/connect/confluent_local_services_connect_log.html)
117+
for more information.
118+
108119
## Destroying and setting up the Confluent Platform instance
109120

110121
While developing and testing the MarkLogic Kafka connector, it is common that the "local" instance of Confluent
111122
Platform will become unstable and no longer work. The [Confluent local docs](https://docs.confluent.io/confluent-cli/current/command-reference/local/confluent_local_current.html)
112123
make reference to this - "The data that are produced are transient and are intended to be temporary".
113124

114125
It is thus advisable that after you copy a new instance of the MarkLogic Kafka connector into Confluent Platform (i.e.
115-
by running `./gradlew copyConnectorToConfluent`), you should destroy your local Confluent Platform instance:
126+
by running `./gradlew copyConnectorToConfluent`), you should destroy your local Confluent Platform instance (this
127+
will usually finish in around 15s):
116128

117129
./gradlew destroyLocalConfluent
118130

119-
After doing that, you can quickly automate starting Confluent Platform and loading the two connectors via the following:
131+
After doing that, you can quickly automate starting Confluent Platform and loading the two connectors via the
132+
following (this will usually finish in around 1m):
120133

121134
./gradlew setupLocalConfluent
122135

@@ -125,6 +138,11 @@ Remember that if you've modified the connector code, you'll first need to run `.
125138
Doing the above will provide the most reliable way to get a new and working instance of Confluent Platform with the
126139
MarkLogic Kafka connector installed.
127140

141+
For brevity, you may prefer this (Gradle will figure out the tasks as long as only one task starts with "destroy"
142+
and one task starts with "setup"):
143+
144+
./gradlew destroy setup
145+
128146
You may have luck with simply doing `confluent local services stop`, `./gradlew copyConnectorToConfluent`, and
129147
`confluent local services start`, but this has so far not worked reliably - i.e. one of the Confluent Platform
130148
services (sometimes Schema Registry, sometimes Control Center) usually stops working.

src/main/java/com/marklogic/kafka/connect/DefaultDatabaseClientConfigBuilder.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
import com.marklogic.client.DatabaseClientFactory;
55
import com.marklogic.client.ext.DatabaseClientConfig;
66
import com.marklogic.client.ext.SecurityContextType;
7+
import com.marklogic.client.ext.helper.LoggingObject;
78
import com.marklogic.client.ext.modulesloader.ssl.SimpleX509TrustManager;
89
import com.marklogic.kafka.connect.sink.MarkLogicSinkConfig;
10+
import org.apache.kafka.common.config.types.Password;
911

1012
import javax.net.ssl.*;
1113
import java.io.FileInputStream;
@@ -16,13 +18,16 @@
1618
import java.security.NoSuchAlgorithmException;
1719
import java.util.Map;
1820

19-
public class DefaultDatabaseClientConfigBuilder implements DatabaseClientConfigBuilder {
21+
public class DefaultDatabaseClientConfigBuilder extends LoggingObject implements DatabaseClientConfigBuilder {
2022

2123
@Override
2224
public DatabaseClientConfig buildDatabaseClientConfig(Map<String, Object> parsedConfig) {
2325
DatabaseClientConfig clientConfig = new DatabaseClientConfig();
2426
clientConfig.setCertFile((String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_CERT_FILE));
25-
clientConfig.setCertPassword((String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD));
27+
Password certPassword = (Password) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD);
28+
if (certPassword != null) {
29+
clientConfig.setCertPassword(certPassword.value());
30+
}
2631
clientConfig.setTrustManager(new SimpleX509TrustManager());
2732
clientConfig = configureHostNameVerifier(clientConfig, parsedConfig);
2833
String securityContextType = ((String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE)).toUpperCase();
@@ -37,7 +42,10 @@ public DatabaseClientConfig buildDatabaseClientConfig(Map<String, Object> parsed
3742
}
3843
clientConfig.setExternalName((String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_EXTERNAL_NAME));
3944
clientConfig.setHost((String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_HOST));
40-
clientConfig.setPassword((String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_PASSWORD));
45+
Password password = (Password) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_PASSWORD);
46+
if (password != null) {
47+
clientConfig.setPassword(password.value());
48+
}
4149
clientConfig.setPort((Integer) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_PORT));
4250
Boolean customSsl = (Boolean) parsedConfig.get(MarkLogicSinkConfig.SSL);
4351
if (customSsl != null && customSsl) {

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConfig.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,13 @@ public class MarkLogicSinkConfig extends AbstractConfig {
5858
.define(CONNECTION_PORT, Type.INT, Importance.HIGH, "The REST app server port to connect to")
5959
.define(CONNECTION_DATABASE, Type.STRING, "", Importance.LOW, "Database to connect, if different from the one associated with the port")
6060
.define(CONNECTION_SECURITY_CONTEXT_TYPE, Type.STRING, "NONE", Importance.HIGH, "Type of MarkLogic security context to create - either digest, basic, kerberos, certificate, or none")
61-
.define(CONNECTION_USERNAME, Type.STRING, Importance.HIGH, "Name of MarkLogic user to authenticate as")
62-
.define(CONNECTION_PASSWORD, Type.STRING, Importance.HIGH, "Password for the MarkLogic user")
61+
.define(CONNECTION_USERNAME, Type.STRING, null, Importance.HIGH, "Name of MarkLogic user to " +
62+
"authenticate as")
63+
.define(CONNECTION_PASSWORD, Type.PASSWORD, null, Importance.HIGH, "Password for the MarkLogic user")
6364
.define(CONNECTION_TYPE, Type.STRING, "DIRECT", Importance.LOW, "Connection type; DIRECT or GATEWAY")
6465
.define(CONNECTION_SIMPLE_SSL, Type.BOOLEAN, false, Importance.LOW, "Set to true to use a trust-everything SSL connection")
6566
.define(CONNECTION_CERT_FILE, Type.STRING, "", Importance.LOW, "Path to a certificate file")
66-
.define(CONNECTION_CERT_PASSWORD, Type.STRING, "", Importance.LOW, "Password for the certificate file")
67+
.define(CONNECTION_CERT_PASSWORD, Type.PASSWORD, null, Importance.LOW, "Password for the certificate file")
6768
.define(CONNECTION_EXTERNAL_NAME, Type.STRING, "", Importance.LOW, "External name for Kerberos authentication")
6869
.define(DATAHUB_FLOW_NAME, Type.STRING, null, Importance.MEDIUM, "Name of a Data Hub flow to run")
6970
.define(DATAHUB_FLOW_STEPS, Type.STRING, null, Importance.MEDIUM, "Comma-delimited names of steps to run")

src/test/java/com/marklogic/kafka/connect/BuildDatabaseClientConfigTest.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.marklogic.client.ext.SecurityContextType;
77
import com.marklogic.kafka.connect.sink.MarkLogicSinkConfig;
88
import org.apache.kafka.common.config.ConfigException;
9+
import org.apache.kafka.common.config.types.Password;
910
import org.junit.jupiter.api.Assertions;
1011
import org.junit.jupiter.api.BeforeEach;
1112
import org.junit.jupiter.api.Test;
@@ -34,7 +35,7 @@ public void setup() {
3435
public void basicAuthentication() {
3536
config.put(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "basic");
3637
config.put(MarkLogicSinkConfig.CONNECTION_USERNAME, "some-user");
37-
config.put(MarkLogicSinkConfig.CONNECTION_PASSWORD, "some-password");
38+
config.put(MarkLogicSinkConfig.CONNECTION_PASSWORD, new Password("some-password"));
3839

3940
DatabaseClientConfig clientConfig = builder.buildDatabaseClientConfig(config);
4041
assertEquals("some-host", clientConfig.getHost());
@@ -49,7 +50,7 @@ public void basicAuthentication() {
4950
public void certificateAuthentication() {
5051
config.put(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "certificate");
5152
config.put(MarkLogicSinkConfig.CONNECTION_CERT_FILE, "/path/to/file");
52-
config.put(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD, "cert-password");
53+
config.put(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD, new Password("cert-password"));
5354

5455
DatabaseClientConfig clientConfig = builder.buildDatabaseClientConfig(config);
5556
assertEquals(SecurityContextType.CERTIFICATE, clientConfig.getSecurityContextType());
@@ -102,7 +103,7 @@ public void basicAuthenticationAndMutualSSL() {
102103
config.put(MarkLogicSinkConfig.SSL_HOST_VERIFIER, "STRICT");
103104
config.put(MarkLogicSinkConfig.SSL_MUTUAL_AUTH, "true");
104105
config.put(MarkLogicSinkConfig.CONNECTION_CERT_FILE, absolutePath);
105-
config.put(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD, "abc");
106+
config.put(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD, new Password("abc"));
106107

107108
DatabaseClientConfig clientConfig = builder.buildDatabaseClientConfig(config);
108109
assertEquals(SecurityContextType.BASIC, clientConfig.getSecurityContextType());
@@ -122,7 +123,7 @@ public void basicAuthenticationAndMutualSSLWithInvalidHost() {
122123
config.put(MarkLogicSinkConfig.SSL_HOST_VERIFIER, "SOMETHING");
123124
config.put(MarkLogicSinkConfig.SSL_MUTUAL_AUTH, "true");
124125
config.put(MarkLogicSinkConfig.CONNECTION_CERT_FILE, absolutePath);
125-
config.put(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD, "abc");
126+
config.put(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD, new Password("abc"));
126127

127128
DatabaseClientConfig clientConfig = builder.buildDatabaseClientConfig(config);
128129
assertEquals(SecurityContextType.BASIC, clientConfig.getSecurityContextType());
@@ -165,8 +166,6 @@ public void testMissingRequired() {
165166
Map<String, Object> allRequiredValuesConfig = new HashMap<>();
166167
allRequiredValuesConfig.put(MarkLogicSinkConfig.CONNECTION_HOST, "");
167168
allRequiredValuesConfig.put(MarkLogicSinkConfig.CONNECTION_PORT, 8000);
168-
allRequiredValuesConfig.put(MarkLogicSinkConfig.CONNECTION_USERNAME, "");
169-
allRequiredValuesConfig.put(MarkLogicSinkConfig.CONNECTION_PASSWORD, "");
170169
MarkLogicSinkConfig.CONFIG_DEF.parse(allRequiredValuesConfig);
171170

172171
Set<String> keys = allRequiredValuesConfig.keySet();

src/test/resources/confluent/marklogic-purchases-connector.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
"ml.connection.password": "admin",
1313
"ml.connection.securityContextType": "DIGEST",
1414
"ml.document.format": "JSON",
15-
"ml.document.uriSuffix": ".json"
15+
"ml.document.uriPrefix": "/purchase/",
16+
"ml.document.uriSuffix": ".json",
17+
"ml.document.collections": "purchases,kafka-data"
1618
}
17-
}
19+
}

0 commit comments

Comments
 (0)