Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/marklogic-sink.properties
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ ml.connection.password=
# Set to 'true' to customize how an SSL connection is created. Only supported if securityContextType is 'BASIC' or 'DIGEST'.
# ml.connection.enableCustomSsl=true
# The TLS version to use for custom SSL
# ml.connection.customSsl.tlsVersion=TLSv1.2
# ml.connection.customSsl.tlsVersion=TLS
# The host verification strategy for custom SSL; either 'ANY', 'COMMON', or 'STRICT'
# ml.connection.customSsl.hostNameVerifier=ANY
# Set this to true for 2-way SSL; defaults to 1-way SSL
Expand Down
2 changes: 1 addition & 1 deletion config/marklogic-source.properties
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ ml.connection.password=kafkatest
# Set to 'true' to customize how an SSL connection is created. Only supported if securityContextType is 'BASIC' or 'DIGEST'.
# ml.connection.enableCustomSsl=true
# The TLS version to use for custom SSL
# ml.connection.customSsl.tlsVersion=TLSv1.2
# ml.connection.customSsl.tlsVersion=TLS
# The host verification strategy for custom SSL; either 'ANY', 'COMMON', or 'STRICT'
# ml.connection.customSsl.hostNameVerifier=ANY
# Set this to true for 2-way SSL; defaults to 1-way SSL
Expand Down
2 changes: 1 addition & 1 deletion docs/configuring-the-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ The MarkLogic connector provides two options for configuring the usage of SSL wh

If a custom SSL approach is used, you can use the following properties to configure this approach:

- `ml.connection.customSsl.tlsVersion` = the TLS version to use for constructing an `SSLContext`. Defaults to `TLSv1.2`.
- `ml.connection.customSsl.tlsVersion` = the TLS version to use for constructing an `SSLContext`. Defaults to `TLS`, permitting the JVM to use the highest version possible.
- `ml.connection.customSsl.mutualAuth` = `true` to configure mutual, or "2-way", SSL authentication

If `ml.connection.customSsl.mutualAuth` is set to `true`, you must also configure these properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public DatabaseClientConfig buildDatabaseClientConfig(Map<String, Object> parsed
* @param clientConfig
*/
private void configureSimpleSsl(DatabaseClientConfig clientConfig) {
clientConfig.setSslContext(SimpleX509TrustManager.newSSLContext());
clientConfig.setSslContext(SimpleX509TrustManager.newSSLContext("TLS"));
clientConfig.setTrustManager(new SimpleX509TrustManager());
clientConfig.setSslHostnameVerifier(DatabaseClientFactory.SSLHostnameVerifier.ANY);
}
Expand Down
88 changes: 44 additions & 44 deletions src/main/java/com/marklogic/kafka/connect/MarkLogicConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,54 +47,54 @@ public static void addDefinitions(ConfigDef configDef) {
GROUP, -1, ConfigDef.Width.MEDIUM, "Host")
.define(CONNECTION_PORT, Type.INT, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Range.atLeast(0), Importance.HIGH,
"Required; the port of a REST API app server to connect to; if using Bulk Data Services, can be a plain HTTP app server",
GROUP, -1, ConfigDef.Width.MEDIUM, "Port")
.define(CONNECTION_BASE_PATH, Type.STRING, null, Importance.MEDIUM,
"Base path for all calls to MarkLogic; typically used when a reverse proxy is in front of MarkLogic",
GROUP, -1, ConfigDef.Width.MEDIUM, "Base Path")
GROUP, -1, ConfigDef.Width.MEDIUM, "Port")
.define(CONNECTION_BASE_PATH, Type.STRING, null, Importance.MEDIUM,
"Base path for all calls to MarkLogic; typically used when a reverse proxy is in front of MarkLogic",
GROUP, -1, ConfigDef.Width.MEDIUM, "Base Path")
.define(CONNECTION_SECURITY_CONTEXT_TYPE, Type.STRING, "DIGEST", CONNECTION_SECURITY_CONTEXT_TYPE_RV, Importance.HIGH,
"Required; the authentication scheme used by the server defined by ml.connection.port; either 'DIGEST', 'BASIC', 'CERTIFICATE', 'KERBEROS', or 'NONE'",
GROUP, -1, ConfigDef.Width.MEDIUM, "Security Context Type", CONNECTION_SECURITY_CONTEXT_TYPE_RV)
.define(CONNECTION_USERNAME, Type.STRING, null, Importance.MEDIUM,
"MarkLogic username for 'DIGEST' and 'BASIC' authentication",
GROUP, -1, ConfigDef.Width.MEDIUM, "Username")
.define(CONNECTION_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
"MarkLogic password for 'DIGEST' and 'BASIC' authentication",
GROUP, -1, ConfigDef.Width.MEDIUM, "Password")
.define(CONNECTION_DATABASE, Type.STRING, null, Importance.LOW,
"Name of a database to connect to. If your REST API server has a content database matching that of the one that you want to write documents to, you do not need to set this.",
GROUP, -1, ConfigDef.Width.MEDIUM, "Database")
.define(CONNECTION_CERT_FILE, Type.STRING, null, Importance.MEDIUM,
"Path to PKCS12 file for 'CERTIFICATE' authentication",
GROUP, -1, ConfigDef.Width.MEDIUM, "Certificate File")
.define(CONNECTION_CERT_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
"Password for PKCS12 file for 'CERTIFICATE' authentication",
GROUP, -1, ConfigDef.Width.MEDIUM, "Certificate Password")
.define(CONNECTION_EXTERNAL_NAME, Type.STRING, null, Importance.MEDIUM,
"External name for 'KERBEROS' authentication",
GROUP, -1, ConfigDef.Width.MEDIUM, "Kerberos External Name")
.define(CONNECTION_CLOUD_API_KEY, Type.STRING, null, Importance.MEDIUM,
"API key for connecting to MarkLogic Cloud. Should set port to 443 when connecting to MarkLogic Cloud.",
GROUP, -1, ConfigDef.Width.MEDIUM, "Cloud API Key")
.define(CONNECTION_TYPE, Type.STRING, "", CONNECTION_TYPE_RV, Importance.MEDIUM,
"Set to 'GATEWAY' when the host identified by ml.connection.host is a load balancer. See https://docs.marklogic.com/guide/java/data-movement#id_26583 for more information.",
GROUP, -1, ConfigDef.Width.MEDIUM, "Connection Type", CONNECTION_TYPE_RV)
GROUP, -1, ConfigDef.Width.MEDIUM, "Security Context Type", CONNECTION_SECURITY_CONTEXT_TYPE_RV)
.define(CONNECTION_USERNAME, Type.STRING, null, Importance.MEDIUM,
"MarkLogic username for 'DIGEST' and 'BASIC' authentication",
GROUP, -1, ConfigDef.Width.MEDIUM, "Username")
.define(CONNECTION_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
"MarkLogic password for 'DIGEST' and 'BASIC' authentication",
GROUP, -1, ConfigDef.Width.MEDIUM, "Password")
.define(CONNECTION_DATABASE, Type.STRING, null, Importance.LOW,
"Name of a database to connect to. If your REST API server has a content database matching that of the one that you want to write documents to, you do not need to set this.",
GROUP, -1, ConfigDef.Width.MEDIUM, "Database")
.define(CONNECTION_CERT_FILE, Type.STRING, null, Importance.MEDIUM,
"Path to PKCS12 file for 'CERTIFICATE' authentication",
GROUP, -1, ConfigDef.Width.MEDIUM, "Certificate File")
.define(CONNECTION_CERT_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
"Password for PKCS12 file for 'CERTIFICATE' authentication",
GROUP, -1, ConfigDef.Width.MEDIUM, "Certificate Password")
.define(CONNECTION_EXTERNAL_NAME, Type.STRING, null, Importance.MEDIUM,
"External name for 'KERBEROS' authentication",
GROUP, -1, ConfigDef.Width.MEDIUM, "Kerberos External Name")
.define(CONNECTION_CLOUD_API_KEY, Type.STRING, null, Importance.MEDIUM,
"API key for connecting to MarkLogic Cloud. Should set port to 443 when connecting to MarkLogic Cloud.",
GROUP, -1, ConfigDef.Width.MEDIUM, "Cloud API Key")
.define(CONNECTION_TYPE, Type.STRING, "", CONNECTION_TYPE_RV, Importance.MEDIUM,
"Set to 'GATEWAY' when the host identified by ml.connection.host is a load balancer. See https://docs.marklogic.com/guide/java/data-movement#id_26583 for more information.",
GROUP, -1, ConfigDef.Width.MEDIUM, "Connection Type", CONNECTION_TYPE_RV)
// Boolean fields must have a default value of null; otherwise, Confluent Platform, at least in version 7.2.1,
// will show a default value of "true"
.define(CONNECTION_SIMPLE_SSL, Type.BOOLEAN, null, Importance.LOW,
"Set to 'true' for a simple SSL strategy that uses the JVM's default SslContext and X509TrustManager and an 'any' host verification strategy",
GROUP, -1, ConfigDef.Width.MEDIUM, "Use Simple SSL")
.define(ENABLE_CUSTOM_SSL, Type.BOOLEAN, null, Importance.LOW,
"Set to 'true' to customize how an SSL connection is created. Only supported if securityContextType is 'BASIC' or 'DIGEST'.",
GROUP, -1, ConfigDef.Width.MEDIUM, "Enable Custom SSL")
.define(TLS_VERSION, Type.STRING, "TLSv1.2", Importance.LOW,
"The TLS version to use for custom SSL",
GROUP, -1, ConfigDef.Width.MEDIUM, "TLS Version for Custom SSL")
.define(SSL_HOST_VERIFIER, Type.STRING, "ANY", SSL_HOST_VERIFIER_RV, Importance.LOW,
"The host verification strategy for custom SSL; either 'ANY', 'COMMON', or 'STRICT'",
GROUP, -1, ConfigDef.Width.SHORT, "SSL Hostname Verifier", SSL_HOST_VERIFIER_RV)
.define(SSL_MUTUAL_AUTH, Type.BOOLEAN, null, Importance.LOW,
"Set this to true for 2-way SSL; defaults to 1-way SSL",
GROUP, -1, ConfigDef.Width.MEDIUM, "Use 2-way SSL");
.define(CONNECTION_SIMPLE_SSL, Type.BOOLEAN, null, Importance.LOW,
"Set to 'true' for a simple SSL strategy that uses the JVM's default SslContext and X509TrustManager and an 'any' host verification strategy",
GROUP, -1, ConfigDef.Width.MEDIUM, "Use Simple SSL")
.define(ENABLE_CUSTOM_SSL, Type.BOOLEAN, null, Importance.LOW,
"Set to 'true' to customize how an SSL connection is created. Only supported if securityContextType is 'BASIC' or 'DIGEST'.",
GROUP, -1, ConfigDef.Width.MEDIUM, "Enable Custom SSL")
.define(TLS_VERSION, Type.STRING, "TLS", Importance.LOW,
"The TLS version to use for custom SSL",
GROUP, -1, ConfigDef.Width.MEDIUM, "TLS Version for Custom SSL")
.define(SSL_HOST_VERIFIER, Type.STRING, "ANY", SSL_HOST_VERIFIER_RV, Importance.LOW,
"The host verification strategy for custom SSL; either 'ANY', 'COMMON', or 'STRICT'",
GROUP, -1, ConfigDef.Width.SHORT, "SSL Hostname Verifier", SSL_HOST_VERIFIER_RV)
.define(SSL_MUTUAL_AUTH, Type.BOOLEAN, null, Importance.LOW,
"Set this to true for 2-way SSL; defaults to 1-way SSL",
GROUP, -1, ConfigDef.Width.MEDIUM, "Use 2-way SSL");
}

protected MarkLogicConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@
import com.marklogic.client.io.DOMHandle;
import com.marklogic.kafka.connect.MarkLogicConnectorException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import javax.xml.XMLConstants;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
Expand All @@ -27,11 +31,13 @@

class XmlPlanInvoker extends AbstractPlanInvoker implements PlanInvoker {

private static final Logger logger = LoggerFactory.getLogger(XmlPlanInvoker.class);

private static final String TABLE_NS_URI = "http://marklogic.com/table";

// While a Transformer is not thread-safe and must therefore be created for each batch - though we could consider
// a pooling strategy in the future - a TransformerFactory is thread-safe and can thus be reused
private static final TransformerFactory transformerFactory = TransformerFactory.newInstance();
private static final TransformerFactory transformerFactory = makeNewTransformerFactory();

public XmlPlanInvoker(DatabaseClient client, Map<String, Object> parsedConfig) {
super(client, parsedConfig);
Expand Down Expand Up @@ -77,7 +83,7 @@ private String getKeyFromRow(Node row) {
NamedNodeMap attributes = column.getAttributes();
// The 'name' attribute is expected to exist; trust but verify
if (attributes != null && attributes.getNamedItem("name") != null &&
keyColumn.equals(attributes.getNamedItem("name").getTextContent())) {
keyColumn.equals(attributes.getNamedItem("name").getTextContent())) {
return column.getTextContent();
}
}
Expand All @@ -94,4 +100,35 @@ private String documentToString(Node newDoc, Transformer transformer) {
throw new MarkLogicConnectorException("Unable to transform XML to string: " + ex.getMessage(), ex);
}
}

private static TransformerFactory makeNewTransformerFactory() {
TransformerFactory factory = TransformerFactory.newInstance();
// Avoids Polaris warning related to
// https://cwe.mitre.org/data/definitions/611.html .
// From
// https://stackoverflow.com/questions/32178558/how-to-prevent-xml-external-entity-injection-on-transformerfactory
// .
try {
factory.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
} catch (TransformerConfigurationException e) {
logTransformerFactoryWarning(XMLConstants.FEATURE_SECURE_PROCESSING, e.getMessage());
}
try {
factory.setAttribute(XMLConstants.ACCESS_EXTERNAL_DTD, "");
} catch (IllegalArgumentException e) {
logTransformerFactoryWarning(XMLConstants.ACCESS_EXTERNAL_DTD, e.getMessage());
}
try {
factory.setAttribute(XMLConstants.ACCESS_EXTERNAL_STYLESHEET, "");
} catch (IllegalArgumentException e) {
logTransformerFactoryWarning(XMLConstants.ACCESS_EXTERNAL_STYLESHEET, e.getMessage());
}
return factory;
}

private static void logTransformerFactoryWarning(String xmlConstant, String errorMessage) {
String baseTransformerFactoryWarningMessage = "Unable to set {} on TransformerFactory; cause: {}";
logger.warn(baseTransformerFactoryWarningMessage, xmlConstant, errorMessage);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void basicAuthenticationAndMutualSSL() {
config.put(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "basic");
config.put(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL, false);
config.put(MarkLogicSinkConfig.ENABLE_CUSTOM_SSL, true);
config.put(MarkLogicSinkConfig.TLS_VERSION, "TLSv1.2");
config.put(MarkLogicSinkConfig.TLS_VERSION, "TLS");
config.put(MarkLogicSinkConfig.SSL_HOST_VERIFIER, "STRICT");
config.put(MarkLogicSinkConfig.SSL_MUTUAL_AUTH, true);
config.put(MarkLogicSinkConfig.CONNECTION_CERT_FILE, absolutePath);
Expand All @@ -134,7 +134,7 @@ void basicAuthenticationAndMutualSSLWithInvalidHost() {
config.put(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "basic");
config.put(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL, false);
config.put(MarkLogicSinkConfig.ENABLE_CUSTOM_SSL, true);
config.put(MarkLogicSinkConfig.TLS_VERSION, "TLSv1.2");
config.put(MarkLogicSinkConfig.TLS_VERSION, "TLS");
config.put(MarkLogicSinkConfig.SSL_HOST_VERIFIER, "SOMETHING");
config.put(MarkLogicSinkConfig.SSL_MUTUAL_AUTH, true);
config.put(MarkLogicSinkConfig.CONNECTION_CERT_FILE, absolutePath);
Expand All @@ -154,7 +154,7 @@ void digestAuthenticationAnd1WaySSL() {
config.put(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "digest");
config.put(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL, false);
config.put(MarkLogicSinkConfig.ENABLE_CUSTOM_SSL, true);
config.put(MarkLogicSinkConfig.TLS_VERSION, "TLSv1.2");
config.put(MarkLogicSinkConfig.TLS_VERSION, "TLS");
config.put(MarkLogicSinkConfig.SSL_HOST_VERIFIER, "STRICT");
config.put(MarkLogicSinkConfig.SSL_MUTUAL_AUTH, false);

Expand Down