Commit cfd6591

Merge remote-tracking branch 'remotes/origin/develop' into develop
2 parents 5fb66dc + f5c321b commit cfd6591

13 files changed: +539 -58 lines changed

README.md

Lines changed: 2 additions & 2 deletions
@@ -10,10 +10,10 @@ This is a connector for subscribing to Kafka queues and pushing messages to MarkLogic
 #### To try this out locally:
 
 1. Configure kafkaHome in gradle-local.properties - e.g. kafkaHome=/Users/myusername/tools/kafka_2.11-2.1.0
-1. Run "./gradlew deploy" to build a jar and copy it and the below property files into the appropriate Kafka directories
+1. Run "./gradlew clean deploy" to build a jar and copy it and the below property files into the appropriate Kafka directories
 
 #### To try this out on a remote Kafka server
-1. Run "./gradlew jar" to build the jar.
+1. Run "./gradlew clean jar" to build the jar.
 1. Copy the jar to the <kafkaHome>/libs on the remote server.
 1. Copy the two properties (config/marklogic-connect-distributed.properties config/marklogic-sink.properties) to <kafkaHome>/config on the remote server.

build.gradle

Lines changed: 10 additions & 5 deletions
@@ -18,11 +18,16 @@ configurations {
 
 dependencies {
     compileOnly "org.apache.kafka:connect-api:2.3.0"
-    compile ("com.marklogic:ml-javaclient-util:3.13.4") {
-        // These are excluded simply to reduce the size of the connector; if included, they do not cause any issues
-        exclude module: "jdom2"
-        exclude module: "marklogic-xcc"
-        exclude module: "spring-context"
+
+    compile ("com.marklogic:marklogic-data-hub:5.2.0") {
+        // Excluding these because there's no need for them
+        exclude module: "spring-boot-autoconfigure"
+        exclude module: "spring-integration-http"
+        exclude module: "jaeger-core"
+        exclude module: "jaeger-thrift"
+
+        // Excluding because it causes Kafka Connect to complain mightily if included
+        exclude module: "logback-classic"
     }
 
     testCompile "org.junit.jupiter:junit-jupiter-api:5.3.0"

config/marklogic-sink.properties

Lines changed: 18 additions & 1 deletion
@@ -78,5 +78,22 @@ ml.document.uriPrefix=/kafka-data/
 # Optional - a suffix to append to each URI
 ml.document.uriSuffix=.json
 
+# Optional - name of a REST transform to use when writing documents
+# For Data Hub, can use mlRunIngest
 ml.dmsdk.transform=
-ml.dmsdk.transformParams=
+
+# Optional - delimited set of transform names and values
+# Data Hub example = flow-name,ingestion_mapping_mastering-flow,step,1
+ml.dmsdk.transformParams=
+
+# Optional - delimiter for transform parameter names and values
+ml.dmsdk.transformParamsDelimiter=,
+
+# Properties for running a Data Hub flow
+# Using examples/dh-5-example in the DH project, could use the following config:
+# ml.datahub.flow.name=ingestion_mapping_mastering-flow
+# ml.datahub.flow.steps=2,3,4
+ml.datahub.flow.name=
+ml.datahub.flow.steps=
+# Whether or not the response data from running a flow should be logged at the info level
+ml.datahub.flow.logResponse=true
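
For context, ml.dmsdk.transformParams is a single delimited string of alternating parameter names and values. The sketch below is a hypothetical illustration of how such a string could be turned into a ServerTransform using the configured delimiter; it is not the connector's actual parsing code, and the helper name is made up.

```java
import com.marklogic.client.document.ServerTransform;

public class TransformParamsSketch {

    // Hypothetical helper; the connector's real parsing lives in MarkLogicSinkTask.
    public static ServerTransform buildTransform(String transformName, String params, String delimiter) {
        ServerTransform transform = new ServerTransform(transformName);
        if (params != null && params.trim().length() > 0) {
            // Tokens are assumed to alternate between parameter names and values
            String[] tokens = params.split(delimiter);
            for (int i = 0; i < tokens.length - 1; i += 2) {
                transform.addParameter(tokens[i], tokens[i + 1]);
            }
        }
        return transform;
    }

    public static void main(String[] args) {
        // Mirrors the Data Hub example above: mlRunIngest with a flow name and step number
        ServerTransform t = buildTransform("mlRunIngest",
            "flow-name,ingestion_mapping_mastering-flow,step,1", ",");
        System.out.println("Built transform: " + t.getName());
    }
}
```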
src/main/java/com/marklogic/kafka/connect/DatabaseClientConfigBuilder.java

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+package com.marklogic.kafka.connect;
+
+import com.marklogic.client.ext.DatabaseClientConfig;
+
+import java.util.Map;
+
+/**
+ * Defines how a map of properties read in by Kafka are used to build an instance of DatabaseClientConfig.
+ */
+public interface DatabaseClientConfigBuilder {
+
+    DatabaseClientConfig buildDatabaseClientConfig(Map<String, String> kafkaConfig);
+
+}

src/main/java/com/marklogic/kafka/connect/DatabaseClientCreator.java

Lines changed: 0 additions & 17 deletions
This file was deleted.

src/main/java/com/marklogic/kafka/connect/DefaultDatabaseClientCreator.java renamed to src/main/java/com/marklogic/kafka/connect/DefaultDatabaseClientConfigBuilder.java

Lines changed: 4 additions & 25 deletions
@@ -1,35 +1,19 @@
 package com.marklogic.kafka.connect;
 
 import com.marklogic.client.DatabaseClient;
-import com.marklogic.client.ext.ConfiguredDatabaseClientFactory;
+import com.marklogic.client.DatabaseClientFactory;
 import com.marklogic.client.ext.DatabaseClientConfig;
-import com.marklogic.client.ext.DefaultConfiguredDatabaseClientFactory;
 import com.marklogic.client.ext.SecurityContextType;
 import com.marklogic.kafka.connect.sink.MarkLogicSinkConfig;
 
 import javax.net.ssl.SSLContext;
 import java.security.NoSuchAlgorithmException;
 import java.util.Map;
 
-public class DefaultDatabaseClientCreator implements DatabaseClientCreator {
+public class DefaultDatabaseClientConfigBuilder implements DatabaseClientConfigBuilder {
 
-    private ConfiguredDatabaseClientFactory configuredDatabaseClientFactory;
-
-    public DefaultDatabaseClientCreator() {
-        this.configuredDatabaseClientFactory = new DefaultConfiguredDatabaseClientFactory();
-    }
-
-    /**
-     * @param kafkaConfig
-     * @return
-     */
     @Override
-    public DatabaseClient createDatabaseClient(Map<String, String> kafkaConfig) {
-        DatabaseClientConfig clientConfig = buildDatabaseClientConfig(kafkaConfig);
-        return configuredDatabaseClientFactory.newDatabaseClient(clientConfig);
-    }
-
-    protected DatabaseClientConfig buildDatabaseClientConfig(Map<String, String> kafkaConfig) {
+    public DatabaseClientConfig buildDatabaseClientConfig(Map<String, String> kafkaConfig) {
        DatabaseClientConfig clientConfig = new DatabaseClientConfig();
        clientConfig.setCertFile(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_CERT_FILE));
        clientConfig.setCertPassword(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD));

@@ -76,11 +60,6 @@ protected void configureSimpleSsl(DatabaseClientConfig clientConfig) {
            throw new RuntimeException("Unable to get default SSLContext: " + e.getMessage(), e);
        }
 
-        clientConfig.setSslHostnameVerifier((hostname, cns, subjectAlts) -> {
-        });
-    }
-
-    public void setConfiguredDatabaseClientFactory(ConfiguredDatabaseClientFactory configuredDatabaseClientFactory) {
-        this.configuredDatabaseClientFactory = configuredDatabaseClientFactory;
+        clientConfig.setSslHostnameVerifier(DatabaseClientFactory.SSLHostnameVerifier.ANY);
    }
 }

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConfig.java

Lines changed: 7 additions & 0 deletions
@@ -24,6 +24,10 @@ public class MarkLogicSinkConfig extends AbstractConfig {
     public static final String CONNECTION_CERT_PASSWORD = "ml.connection.certPassword";
     public static final String CONNECTION_EXTERNAL_NAME = "ml.connection.externalName";
 
+    public static final String DATAHUB_FLOW_NAME = "ml.datahub.flow.name";
+    public static final String DATAHUB_FLOW_STEPS = "ml.datahub.flow.steps";
+    public static final String DATAHUB_FLOW_LOG_RESPONSE = "ml.datahub.flow.logResponse";
+
     public static final String DMSDK_BATCH_SIZE = "ml.dmsdk.batchSize";
     public static final String DMSDK_THREAD_COUNT = "ml.dmsdk.threadCount";
     public static final String DMSDK_TRANSFORM = "ml.dmsdk.transform";

@@ -50,6 +54,9 @@ public class MarkLogicSinkConfig extends AbstractConfig {
         .define(CONNECTION_CERT_FILE, Type.STRING, Importance.LOW, "Path to a certificate file")
         .define(CONNECTION_CERT_PASSWORD, Type.STRING, Importance.LOW, "Password for the certificate file")
         .define(CONNECTION_EXTERNAL_NAME, Type.STRING, Importance.LOW, "External name for Kerberos authentication")
+        .define(DATAHUB_FLOW_NAME, Type.STRING, null, Importance.MEDIUM, "Name of a Data Hub flow to run")
+        .define(DATAHUB_FLOW_STEPS, Type.STRING, null, Importance.MEDIUM, "Comma-delimited names of steps to run")
+        .define(DATAHUB_FLOW_LOG_RESPONSE, Type.BOOLEAN, false, Importance.LOW, "If set to true, the response from running a flow on each ingested batch will be logged at the info level")
         .define(DMSDK_BATCH_SIZE, Type.INT, 100, Importance.HIGH, "Number of documents to write in each batch")
         .define(DMSDK_THREAD_COUNT, Type.INT, 8, Importance.HIGH, "Number of threads for DMSDK to use")
         .define(DMSDK_TRANSFORM, Type.STRING, Importance.MEDIUM, "Name of a REST transform to use when writing documents")
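
The keys defined above are the same strings a Connect worker passes to the sink task; since the flow name and steps are defined with a null default, they stay optional. A hypothetical fragment (only the keys touched by this commit; a real config also needs the connection and DMSDK settings) could be built like this, with values mirroring the examples in config/marklogic-sink.properties:

```java
package com.marklogic.kafka.connect.sink;

import java.util.HashMap;
import java.util.Map;

public class DataHubConfigFragment {

    public static Map<String, String> dataHubSettings() {
        Map<String, String> settings = new HashMap<>();
        // Keys added in this commit; omit ml.datahub.flow.name entirely to skip running a flow
        settings.put(MarkLogicSinkConfig.DATAHUB_FLOW_NAME, "ingestion_mapping_mastering-flow");
        settings.put(MarkLogicSinkConfig.DATAHUB_FLOW_STEPS, "2,3,4");
        settings.put(MarkLogicSinkConfig.DATAHUB_FLOW_LOG_RESPONSE, "true");
        return settings;
    }
}
```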

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkTask.java

Lines changed: 38 additions & 2 deletions
@@ -4,13 +4,17 @@
 import com.marklogic.client.datamovement.DataMovementManager;
 import com.marklogic.client.datamovement.WriteBatcher;
 import com.marklogic.client.document.ServerTransform;
-import com.marklogic.kafka.connect.DefaultDatabaseClientCreator;
+import com.marklogic.client.ext.DatabaseClientConfig;
+import com.marklogic.client.ext.DefaultConfiguredDatabaseClientFactory;
+import com.marklogic.kafka.connect.DefaultDatabaseClientConfigBuilder;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 /**

@@ -32,7 +36,9 @@ public void start(final Map<String, String> config) {
 
     sinkRecordConverter = new DefaultSinkRecordConverter(config);
 
-    databaseClient = new DefaultDatabaseClientCreator().createDatabaseClient(config);
+    DatabaseClientConfig databaseClientConfig = new DefaultDatabaseClientConfigBuilder().buildDatabaseClientConfig(config);
+    databaseClient = new DefaultConfiguredDatabaseClientFactory().newDatabaseClient(databaseClientConfig);
+
     dataMovementManager = databaseClient.newDataMovementManager();
     writeBatcher = dataMovementManager.newWriteBatcher()
         .withBatchSize(Integer.parseInt(config.get(MarkLogicSinkConfig.DMSDK_BATCH_SIZE)))

@@ -43,11 +49,41 @@ public void start(final Map<String, String> config) {
         writeBatcher.withTransform(transform);
     }
 
+    final String flowName = config.get(MarkLogicSinkConfig.DATAHUB_FLOW_NAME);
+    if (flowName != null && flowName.trim().length() > 0) {
+        writeBatcher.onBatchSuccess(buildSuccessListener(flowName, config, databaseClientConfig));
+    }
+
     dataMovementManager.startJob(writeBatcher);
 
     logger.info("Started");
 }
 
+/**
+ * This is all specific to Kafka, as it involves reading inputs from the Kafka config map and then using them to
+ * construct the reusable RunFlowWriteBatchListener.
+ *
+ * @param flowName
+ * @param kafkaConfig
+ * @param databaseClientConfig
+ */
+protected RunFlowWriteBatchListener buildSuccessListener(String flowName, Map<String, String> kafkaConfig, DatabaseClientConfig databaseClientConfig) {
+    String logMessage = String.format("After ingesting a batch, will run flow '%s'", flowName);
+    final String flowSteps = kafkaConfig.get(MarkLogicSinkConfig.DATAHUB_FLOW_STEPS);
+    List<String> steps = null;
+    if (flowSteps != null && flowSteps.trim().length() > 0) {
+        steps = Arrays.asList(flowSteps.split(","));
+        logMessage += String.format(" with steps '%s' constrained to the URIs in that batch", steps.toString());
+    }
+    logger.info(logMessage);
+
+    RunFlowWriteBatchListener listener = new RunFlowWriteBatchListener(flowName, steps, databaseClientConfig);
+    if (kafkaConfig.containsKey(MarkLogicSinkConfig.DATAHUB_FLOW_LOG_RESPONSE)) {
+        listener.setLogResponse(Boolean.parseBoolean(kafkaConfig.get(MarkLogicSinkConfig.DATAHUB_FLOW_LOG_RESPONSE)));
+    }
+    return listener;
+}
+
 /**
  * Builds a REST ServerTransform object based on the DMSDK parameters in the given config. If no transform name
  * is configured, then null will be returned.
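
A quick way to sanity-check the new buildSuccessListener method is a JUnit 5 sketch like the one below. It is hypothetical and not part of this commit; it assumes MarkLogicSinkTask keeps its default no-arg constructor and that DatabaseClientConfig exposes host/username/password setters, and the connection values are placeholders.

```java
package com.marklogic.kafka.connect.sink;

import com.marklogic.client.ext.DatabaseClientConfig;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class BuildSuccessListenerTest {

    @Test
    public void buildsListenerFromKafkaConfig() {
        Map<String, String> kafkaConfig = new HashMap<>();
        kafkaConfig.put(MarkLogicSinkConfig.DATAHUB_FLOW_STEPS, "2,3,4");
        kafkaConfig.put(MarkLogicSinkConfig.DATAHUB_FLOW_LOG_RESPONSE, "true");

        // Placeholder connection details; buildSuccessListener only passes this object along
        DatabaseClientConfig clientConfig = new DatabaseClientConfig();
        clientConfig.setHost("localhost");
        clientConfig.setUsername("admin");
        clientConfig.setPassword("admin");

        RunFlowWriteBatchListener listener = new MarkLogicSinkTask()
            .buildSuccessListener("ingestion_mapping_mastering-flow", kafkaConfig, clientConfig);

        assertEquals("ingestion_mapping_mastering-flow", listener.getFlowName());
        assertEquals(3, listener.getSteps().size());
        assertTrue(listener.isLogResponse());
    }
}
```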
src/main/java/com/marklogic/kafka/connect/sink/RunFlowWriteBatchListener.java

Lines changed: 124 additions & 0 deletions
@@ -0,0 +1,124 @@
+package com.marklogic.kafka.connect.sink;
+
+import com.marklogic.client.datamovement.WriteBatch;
+import com.marklogic.client.datamovement.WriteBatchListener;
+import com.marklogic.client.datamovement.WriteEvent;
+import com.marklogic.client.ext.DatabaseClientConfig;
+import com.marklogic.client.ext.helper.LoggingObject;
+import com.marklogic.hub.flow.FlowInputs;
+import com.marklogic.hub.flow.FlowRunner;
+import com.marklogic.hub.flow.RunFlowResponse;
+import com.marklogic.hub.flow.impl.FlowRunnerImpl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is not specific to Kafka and theoretically could be moved to DHF.
+ */
+public class RunFlowWriteBatchListener extends LoggingObject implements WriteBatchListener {
+
+    private String flowName;
+    private List<String> steps;
+    private DatabaseClientConfig databaseClientConfig;
+    private boolean logResponse;
+
+    /**
+     * The flowName and steps are assumed to have been read in by the client that is reading from system configuration
+     * - in the Kafka case, this will be from the Kafka config map that is passed to a source task.
+     * <p>
+     * The DatabaseClientConfig object is needed because it's not yet possible for DHF to reuse the DatabaseClient that
+     * Kafka constructs. While it's assumed that that DatabaseClient will write to staging, DHF needs to be able to
+     * connect to the staging, final, and job app servers. And in order to do that, it needs all of the authentication
+     * information that can be held by a DatabaseClientConfig. Though as of 5.2.0, DHF only supports basic/digest
+     * authentication, and thus it's assumed that username/password will be used for authentication.
+     *
+     * @param flowName required name of the flow to run
+     * @param steps    optional list of steps
+     * @param databaseClientConfig
+     */
+    public RunFlowWriteBatchListener(String flowName, List<String> steps, DatabaseClientConfig databaseClientConfig) {
+        this.databaseClientConfig = databaseClientConfig;
+        this.flowName = flowName;
+        this.steps = steps;
+    }
+
+    /**
+     * None of this is specific to Kafka. It assumes a pattern of - given the URIs that were just ingested (and are
+     * available in the given WriteBatch), override the source query for each step to be executed with a document query
+     * that constrains on those URIs.
+     * <p>
+     * The need to construct a source query is unfortunate. When DHF executes a non-ingestion step, it always runs the
+     * collector. Thus, it's not yet possible to tell DHF - just process these URIs (specifically, it's not yet
+     * possible to do that via FlowRunner). So it's necessary to use the URIs to construct a document query and override
+     * each step's source query with that. Ideally, DHF can be enhanced here so a client can just pass in the URIs to
+     * process, and then there's no call to the collector nor need to override the source query.
+     *
+     * @param batch
+     */
+    @Override
+    public void processEvent(WriteBatch batch) {
+        FlowInputs inputs = buildFlowInputs(batch);
+
+        // DHF 5.2.0 only supports basic/digest auth, so this can safely be done.
+        FlowRunner flowRunner = new FlowRunnerImpl(
+            databaseClientConfig.getHost(),
+            databaseClientConfig.getUsername(),
+            databaseClientConfig.getPassword()
+        );
+
+        RunFlowResponse response = flowRunner.runFlow(inputs);
+        flowRunner.awaitCompletion();
+        if (logResponse) {
+            logger.info(format("Flow response for batch number %d:\n%s", batch.getJobBatchNumber(), response.toJson()));
+        }
+    }
+
+    protected FlowInputs buildFlowInputs(WriteBatch batch) {
+        FlowInputs inputs = new FlowInputs(flowName);
+        if (steps != null) {
+            inputs.setSteps(steps);
+        }
+        inputs.setJobId(batch.getBatcher().getJobId() + "-" + batch.getJobBatchNumber());
+
+        Map<String, Object> options = new HashMap<>();
+        options.put("sourceQuery", buildSourceQuery(batch));
+        inputs.setOptions(options);
+
+        return inputs;
+    }
+
+    protected String buildSourceQuery(WriteBatch batch) {
+        StringBuilder sb = new StringBuilder("cts.documentQuery([");
+        boolean firstOne = true;
+        for (WriteEvent event : batch.getItems()) {
+            if (!firstOne) {
+                sb.append(",");
+            }
+            sb.append(String.format("'%s'", event.getTargetUri()));
+            firstOne = false;
+        }
+        return sb.append("])").toString();
+    }
+
+    public void setLogResponse(boolean logResponse) {
+        this.logResponse = logResponse;
+    }
+
+    public String getFlowName() {
+        return flowName;
+    }
+
+    public List<String> getSteps() {
+        return steps;
+    }
+
+    public DatabaseClientConfig getDatabaseClientConfig() {
+        return databaseClientConfig;
+    }
+
+    public boolean isLogResponse() {
+        return logResponse;
+    }
+}
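
Because the listener only depends on DMSDK and DHF types, it can also be attached to a WriteBatcher outside of Kafka. The sketch below is hypothetical and not part of this commit: the host, port, and credentials are placeholders, and the flow name and steps reuse the example values from config/marklogic-sink.properties.

```java
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.WriteBatcher;
import com.marklogic.client.ext.DatabaseClientConfig;
import com.marklogic.kafka.connect.sink.RunFlowWriteBatchListener;

import java.util.Arrays;

public class RunFlowListenerExample {

    public static void main(String[] args) {
        // Placeholder connection details for a DHF staging app server
        DatabaseClient client = DatabaseClientFactory.newClient("localhost", 8010,
            new DatabaseClientFactory.DigestAuthContext("admin", "admin"));

        // The listener needs a DatabaseClientConfig so DHF can connect to the staging, final, and job app servers
        DatabaseClientConfig clientConfig = new DatabaseClientConfig();
        clientConfig.setHost("localhost");
        clientConfig.setUsername("admin");
        clientConfig.setPassword("admin");

        RunFlowWriteBatchListener listener = new RunFlowWriteBatchListener(
            "ingestion_mapping_mastering-flow", Arrays.asList("2", "3"), clientConfig);
        listener.setLogResponse(true);

        DataMovementManager dmm = client.newDataMovementManager();
        WriteBatcher batcher = dmm.newWriteBatcher()
            .withBatchSize(100)
            .onBatchSuccess(listener);

        dmm.startJob(batcher);
        // ... add documents to the batcher here; each successful batch then triggers the flow ...
        batcher.flushAndWait();
        dmm.stopJob(batcher);
    }
}
```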
