Commit 9d81f46

Merge pull request #65 from marklogic-community/feature/50-temporal
Can now configure a temporal collection
2 parents 1b0f002 + c497fe0 commit 9d81f46

14 files changed (+373 additions, -40 deletions)

CONTRIBUTING.md

Lines changed: 24 additions & 0 deletions
@@ -1,5 +1,29 @@
 This guide describes how to develop and contribute pull requests to this connector.
 
+# Running the test suite
+
+The test suite for the MarkLogic Kafka connector, found at `src/test/resources`, requires that an application first be
+deployed to a MarkLogic instance. This application is deployed via
+[ml-gradle](https://github.com/marklogic-community/ml-gradle). Before deploying, first create `gradle-local.properties`
+if it does not yet exist in the root directory of this project and configure `mlPassword` for your `admin` user - e.g.
+
+    mlPassword=changeme
+
+Then deploy the application:
+
+    ./gradlew -i mlDeploy
+
+The application deploys a single REST API app server listening on port 8019; please ensure you have this port available
+before deploying.
+
+You can then run the tests via:
+
+    ./gradlew test
+
+Alternatively, you can import this project into an IDE such as IntelliJ and run each of the tests found under
+`src/test/java`.
+
+
 # Testing with Confluent Platform
 
 [Confluent Platform](https://docs.confluent.io/platform/7.2.1/overview.html) provides an easy mechanism for running

README.md

Lines changed: 27 additions & 26 deletions
@@ -77,29 +77,30 @@ When a document is received and written by the connector, you'll see logging lik
 | offset.flush.interval.ms | 10000 | Interval at which to try committing offsets for tasks. |
 
 #### MarkLogic-specific properties are defined in config/marklogic-sink.properties
-| Property | Default Value | Description |
-|:-------- |:--------------|:------------|
-| name | marklogic-sink | The name of the connector |
-| connector.class | <div>com.marklogic.kafka.connect.</div>sink.MarkLogicSinkConnector | The FQ name of the connector class |
-| tasks.max | 1 | The maximum number of concurrent tasks |
-| topics | marklogic | The name of the topic(s) to subscribe to |
-| ml.connection.host | localhost | A MarkLogic host to connect to. The connector uses the Data Movement SDK, and thus it will connect to each of the hosts in a cluster. |
-| ml.connection.port | 8000 | The port of a REST API server to connect to. |
-| ml.connection.database | Documents | Optional - the name of a database to connect to. If your REST API server has a content database matching that of the one that you want to write documents to, you do not need to set this. |
-| ml.connection.type | (empty) | Optional - set to "gateway" when using a load balancer, else leave blank. See https://docs.marklogic.com/guide/java/data-movement#id_26583 for more information. |
-| ml.connection.securityContextType | DIGEST | Either DIGEST, BASIC, CERTIFICATE, KERBEROS, or NONE |
-| ml.connection.username | admin | MarkLogic username |
-| ml.connection.password | admin | MarkLogic password |
-| ml.connection.certFile | (empty) | Certificate file for Certificate based authentication |
-| ml.connection.certPassword | (empty) | Certificate password for Certificate based authentication |
-| ml.connection.externalName | (empty) | The external name to use to connect to MarkLogic |
-| ml.connection.simpleSsl | false | Set to "true" for a "simple" SSL strategy that uses the JVM's default SslContext and X509TrustManager and a "trust everything" HostnameVerifier. Further customization of an SSL connection via properties is not supported. If you need to do so, consider using the source code for this connector as a starting point. |
-| ml.dmsdk.batchSize | 100 | Sets the number of documents to be written in a batch to MarkLogic. This may not have any impact depending on the connector receives data from Kafka, as the connector calls flushAsync on the DMSDK WriteBatcher after processing every collection of records. Thus, if the connector never receives at one time more than the value of this property, then the value of this property will have no impact. |
-| ml.dmsdk.threadCount | 8 | Sets the number of threads used by the Data Movement SDK for parallelizing writes to MarkLogic. Similar to the batch size property above, this may never come into play depending on how many records the connector receives at once. |
-| ml.document.collections | kafka-data | Optional - a comma-separated list of collections that each document should be written to |
-| ml.document.addTopicToCollections | false | Set this to true so that the name of the topic that the connector reads from is added as a collection to each document inserted by the connector
-| ml.document.format | JSON | Optional - specify the format of each document; either JSON, XML, BINARY, TEXT, or UNKNOWN |
-| ml.document.mimeType | (empty) | Optional - specify a mime type for each document; typically the format property above will be used instead of this |
-| ml.document.permissions | rest-reader,read,rest-writer,update | Optional - a comma-separated list of roles and capabilities that define the permissions for each document written to MarkLogic |
-| ml.document.uriPrefix | /kafka-data/ | Optional - a prefix to prepend to each URI; the URI itself is a UUID |
-| ml.document.uriSuffix | .json | Optional - a suffix to append to each URI |
+| Property | Default Value | Description |
+|:----------------------------------|:-------------------------------------------------------------------|:------------|
+| name | marklogic-sink | The name of the connector |
+| connector.class | <div>com.marklogic.kafka.connect.</div>sink.MarkLogicSinkConnector | The FQ name of the connector class |
+| tasks.max | 1 | The maximum number of concurrent tasks |
+| topics | marklogic | The name of the topic(s) to subscribe to |
+| ml.connection.host | localhost | A MarkLogic host to connect to. The connector uses the Data Movement SDK, and thus it will connect to each of the hosts in a cluster. |
+| ml.connection.port | 8000 | The port of a REST API server to connect to. |
+| ml.connection.database | Documents | Optional - the name of a database to connect to. If your REST API server has a content database matching that of the one that you want to write documents to, you do not need to set this. |
+| ml.connection.type | (empty) | Optional - set to "gateway" when using a load balancer, else leave blank. See https://docs.marklogic.com/guide/java/data-movement#id_26583 for more information. |
+| ml.connection.securityContextType | DIGEST | Either DIGEST, BASIC, CERTIFICATE, KERBEROS, or NONE |
+| ml.connection.username | admin | MarkLogic username |
+| ml.connection.password | admin | MarkLogic password |
+| ml.connection.certFile | (empty) | Certificate file for Certificate based authentication |
+| ml.connection.certPassword | (empty) | Certificate password for Certificate based authentication |
+| ml.connection.externalName | (empty) | The external name to use to connect to MarkLogic |
+| ml.connection.simpleSsl | false | Set to "true" for a "simple" SSL strategy that uses the JVM's default SslContext and X509TrustManager and a "trust everything" HostnameVerifier. Further customization of an SSL connection via properties is not supported. If you need to do so, consider using the source code for this connector as a starting point. |
+| ml.dmsdk.batchSize | 100 | Sets the number of documents to be written in a batch to MarkLogic. This may not have any impact depending on the connector receives data from Kafka, as the connector calls flushAsync on the DMSDK WriteBatcher after processing every collection of records. Thus, if the connector never receives at one time more than the value of this property, then the value of this property will have no impact. |
+| ml.dmsdk.threadCount | 8 | Sets the number of threads used by the Data Movement SDK for parallelizing writes to MarkLogic. Similar to the batch size property above, this may never come into play depending on how many records the connector receives at once. |
+| ml.document.collections | kafka-data | Optional - a comma-separated list of collections that each document should be written to |
+| ml.document.addTopicToCollections | false | Set this to true so that the name of the topic that the connector reads from is added as a collection to each document inserted by the connector |
+| ml.document.temporalCollection | (empty) | Specify the name of a temporal collection for documents to be inserted into |
+| ml.document.format | JSON | Optional - specify the format of each document; either JSON, XML, BINARY, TEXT, or UNKNOWN |
+| ml.document.mimeType | (empty) | Optional - specify a mime type for each document; typically the format property above will be used instead of this |
+| ml.document.permissions | rest-reader,read,rest-writer,update | Optional - a comma-separated list of roles and capabilities that define the permissions for each document written to MarkLogic |
+| ml.document.uriPrefix | /kafka-data/ | Optional - a prefix to prepend to each URI; the URI itself is a UUID |
+| ml.document.uriSuffix | .json | Optional - a suffix to append to each URI |
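
For example, a minimal sink configuration that uses the new temporal collection support could look like the sketch below. This is an illustrative example, not part of the commit: the collection name `kafka-temporal-data` is hypothetical and must refer to a temporal collection that already exists in the target MarkLogic database, and the remaining values simply mirror the defaults in the table above.

```properties
# Illustrative sketch - not part of this commit
name=marklogic-sink
connector.class=com.marklogic.kafka.connect.sink.MarkLogicSinkConnector
topics=marklogic
ml.connection.host=localhost
ml.connection.port=8000
ml.connection.securityContextType=DIGEST
ml.connection.username=admin
ml.connection.password=admin
# New in this commit: every document written by the connector is inserted into
# this temporal collection, which must already be configured in MarkLogic
ml.document.temporalCollection=kafka-temporal-data
```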

build.gradle

Lines changed: 9 additions & 2 deletions
@@ -3,6 +3,9 @@ plugins {
     id 'net.saliman.properties' version '1.5.2'
     id 'com.github.johnrengelman.shadow' version '7.1.2'
     id "com.github.jk1.dependency-license-report" version "1.3"
+
+    // Only used for testing
+    id 'com.marklogic.ml-gradle' version '4.3.5'
 }
 
 java {
@@ -12,6 +15,9 @@ java {
 
 repositories {
     mavenCentral()
+
+    // For testing
+    mavenLocal()
 }
 
 configurations {
@@ -40,14 +46,15 @@ dependencies {
         exclude module: "logback-classic"
     }
 
-    testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0'
+    // 1.2.1 will be published soon, and then we'll switch to that
+    testImplementation 'com.marklogic:marklogic-junit5:1.2.1-SNAPSHOT'
+
     testImplementation "org.apache.kafka:connect-api:${kafkaVersion}"
     testImplementation "org.apache.kafka:connect-json:${kafkaVersion}"
 
     // Forcing logback to be used for test logging
     testImplementation "ch.qos.logback:logback-classic:1.2.11"
     testImplementation "org.slf4j:jcl-over-slf4j:1.7.36"
-    testImplementation "org.slf4j:slf4j-api:1.7.36"
 
     documentation files('LICENSE.txt')
     documentation files('NOTICE.txt')

config/marklogic-sink.properties

Lines changed: 3 additions & 0 deletions
@@ -63,6 +63,9 @@ ml.document.collections=kafka-data
 # Optional - set this to true so that the name of the topic that the connector reads from is added as a collection to each document inserted by the connector
 ml.document.addTopicToCollections=false
 
+# Specify the name of a temporal collection for documents to be inserted into
+ml.document.temporalCollection=
+
 # Optional - specify the format of each document; either JSON, XML, BINARY, TEXT, or UNKNOWN
 ml.document.format=JSON

gradle.properties

Lines changed: 9 additions & 0 deletions
@@ -10,3 +10,12 @@ kafkaHome=
 
 # Override in gradle-local.properties if testing with Confluent Platform
 confluentHome=
+
+# Only used for testing
+mlConfigPaths=src/test/ml-config
+mlRestPort=8018
+mlTestRestPort=8019
+mlAppName=kafka-test
+mlUsername=admin
+mlPassword=changeme-in-gradle-local.properties
+mlContentForestsPerHost=1
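
Following the new CONTRIBUTING.md instructions, a local override of these test defaults would go in `gradle-local.properties`; a minimal sketch (the password value is illustrative) might be:

```properties
# gradle-local.properties (not checked in) - overrides gradle.properties for local testing
mlPassword=changeme
```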

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConfig.java

Lines changed: 6 additions & 4 deletions
@@ -36,6 +36,7 @@ public class MarkLogicSinkConfig extends AbstractConfig {
 
     public static final String DOCUMENT_COLLECTIONS_ADD_TOPIC = "ml.document.addTopicToCollections";
     public static final String DOCUMENT_COLLECTIONS = "ml.document.collections";
+    public static final String DOCUMENT_TEMPORAL_COLLECTION = "ml.document.temporalCollection";
     public static final String DOCUMENT_PERMISSIONS = "ml.document.permissions";
     public static final String DOCUMENT_FORMAT = "ml.document.format";
     public static final String DOCUMENT_MIMETYPE = "ml.document.mimeType";
@@ -69,13 +70,14 @@ public class MarkLogicSinkConfig extends AbstractConfig {
         .define(DATAHUB_FLOW_NAME, Type.STRING, null, Importance.MEDIUM, "Name of a Data Hub flow to run")
         .define(DATAHUB_FLOW_STEPS, Type.STRING, null, Importance.MEDIUM, "Comma-delimited names of steps to run")
         .define(DATAHUB_FLOW_LOG_RESPONSE, Type.BOOLEAN, false, Importance.LOW, "If set to true, the response from running a flow on each ingested batch will be logged at the info level")
-        .define(DMSDK_BATCH_SIZE, Type.INT, 100, Importance.HIGH, "Number of documents to write in each batch")
-        .define(DMSDK_THREAD_COUNT, Type.INT, 8, Importance.HIGH, "Number of threads for DMSDK to use")
-        .define(DMSDK_TRANSFORM, Type.STRING, "", Importance.MEDIUM, "Name of a REST transform to use when writing documents")
-        .define(DMSDK_TRANSFORM_PARAMS, Type.STRING, "", Importance.MEDIUM, "Delimited set of transform names and values")
+        .define(DMSDK_BATCH_SIZE, Type.INT, null, Importance.HIGH, "Number of documents to write in each batch")
+        .define(DMSDK_THREAD_COUNT, Type.INT, null, Importance.HIGH, "Number of threads for DMSDK to use")
+        .define(DMSDK_TRANSFORM, Type.STRING, null, Importance.MEDIUM, "Name of a REST transform to use when writing documents")
+        .define(DMSDK_TRANSFORM_PARAMS, Type.STRING, null, Importance.MEDIUM, "Delimited set of transform names and values")
         .define(DMSDK_TRANSFORM_PARAMS_DELIMITER, Type.STRING, ",", Importance.LOW, "Delimiter for transform parameter names and values; defaults to a comma")
         .define(DOCUMENT_COLLECTIONS_ADD_TOPIC, Type.BOOLEAN, false, Importance.LOW, "Indicates if the topic name should be added to the set of collections for a document")
         .define(DOCUMENT_COLLECTIONS, Type.STRING, "", Importance.MEDIUM, "String-delimited collections to add each document to")
+        .define(DOCUMENT_TEMPORAL_COLLECTION, Type.STRING, null, Importance.LOW, "Specify the name of a temporal collection for documents to be inserted into")
         .define(DOCUMENT_FORMAT, Type.STRING, "", Importance.LOW, "Defines format of each document; can be one of json, xml, text, binary, or unknown")
         .define(DOCUMENT_MIMETYPE, Type.STRING, "", Importance.LOW, "Defines the mime type of each document; optional, and typically the format is set instead of the mime type")
         .define(DOCUMENT_PERMISSIONS, Type.STRING, "", Importance.MEDIUM, "String-delimited permissions to add to each document; role1,capability1,role2,capability2,etc")

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkTask.java

Lines changed: 46 additions & 8 deletions
@@ -11,6 +11,7 @@
 import org.apache.kafka.connect.sink.SinkTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
 
 import java.io.IOException;
 import java.util.*;
@@ -45,14 +46,9 @@ public void start(final Map<String, String> config) {
         databaseClient = new DefaultConfiguredDatabaseClientFactory().newDatabaseClient(databaseClientConfig);
 
         dataMovementManager = databaseClient.newDataMovementManager();
-        writeBatcher = dataMovementManager.newWriteBatcher()
-            .withBatchSize((Integer) parsedConfig.get(MarkLogicSinkConfig.DMSDK_BATCH_SIZE))
-            .withThreadCount((Integer) parsedConfig.get(MarkLogicSinkConfig.DMSDK_THREAD_COUNT));
 
-        ServerTransform transform = buildServerTransform(parsedConfig);
-        if (transform != null) {
-            writeBatcher.withTransform(transform);
-        }
+        writeBatcher = dataMovementManager.newWriteBatcher();
+        configureWriteBatcher(parsedConfig, writeBatcher);
 
         writeBatcher.onBatchFailure((writeBatch, throwable) -> {
             int batchSize = writeBatch.getItems().length;
@@ -70,6 +66,39 @@ public void start(final Map<String, String> config) {
         logger.info("Started");
     }
 
+    /**
+     * Configure the given WriteBatcher based on DMSDK-related options in the parsedConfig.
+     *
+     * @param parsedConfig
+     * @param writeBatcher
+     */
+    protected void configureWriteBatcher(Map<String, Object> parsedConfig, WriteBatcher writeBatcher) {
+        Integer batchSize = (Integer) parsedConfig.get(MarkLogicSinkConfig.DMSDK_BATCH_SIZE);
+        if (batchSize != null) {
+            logger.info("DMSDK batch size: " + batchSize);
+            writeBatcher.withBatchSize(batchSize);
+        }
+
+        Integer threadCount = (Integer) parsedConfig.get(MarkLogicSinkConfig.DMSDK_THREAD_COUNT);
+        if (threadCount != null) {
+            logger.info("DMSDK thread count: " + threadCount);
+            writeBatcher.withThreadCount(threadCount);
+        }
+
+        ServerTransform transform = buildServerTransform(parsedConfig);
+        if (transform != null) {
+            // Not logging transform parameters as they may contain sensitive values
+            logger.info("Will apply server transform: " + transform.getName());
+            writeBatcher.withTransform(transform);
+        }
+
+        String temporalCollection = (String) parsedConfig.get(MarkLogicSinkConfig.DOCUMENT_TEMPORAL_COLLECTION);
+        if (StringUtils.hasText(temporalCollection)) {
+            logger.info("Will add documents to temporal collection: " + temporalCollection);
+            writeBatcher.withTemporalCollection(temporalCollection);
+        }
+    }
+
     /**
      * This is all specific to Kafka, as it involves reading inputs from the Kafka config map and then using them to
      * construct the reusable RunFlowWriteBatchListener.
@@ -104,7 +133,7 @@ protected RunFlowWriteBatchListener buildSuccessListener(String flowName, Map<St
      */
     protected ServerTransform buildServerTransform(final Map<String, Object> parsedConfig) {
         String transform = (String) parsedConfig.get(MarkLogicSinkConfig.DMSDK_TRANSFORM);
-        if (transform != null && transform.trim().length() > 0) {
+        if (StringUtils.hasText(transform)) {
             ServerTransform t = new ServerTransform(transform);
             String params = (String) parsedConfig.get(MarkLogicSinkConfig.DMSDK_TRANSFORM_PARAMS);
             if (params != null && params.trim().length() > 0) {
@@ -199,4 +228,13 @@ public void put(final Collection<SinkRecord> records) {
     public String version() {
         return MarkLogicSinkConnector.MARKLOGIC_SINK_CONNECTOR_VERSION;
     }
+
+    /**
+     * Exposed to facilitate testing.
+     *
+     * @return
+     */
+    protected WriteBatcher getWriteBatcher() {
+        return writeBatcher;
+    }
 }
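
To illustrate how the new configuration path might be exercised, here is a minimal sketch (not part of this commit) that starts the sink task with a temporal collection and inspects the resulting WriteBatcher through the new `getWriteBatcher()` accessor. It assumes the test application described in CONTRIBUTING.md is deployed on port 8019, uses a hypothetical temporal collection name `kafka-temporal-data`, and assumes the DMSDK `WriteBatcher` exposes a `getTemporalCollection()` getter; the class is placed in the connector's own package so the protected accessor is visible.

```java
package com.marklogic.kafka.connect.sink;

import com.marklogic.client.datamovement.WriteBatcher;

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; a real check would live in the test suite under src/test/java.
public class TemporalCollectionSketch {

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        // Connection properties as documented in README.md; values assume the
        // test application from CONTRIBUTING.md is deployed on port 8019.
        config.put("ml.connection.host", "localhost");
        config.put("ml.connection.port", "8019");
        config.put("ml.connection.securityContextType", "DIGEST");
        config.put("ml.connection.username", "admin");
        config.put("ml.connection.password", "changeme");
        // New property from this commit; "kafka-temporal-data" is a hypothetical
        // temporal collection that must already be configured in MarkLogic.
        config.put("ml.document.temporalCollection", "kafka-temporal-data");

        MarkLogicSinkTask task = new MarkLogicSinkTask();
        task.start(config);
        try {
            // getWriteBatcher() was added in this commit to facilitate testing;
            // calling getTemporalCollection() on it is an assumption about the
            // DMSDK WriteBatcher interface.
            WriteBatcher writeBatcher = task.getWriteBatcher();
            System.out.println("Temporal collection: " + writeBatcher.getTemporalCollection());
        } finally {
            task.stop();
        }
    }
}
```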
