Commit d221e44

BillFarber and rjrudin authored
Can now run a flow using DHF 5.2.0 (#24)
* Updating the AWS quickstart document.

* Updating the AWS quickstart document. (#19)

* Can now run a flow using DHF 5.2.0 (#21)

  The key changes are in MarkLogicSinkTask and RunFlowWriteBatchListener. See the comments in those classes for the changes. There are 3 new properties for using this - a DHF flow name, an optional set of steps, and an option for logging each flow response. If the flow name is set, then that flow will be run. Otherwise, only the regular ingestion happens.

  I had to make one plumbing change - DatabaseClientCreator is now DatabaseClientConfigBuilder, as I needed to reuse the DatabaseClientConfig object. That's a small plumbing change, though, that could be made independently of this change.

  Co-authored-by: Rob Rudin <[email protected]>

* Updating the AWS quickstart document. (#23)

Co-authored-by: rjrudin <[email protected]>
Co-authored-by: Rob Rudin <[email protected]>
1 parent a02e836 commit d221e44

13 files changed: 539 additions, 58 deletions
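
The commit message above mentions three new sink properties; they are defined in MarkLogicSinkConfig and documented in config/marklogic-sink.properties later in this diff. As a quick orientation, here is a minimal sketch (not part of the commit) of how those keys might appear in the config map handed to the sink task. The class and package names below are made up for illustration, and the flow and step values are the placeholder examples from the properties file.

    package com.marklogic.kafka.connect.sink.demo;

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: the three Data Hub flow keys added by this commit, using the example
    // values from config/marklogic-sink.properties. Connection and DMSDK settings are omitted.
    public class FlowConfigSketch {

        public static void main(String[] args) {
            Map<String, String> config = new HashMap<>();
            config.put("ml.datahub.flow.name", "ingestion_mapping_mastering-flow"); // flow to run after each ingested batch
            config.put("ml.datahub.flow.steps", "2,3,4");                           // optional; omit to run every step in the flow
            config.put("ml.datahub.flow.logResponse", "true");                      // log the RunFlowResponse at the info level
            config.forEach((key, value) -> System.out.println(key + "=" + value));
        }
    }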

README.md

Lines changed: 2 additions & 2 deletions
@@ -10,10 +10,10 @@ This is a connector for subscribing to Kafka queues and pushing messages to Mark
 #### To try this out locally:
 
 1. Configure kafkaHome in gradle-local.properties - e.g. kafkaHome=/Users/myusername/tools/kafka_2.11-2.1.0
-1. Run "./gradlew deploy" to build a jar and copy it and the below property files into the appropriate Kafka directories
+1. Run "./gradlew clean deploy" to build a jar and copy it and the below property files into the appropriate Kafka directories
 
 #### To try this out on a remote Kafka server
-1. Run "./gradlew jar" to build the jar.
+1. Run "./gradlew clean jar" to build the jar.
 1. Copy the jar to the <kafkaHome>/libs on the remote server.
 1. Copy the two properties (config/marklogic-connect-distributed.properties config/marklogic-sink.properties) to <kafkaHome>/config on the remote server.

build.gradle

Lines changed: 10 additions & 5 deletions
@@ -18,11 +18,16 @@ configurations {
 
 dependencies {
     compileOnly "org.apache.kafka:connect-api:2.3.0"
-    compile ("com.marklogic:ml-javaclient-util:3.13.4") {
-        // These are excluded simply to reduce the size of the connector; if included, they do not cause any issues
-        exclude module: "jdom2"
-        exclude module: "marklogic-xcc"
-        exclude module: "spring-context"
+
+    compile ("com.marklogic:marklogic-data-hub:5.2.0") {
+        // Excluding these because there's no need for them
+        exclude module: "spring-boot-autoconfigure"
+        exclude module: "spring-integration-http"
+        exclude module: "jaeger-core"
+        exclude module: "jaeger-thrift"
+
+        // Excluding because it causes Kafka Connect to complain mightily if included
+        exclude module: "logback-classic"
     }
 
     testCompile "org.junit.jupiter:junit-jupiter-api:5.3.0"

config/marklogic-sink.properties

Lines changed: 18 additions & 1 deletion
@@ -78,5 +78,22 @@ ml.document.uriPrefix=/kafka-data/
 # Optional - a suffix to append to each URI
 ml.document.uriSuffix=.json
 
+# Optional - name of a REST transform to use when writing documents
+# For Data Hub, can use mlRunIngest
 ml.dmsdk.transform=
-ml.dmsdk.transformParams=
+
+# Optional - delimited set of transform names and values
+# Data Hub example = flow-name,ingestion_mapping_mastering-flow,step,1
+ml.dmsdk.transformParams=
+
+# Optional - delimiter for transform parameter names and values
+ml.dmsdk.transformParamsDelimiter=,
+
+# Properties for running a Data Hub flow
+# Using examples/dh-5-example in the DH project, could use the following config:
+# ml.datahub.flow.name=ingestion_mapping_mastering-flow
+# ml.datahub.flow.steps=2,3,4
+ml.datahub.flow.name=
+ml.datahub.flow.steps=
+# Whether or not the response data from running a flow should be logged at the info level
+ml.datahub.flow.logResponse=true

src/main/java/com/marklogic/kafka/connect/DatabaseClientConfigBuilder.java

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+package com.marklogic.kafka.connect;
+
+import com.marklogic.client.ext.DatabaseClientConfig;
+
+import java.util.Map;
+
+/**
+ * Defines how a map of properties read in by Kafka are used to build an instance of DatabaseClientConfig.
+ */
+public interface DatabaseClientConfigBuilder {
+
+    DatabaseClientConfig buildDatabaseClientConfig(Map<String, String> kafkaConfig);
+
+}
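
For context on how this interface is consumed, the MarkLogicSinkTask change later in this commit pairs the builder with DefaultConfiguredDatabaseClientFactory from ml-javaclient-util. A minimal sketch of that pairing follows; the wrapper class, package, and method names are illustrative only.

    package com.marklogic.kafka.connect.demo;

    import com.marklogic.client.DatabaseClient;
    import com.marklogic.client.ext.DatabaseClientConfig;
    import com.marklogic.client.ext.DefaultConfiguredDatabaseClientFactory;
    import com.marklogic.kafka.connect.DefaultDatabaseClientConfigBuilder;

    import java.util.Map;

    // Sketch of the pattern used by MarkLogicSinkTask.start(): build a reusable DatabaseClientConfig
    // from the Kafka config map, then turn it into a DatabaseClient. Keeping the config object around
    // is what lets RunFlowWriteBatchListener connect to the Data Hub app servers later.
    public class ClientConfigBuilderSketch {

        public static DatabaseClient createClient(Map<String, String> kafkaConfig) {
            DatabaseClientConfig clientConfig =
                new DefaultDatabaseClientConfigBuilder().buildDatabaseClientConfig(kafkaConfig);
            return new DefaultConfiguredDatabaseClientFactory().newDatabaseClient(clientConfig);
        }
    }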

src/main/java/com/marklogic/kafka/connect/DatabaseClientCreator.java

Lines changed: 0 additions & 17 deletions
This file was deleted.

src/main/java/com/marklogic/kafka/connect/DefaultDatabaseClientCreator.java renamed to src/main/java/com/marklogic/kafka/connect/DefaultDatabaseClientConfigBuilder.java

Lines changed: 4 additions & 25 deletions
@@ -1,35 +1,19 @@
 package com.marklogic.kafka.connect;
 
 import com.marklogic.client.DatabaseClient;
-import com.marklogic.client.ext.ConfiguredDatabaseClientFactory;
+import com.marklogic.client.DatabaseClientFactory;
 import com.marklogic.client.ext.DatabaseClientConfig;
-import com.marklogic.client.ext.DefaultConfiguredDatabaseClientFactory;
 import com.marklogic.client.ext.SecurityContextType;
 import com.marklogic.kafka.connect.sink.MarkLogicSinkConfig;
 
 import javax.net.ssl.SSLContext;
 import java.security.NoSuchAlgorithmException;
 import java.util.Map;
 
-public class DefaultDatabaseClientCreator implements DatabaseClientCreator {
+public class DefaultDatabaseClientConfigBuilder implements DatabaseClientConfigBuilder {
 
-    private ConfiguredDatabaseClientFactory configuredDatabaseClientFactory;
-
-    public DefaultDatabaseClientCreator() {
-        this.configuredDatabaseClientFactory = new DefaultConfiguredDatabaseClientFactory();
-    }
-
-    /**
-     * @param kafkaConfig
-     * @return
-     */
     @Override
-    public DatabaseClient createDatabaseClient(Map<String, String> kafkaConfig) {
-        DatabaseClientConfig clientConfig = buildDatabaseClientConfig(kafkaConfig);
-        return configuredDatabaseClientFactory.newDatabaseClient(clientConfig);
-    }
-
-    protected DatabaseClientConfig buildDatabaseClientConfig(Map<String, String> kafkaConfig) {
+    public DatabaseClientConfig buildDatabaseClientConfig(Map<String, String> kafkaConfig) {
         DatabaseClientConfig clientConfig = new DatabaseClientConfig();
         clientConfig.setCertFile(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_CERT_FILE));
         clientConfig.setCertPassword(kafkaConfig.get(MarkLogicSinkConfig.CONNECTION_CERT_PASSWORD));
@@ -76,11 +60,6 @@ protected void configureSimpleSsl(DatabaseClientConfig clientConfig) {
             throw new RuntimeException("Unable to get default SSLContext: " + e.getMessage(), e);
         }
 
-        clientConfig.setSslHostnameVerifier((hostname, cns, subjectAlts) -> {
-        });
-    }
-
-    public void setConfiguredDatabaseClientFactory(ConfiguredDatabaseClientFactory configuredDatabaseClientFactory) {
-        this.configuredDatabaseClientFactory = configuredDatabaseClientFactory;
+        clientConfig.setSslHostnameVerifier(DatabaseClientFactory.SSLHostnameVerifier.ANY);
     }
 }

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConfig.java

Lines changed: 7 additions & 0 deletions
@@ -24,6 +24,10 @@ public class MarkLogicSinkConfig extends AbstractConfig {
     public static final String CONNECTION_CERT_PASSWORD = "ml.connection.certPassword";
     public static final String CONNECTION_EXTERNAL_NAME = "ml.connection.externalName";
 
+    public static final String DATAHUB_FLOW_NAME = "ml.datahub.flow.name";
+    public static final String DATAHUB_FLOW_STEPS = "ml.datahub.flow.steps";
+    public static final String DATAHUB_FLOW_LOG_RESPONSE = "ml.datahub.flow.logResponse";
+
     public static final String DMSDK_BATCH_SIZE = "ml.dmsdk.batchSize";
     public static final String DMSDK_THREAD_COUNT = "ml.dmsdk.threadCount";
     public static final String DMSDK_TRANSFORM = "ml.dmsdk.transform";
@@ -50,6 +54,9 @@ public class MarkLogicSinkConfig extends AbstractConfig {
         .define(CONNECTION_CERT_FILE, Type.STRING, Importance.LOW, "Path to a certificate file")
         .define(CONNECTION_CERT_PASSWORD, Type.STRING, Importance.LOW, "Password for the certificate file")
         .define(CONNECTION_EXTERNAL_NAME, Type.STRING, Importance.LOW, "External name for Kerberos authentication")
+        .define(DATAHUB_FLOW_NAME, Type.STRING, null, Importance.MEDIUM, "Name of a Data Hub flow to run")
+        .define(DATAHUB_FLOW_STEPS, Type.STRING, null, Importance.MEDIUM, "Comma-delimited names of steps to run")
+        .define(DATAHUB_FLOW_LOG_RESPONSE, Type.BOOLEAN, false, Importance.LOW, "If set to true, the response from running a flow on each ingested batch will be logged at the info level")
         .define(DMSDK_BATCH_SIZE, Type.INT, 100, Importance.HIGH, "Number of documents to write in each batch")
         .define(DMSDK_THREAD_COUNT, Type.INT, 8, Importance.HIGH, "Number of threads for DMSDK to use")
         .define(DMSDK_TRANSFORM, Type.STRING, Importance.MEDIUM, "Name of a REST transform to use when writing documents")

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkTask.java

Lines changed: 38 additions & 2 deletions
@@ -4,13 +4,17 @@
 import com.marklogic.client.datamovement.DataMovementManager;
 import com.marklogic.client.datamovement.WriteBatcher;
 import com.marklogic.client.document.ServerTransform;
-import com.marklogic.kafka.connect.DefaultDatabaseClientCreator;
+import com.marklogic.client.ext.DatabaseClientConfig;
+import com.marklogic.client.ext.DefaultConfiguredDatabaseClientFactory;
+import com.marklogic.kafka.connect.DefaultDatabaseClientConfigBuilder;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -32,7 +36,9 @@ public void start(final Map<String, String> config) {
 
         sinkRecordConverter = new DefaultSinkRecordConverter(config);
 
-        databaseClient = new DefaultDatabaseClientCreator().createDatabaseClient(config);
+        DatabaseClientConfig databaseClientConfig = new DefaultDatabaseClientConfigBuilder().buildDatabaseClientConfig(config);
+        databaseClient = new DefaultConfiguredDatabaseClientFactory().newDatabaseClient(databaseClientConfig);
+
         dataMovementManager = databaseClient.newDataMovementManager();
         writeBatcher = dataMovementManager.newWriteBatcher()
             .withBatchSize(Integer.parseInt(config.get(MarkLogicSinkConfig.DMSDK_BATCH_SIZE)))
@@ -43,11 +49,41 @@ public void start(final Map<String, String> config) {
             writeBatcher.withTransform(transform);
         }
 
+        final String flowName = config.get(MarkLogicSinkConfig.DATAHUB_FLOW_NAME);
+        if (flowName != null && flowName.trim().length() > 0) {
+            writeBatcher.onBatchSuccess(buildSuccessListener(flowName, config, databaseClientConfig));
+        }
+
         dataMovementManager.startJob(writeBatcher);
 
         logger.info("Started");
     }
 
+    /**
+     * This is all specific to Kafka, as it involves reading inputs from the Kafka config map and then using them to
+     * construct the reusable RunFlowWriteBatchListener.
+     *
+     * @param flowName
+     * @param kafkaConfig
+     * @param databaseClientConfig
+     */
+    protected RunFlowWriteBatchListener buildSuccessListener(String flowName, Map<String, String> kafkaConfig, DatabaseClientConfig databaseClientConfig) {
+        String logMessage = String.format("After ingesting a batch, will run flow '%s'", flowName);
+        final String flowSteps = kafkaConfig.get(MarkLogicSinkConfig.DATAHUB_FLOW_STEPS);
+        List<String> steps = null;
+        if (flowSteps != null && flowSteps.trim().length() > 0) {
+            steps = Arrays.asList(flowSteps.split(","));
+            logMessage += String.format(" with steps '%s' constrained to the URIs in that batch", steps.toString());
+        }
+        logger.info(logMessage);
+
+        RunFlowWriteBatchListener listener = new RunFlowWriteBatchListener(flowName, steps, databaseClientConfig);
+        if (kafkaConfig.containsKey(MarkLogicSinkConfig.DATAHUB_FLOW_LOG_RESPONSE)) {
+            listener.setLogResponse(Boolean.parseBoolean(kafkaConfig.get(MarkLogicSinkConfig.DATAHUB_FLOW_LOG_RESPONSE)));
+        }
+        return listener;
+    }
+
     /**
      * Builds a REST ServerTransform object based on the DMSDK parameters in the given config. If no transform name
      * is configured, then null will be returned.

src/main/java/com/marklogic/kafka/connect/sink/RunFlowWriteBatchListener.java

Lines changed: 124 additions & 0 deletions
@@ -0,0 +1,124 @@
+package com.marklogic.kafka.connect.sink;
+
+import com.marklogic.client.datamovement.WriteBatch;
+import com.marklogic.client.datamovement.WriteBatchListener;
+import com.marklogic.client.datamovement.WriteEvent;
+import com.marklogic.client.ext.DatabaseClientConfig;
+import com.marklogic.client.ext.helper.LoggingObject;
+import com.marklogic.hub.flow.FlowInputs;
+import com.marklogic.hub.flow.FlowRunner;
+import com.marklogic.hub.flow.RunFlowResponse;
+import com.marklogic.hub.flow.impl.FlowRunnerImpl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is not specific to Kafka and theoretically could be moved to DHF.
+ */
+public class RunFlowWriteBatchListener extends LoggingObject implements WriteBatchListener {
+
+    private String flowName;
+    private List<String> steps;
+    private DatabaseClientConfig databaseClientConfig;
+    private boolean logResponse;
+
+    /**
+     * The flowName and steps are assumed to have been read in by the client that is reading from system configuration
+     * - in the Kafka case, this will be from the Kafka config map that is passed to a source task.
+     * <p>
+     * The DatabaseClientConfig object is needed because it's not yet possible for DHF to reuse the DatabaseClient that
+     * Kafka constructs. While it's assumed that that DatabaseClient will write to staging, DHF needs to be able to
+     * connect to the staging, final, and job app servers. And in order to do that, it needs all of the authentication
+     * information that can be held by a DatabaseClientConfig. Though as of 5.2.0, DHF only supports basic/digest
+     * authentication, and thus it's assumed that username/password will be used for authentication.
+     *
+     * @param flowName             required name of the flow to run
+     * @param steps                optional list of steps
+     * @param databaseClientConfig
+     */
+    public RunFlowWriteBatchListener(String flowName, List<String> steps, DatabaseClientConfig databaseClientConfig) {
+        this.databaseClientConfig = databaseClientConfig;
+        this.flowName = flowName;
+        this.steps = steps;
+    }
+
+    /**
+     * None of this is specific to Kafka. It assumes a pattern of - given the URIs that were just ingested (and are
+     * available in the given WriteBatch), override the source query for each step to be executed with a document query
+     * that constrains on those URIs.
+     * <p>
+     * The need to construct a source query is unfortunate. When DHF executes a non-ingestion step, it always runs the
+     * collector. Thus, it's not yet possible to tell DHF - just process these URIs (specifically, it's not yet
+     * possible to do that via FlowRunner). So it's necessary to use the URIs to construct a document query and override
+     * each step's source query with that. Ideally, DHF can be enhanced here so a client can just pass in the URIs to
+     * process, and then there's no call to the collector nor need to override the source query.
+     *
+     * @param batch
+     */
+    @Override
+    public void processEvent(WriteBatch batch) {
+        FlowInputs inputs = buildFlowInputs(batch);
+
+        // DHF 5.2.0 only supports basic/digest auth, so this can safely be done.
+        FlowRunner flowRunner = new FlowRunnerImpl(
+            databaseClientConfig.getHost(),
+            databaseClientConfig.getUsername(),
+            databaseClientConfig.getPassword()
+        );
+
+        RunFlowResponse response = flowRunner.runFlow(inputs);
+        flowRunner.awaitCompletion();
+        if (logResponse) {
+            logger.info(format("Flow response for batch number %d:\n%s", batch.getJobBatchNumber(), response.toJson()));
+        }
+    }
+
+    protected FlowInputs buildFlowInputs(WriteBatch batch) {
+        FlowInputs inputs = new FlowInputs(flowName);
+        if (steps != null) {
+            inputs.setSteps(steps);
+        }
+        inputs.setJobId(batch.getBatcher().getJobId() + "-" + batch.getJobBatchNumber());
+
+        Map<String, Object> options = new HashMap<>();
+        options.put("sourceQuery", buildSourceQuery(batch));
+        inputs.setOptions(options);
+
+        return inputs;
+    }
+
+    protected String buildSourceQuery(WriteBatch batch) {
+        StringBuilder sb = new StringBuilder("cts.documentQuery([");
+        boolean firstOne = true;
+        for (WriteEvent event : batch.getItems()) {
+            if (!firstOne) {
+                sb.append(",");
+            }
+            sb.append(String.format("'%s'", event.getTargetUri()));
+            firstOne = false;
+        }
+        return sb.append("])").toString();
+    }
+
+    public void setLogResponse(boolean logResponse) {
+        this.logResponse = logResponse;
+    }
+
+    public String getFlowName() {
+        return flowName;
+    }
+
+    public List<String> getSteps() {
+        return steps;
+    }
+
+    public DatabaseClientConfig getDatabaseClientConfig() {
+        return databaseClientConfig;
+    }
+
+    public boolean isLogResponse() {
+        return logResponse;
+    }
+}
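
To see where this listener plugs in, MarkLogicSinkTask (earlier in this diff) registers it on its WriteBatcher via onBatchSuccess. A standalone sketch of that wiring follows; the wrapper class is illustrative only, the flow name and steps are placeholders, and the batch size and thread count are simply the defaults from MarkLogicSinkConfig.

    package com.marklogic.kafka.connect.sink.demo;

    import com.marklogic.client.DatabaseClient;
    import com.marklogic.client.datamovement.DataMovementManager;
    import com.marklogic.client.datamovement.WriteBatcher;
    import com.marklogic.client.ext.DatabaseClientConfig;
    import com.marklogic.client.ext.DefaultConfiguredDatabaseClientFactory;
    import com.marklogic.kafka.connect.sink.RunFlowWriteBatchListener;

    import java.util.Arrays;

    // Sketch of the pattern MarkLogicSinkTask uses: after each batch is written to staging,
    // RunFlowWriteBatchListener runs the configured flow against only the URIs in that batch.
    public class RunFlowListenerWiringSketch {

        public static WriteBatcher buildBatcher(DatabaseClientConfig config) {
            DatabaseClient client = new DefaultConfiguredDatabaseClientFactory().newDatabaseClient(config);
            DataMovementManager dataMovementManager = client.newDataMovementManager();

            // Placeholder flow name and steps; real values come from the ml.datahub.flow.* properties
            RunFlowWriteBatchListener listener = new RunFlowWriteBatchListener(
                "ingestion_mapping_mastering-flow", Arrays.asList("2", "3"), config);
            listener.setLogResponse(true);

            WriteBatcher writeBatcher = dataMovementManager.newWriteBatcher()
                .withBatchSize(100)
                .withThreadCount(8)
                .onBatchSuccess(listener);
            dataMovementManager.startJob(writeBatcher);
            return writeBatcher;
        }
    }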
