Commit 6418b47

Added CONTRIBUTING guide with Confluent Platform instructions
Before 1.7.0 is released, the CONTRIBUTING guide will be updated to capture how to develop and test with regular Kafka. And the README will be updated to focus on using a connector release in both Kafka and Confluent.
1 parent da71ae9 commit 6418b47

6 files changed: 256 additions & 45 deletions

CONTRIBUTING.md

Lines changed: 134 additions & 0 deletions

This guide describes how to develop and contribute pull requests to this connector.

# Testing with Confluent Platform

[Confluent Platform](https://docs.confluent.io/platform/7.2.1/overview.html) provides an easy mechanism for running Kafka locally via a single application. To try out the MarkLogic Kafka connector via Confluent Platform, follow the steps below.

## Install Confluent Platform with the MarkLogic Kafka connector

First, [install the Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#cp-quickstart-step-1) via the "Tar archive" option (the "Docker" option has not yet been tested).

**Important!** After step 6 (installing the Datagen source connector) and before step 7 (starting Confluent Platform), you'll need to install the MarkLogic Kafka connector into your Confluent Platform distribution.

To do so, set `confluentHome` in `gradle-local.properties` (create this file in the root of this project if it does not already exist) to point to the directory where you extracted the Confluent Platform distribution - e.g.:

    confluentHome=/Users/myusername/confluent-7.2.1

Then build and copy the connector to the Confluent Platform directory that you configured above:

    ./gradlew copyConnectorToConfluent

Note that any time you modify the MarkLogic Kafka connector code, you'll need to repeat the `./gradlew copyConnectorToConfluent` step.

Next, start Confluent:

    confluent local services start

To verify that your Confluent installation is running properly, run `confluent local services status` and look for logging similar to this:

```
Using CONFLUENT_CURRENT: /var/folders/wn/l42pccj17rbfw6h_5b8bt2nnkpch_s/T/confluent.995873
Connect is [UP]
Control Center is [UP]
Kafka is [UP]
Kafka REST is [UP]
ksqlDB Server is [UP]
Schema Registry is [UP]
ZooKeeper is [UP]
```

You can now visit http://localhost:9021 to access the [Confluent Control Center](https://docs.confluent.io/platform/current/control-center/index.html) application.

Within Control Center, click on "controlcenter.cluster" to access the configuration for the Kafka cluster.

## Load a Datagen connector instance

To test out the MarkLogic Kafka connector, first load an instance of the [Kafka Datagen connector](https://github.com/confluentinc/kafka-connect-datagen). The Datagen connector is a Kafka source connector that can generate test data, which can then be fed to the MarkLogic Kafka connector. The following Gradle command automates loading an instance of the Datagen connector that will write JSON messages to a `purchases` topic every second:

    ./gradlew loadDatagenPurchasesConnector

In the Control Center GUI, you can verify the Datagen connector instance:

1. Click on "Connect" in the left sidebar
2. Click on the "connect-default" cluster
3. Click on the "datagen-purchases" connector

Additionally, you can examine the data sent by the Datagen connector to the `purchases` topic (a command-line alternative is sketched after this list):

1. Click on "Topics" in the left sidebar
2. Click on the "purchases" topic
3. Click on "Messages" to see the JSON documents being sent by the connector

## Load a MarkLogic Kafka connector instance

Next, load an instance of the MarkLogic Kafka connector that will read data from the `purchases` topic and write it to MarkLogic. The `src/test/resources/confluent/marklogic-purchases-connector.json` file defines the connection properties for MarkLogic, and it defaults to writing to the `Documents` database via port 8000. You can adjust this file to suit your testing needs.

    ./gradlew loadMarkLogicPurchasesConnector

In the Control Center GUI, you can verify the MarkLogic Kafka connector instance (a command-line alternative is sketched after this list):

1. Click on "Connect" in the left sidebar
2. Click on the "connect-default" cluster
3. Click on the "marklogic-purchases" connector

You can then verify that data is being written to MarkLogic by using MarkLogic's qconsole application to inspect the contents of the `Documents` database.
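
Alternatively, you can spot-check the written documents via MarkLogic's REST API; a minimal sketch, assuming the `admin`/`admin` credentials and port 8000 defined in `marklogic-purchases-connector.json`:

    curl --anyauth -u admin:admin "http://localhost:8000/v1/search?format=json&pageLength=3"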

You can also manually configure an instance of the MarkLogic Kafka connector:

1. Click on "Connect" in the left sidebar
2. Click on the "connect-default" cluster
3. Click on "Add connector"
4. Click on "MarkLogicSinkConnector"
5. Select the topic(s) you wish to read from
6. For "Key converter class" and "Value converter class", enter `org.apache.kafka.connect.storage.StringConverter`
7. Under "General", enter values for the required MarkLogic connection fields and for any optional fields you wish to configure
8. At the bottom of the page, click "Next"
9. Click "Launch"

In the list of connectors in Control Center, the connector will initially have a status of "Failed" while it starts up. After it starts successfully, it will have a status of "Running".

## Destroying and setting up the Confluent Platform instance

While developing and testing the MarkLogic Kafka connector, it is common for the "local" instance of Confluent Platform to become unstable and stop working. The [Confluent local docs](https://docs.confluent.io/confluent-cli/current/command-reference/local/confluent_local_current.html) make reference to this: "The data that are produced are transient and are intended to be temporary".

It is thus advisable that after you copy a new instance of the MarkLogic Kafka connector into Confluent Platform (i.e. by running `./gradlew copyConnectorToConfluent`), you destroy your local Confluent Platform instance:

    ./gradlew destroyLocalConfluent

After doing that, you can quickly automate starting Confluent Platform and loading the two connectors via the following:

    ./gradlew setupLocalConfluent

Remember that if you've modified the connector code, you'll first need to run `./gradlew copyConnectorToConfluent`.

Doing the above is the most reliable way to get a new and working instance of Confluent Platform with the MarkLogic Kafka connector installed.
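
Putting those steps together, a typical reset cycle after changing the connector code looks like this, with the tasks run in the order described above:

    ./gradlew copyConnectorToConfluent
    ./gradlew destroyLocalConfluent
    ./gradlew setupLocalConfluent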

You may have luck with simply running `confluent local services stop`, `./gradlew copyConnectorToConfluent`, and `confluent local services start`, but this has so far not worked reliably - i.e. one of the Confluent Platform services (sometimes Schema Registry, sometimes Control Center) usually stops working.

# Testing with Apache Kafka

TODO, will borrow a lot of content from the README.

README.md

Lines changed: 27 additions & 15 deletions

```
@@ -1,38 +1,50 @@
-# kafka-connect-marklogic
+# MarkLogic Kafka connector
 
-This is a connector for subscribing to Kafka queues and pushing messages to MarkLogic
+The MarkLogic Kafka connector is a [Kafka Connect](https://docs.confluent.io/platform/current/connect/index.html)
+sink connector for receiving messages from Kafka topics and writing them to a MarkLogic database.
 
 ## Requirements
+
 * MarkLogic 9
 
-## Quick Start
+## Quick Start with Confluent Platform
+
+TODO, will borrow some content from the CONTRIBUTING file
+
+## Quick Start with Apache Kafka
+
+These instructions assume that you already have an instance of Apache Kafka installed; the [Kafka Quickstart]
+(https://kafka.apache.org/quickstart) instructions provide an easy way of accomplishing this. Perform step 1 of these
+instructions before proceeding.
+
+Next, if you are running Kafka locally, do the following:
 
-#### To try this out locally:
+1. Configure `kafkaHome` in gradle-local.properties - e.g. kafkaHome=/Users/myusername/kafka_2.13-2.8.1
+1. Run `./gradlew clean deploy` to build a jar and copy it and the below property files into the appropriate Kafka directories
 
-1. Configure kafkaHome in gradle-local.properties - e.g. kafkaHome=/Users/myusername/tools/kafka_2.11-2.1.0
-1. Run "./gradlew clean deploy" to build a jar and copy it and the below property files into the appropriate Kafka directories
+If you are running Kafka on a remote server, do the following:
 
-#### To try this out on a remote Kafka server
-1. Run "./gradlew clean jar" to build the jar.
-1. Copy the jar to the <kafkaHome>/libs on the remote server.
-1. Copy the two properties (config/marklogic-connect-distributed.properties config/marklogic-sink.properties) to <kafkaHome>/config on the remote server.
+1. Run `./gradlew clean shadowJar` to build the jar
+1. Copy the jar to the <kafkaHome>/libs on the remote server
+1. Copy the two properties (config/marklogic-connect-distributed.properties config/marklogic-sink.properties) to <kafkaHome>/config on the remote server
 
-See https://kafka.apache.org/quickstart for instructions on starting up Zookeeper and Kafka, which as of August 2022
-will instruct you to run the following commands (in separate terminal windows, both from the Kafka home directory):
+See step 2 in the [Kafka Quickstart guide](https://kafka.apache.org/quickstart) for instructions on starting
+Zookeeper and Kafka. As of August 2022, the guide will instruct you to run the following commands (in separate
+terminal windows, both from the Kafka home directory):
 
     bin/zookeeper-server-start.sh config/zookeeper.properties
 
 and:
 
     bin/kafka-server-start.sh config/server.properties
 
-To start the Kafka connector in standalone mode (from the Kafka home directory):
+Next, start the Kafka connector in standalone mode (also from the Kafka home directory):
 
     bin/connect-standalone.sh config/marklogic-connect-standalone.properties config/marklogic-sink.properties
 
 You'll see a fair amount of logging from Kafka itself; near the end of the logging, look for messages from
-MarkLogicSinkTask and the Data Movement classes such as WriteBatcherImpl to ensure that the connector has started up
-correctly.
+`MarkLogicSinkTask` and MarkLogic Java Client classes such as `WriteBatcherImpl` to ensure that the connector has
+started up correctly.
 
 You can also start the connector in distributed mode:
 
```
build.gradle

Lines changed: 60 additions & 29 deletions

```
@@ -57,7 +57,6 @@ dependencies {
     assets files('apache_logo.png')
 }
 
-// Needed by Gradle 4.6+ - see https://www.petrikainulainen.net/programming/testing/junit-5-tutorial-running-unit-tests-with-gradle/
 test {
     useJUnitPlatform()
 }
@@ -88,73 +87,105 @@ task deploy {
     dependsOn = ["copyJarToKafka", "copyPropertyFilesToKafka"]
 }
 
+ext {
+    confluentArchiveGroup = "Confluent Connector Archive"
+    confluentTestingGroup = "Confluent Platform Local Testing"
+    baseArchiveBuildDir = "build/connectorArchive"
+    baseArchiveName = "${componentOwner}-${componentName}-${version}"
+}
+
 // Tasks for building the archive required for submitting to the Confluence Connector Hub
 
 import org.apache.tools.ant.filters.ReplaceTokens
 
-def baseArchiveBuildDir = "build/connectorArchive"
-def baseArchiveName = "${componentOwner}-${componentName}-${version}"
-task connectorArchive_CopyManifestToBuildDirectory(type: Copy) {
+task connectorArchive_CopyManifestToBuildDirectory(type: Copy, group: confluentArchiveGroup) {
     description = "Copy the project manifest into the root folder"
-    group = 'connector archive'
-
     from '.'
     include 'manifest.json'
     into "${baseArchiveBuildDir}/${baseArchiveName}"
     filter(ReplaceTokens, tokens: [CONFLUENT_USER: componentOwner, VERSION: version])
 }
 
-task connectorArchive_CopyAssetsToBuildDirectory(type: Copy) {
+task connectorArchive_CopyAssetsToBuildDirectory(type: Copy, group: confluentArchiveGroup) {
     description = "Copy the project assets into the assets folder"
-    group = 'connector archive'
-
     from configurations.assets
     into "${baseArchiveBuildDir}/${baseArchiveName}/assets"
 }
 
-task connectorArchive_CopyEtcToBuildDirectory(type: Copy) {
+task connectorArchive_CopyEtcToBuildDirectory(type: Copy, group: confluentArchiveGroup) {
     description = "Copy the project support files into the etc folder"
-    group = 'connector archive'
-
     from 'config'
     include '*'
     into "${baseArchiveBuildDir}/${baseArchiveName}/etc"
 }
 
-task connectorArchive_CopyDocumentationToBuildDirectory(type: Copy) {
+task connectorArchive_CopyDocumentationToBuildDirectory(type: Copy, group: confluentArchiveGroup) {
     description = "Copy the project documentation into the doc folder"
-    group = 'connector archive'
-
     from configurations.documentation
     into "${baseArchiveBuildDir}/${baseArchiveName}/doc"
 }
 
-task connectorArchive_CopyDependenciesToBuildDirectory(type: Copy) {
+task connectorArchive_CopyDependenciesToBuildDirectory(type: Copy, group: confluentArchiveGroup, dependsOn: jar) {
     description = "Copy the dependency jars into the lib folder"
-    group = 'connector archive'
-    dependsOn = [jar]
-
     from jar
     from configurations.runtimeClasspath.findAll { it.name.endsWith('jar') }
     into "${baseArchiveBuildDir}/${baseArchiveName}/lib"
 }
 
-task connectorArchive_BuildDirectory() {
+task connectorArchive_BuildDirectory(group: confluentArchiveGroup) {
     description = "Build the directory that will be used to create the Kafka Connector Archive"
-    dependsOn = [connectorArchive_CopyManifestToBuildDirectory,
-        connectorArchive_CopyDependenciesToBuildDirectory,
-        connectorArchive_CopyDocumentationToBuildDirectory,
-        connectorArchive_CopyEtcToBuildDirectory,
-        connectorArchive_CopyAssetsToBuildDirectory]
-    group = 'connector archive'
+    dependsOn = [
+        connectorArchive_CopyManifestToBuildDirectory,
+        connectorArchive_CopyDependenciesToBuildDirectory,
+        connectorArchive_CopyDocumentationToBuildDirectory,
+        connectorArchive_CopyEtcToBuildDirectory,
+        connectorArchive_CopyAssetsToBuildDirectory
+    ]
 }
 
-task connectorArchive(type: Zip, dependsOn: connectorArchive_BuildDirectory) {
+task connectorArchive(type: Zip, dependsOn: connectorArchive_BuildDirectory, group: confluentArchiveGroup) {
     description = 'Build a Connector Hub for the Confluent Connector Hub'
-    group = 'connector archive'
-
     from "${baseArchiveBuildDir}"
     include '**/*'
     archiveName "${baseArchiveName}.zip"
     destinationDir(file('build/distro'))
 }
+
+task copyConnectorToConfluent(type: Copy, group: confluentTestingGroup, dependsOn: connectorArchive_BuildDirectory) {
+    description = "Build the connector archive and copy it to your local Confluent Platform"
+    from baseArchiveBuildDir
+    into "${confluentHome}/share/confluent-hub-components"
+}
+
+// See https://docs.confluent.io/confluent-cli/current/command-reference/local/confluent_local_destroy.html
+task destroyLocalConfluent(type: Exec, group: confluentTestingGroup) {
+    description = "Destroy the local Confluent Platform instance"
+    commandLine "confluent", "local", "destroy"
+}
+
+// See https://docs.confluent.io/confluent-cli/current/command-reference/local/services/confluent_local_services_start.html
+task startLocalConfluent(type: Exec, group: confluentTestingGroup) {
+    description = "Convenience task for starting a local instance of Confluent Platform"
+    commandLine "confluent", "local", "services", "start"
+}
+
+task loadDatagenPurchasesConnector(type: Exec, group: confluentTestingGroup) {
+    description = "Load an instance of the Datagen connector into Confluent Platform for sending JSON documents to " +
+        "the 'purchases' topic"
+    commandLine "confluent", "local", "services", "connect", "connector", "load", "datagen-purchases", "-c",
+        "src/test/resources/confluent/datagen-purchases-connector.json"
+}
+
+task loadMarkLogicPurchasesConnector(type: Exec, group: confluentTestingGroup) {
+    description = "Load an instance of the MarkLogic Kafka connector into Confluent Platform for writing data to " +
+        "MarkLogic from the 'purchases' topic"
+    commandLine "confluent", "local", "services", "connect", "connector", "load", "marklogic-purchases", "-c",
+        "src/test/resources/confluent/marklogic-purchases-connector.json"
+}
+
+task setupLocalConfluent(group: confluentTestingGroup) {
+    description = "Start a local Confluent Platform instance and load the Datagen and MarkLogic connectors"
+}
+setupLocalConfluent.dependsOn startLocalConfluent, loadDatagenPurchasesConnector, loadMarkLogicPurchasesConnector
+loadDatagenPurchasesConnector.mustRunAfter startLocalConfluent
+loadMarkLogicPurchasesConnector.mustRunAfter startLocalConfluent
```
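
If you'd rather not create a `gradle-local.properties` file, you can instead pass `confluentHome` for a one-off run, since command-line `-P` values take precedence over the empty default in `gradle.properties` (the path below is an illustrative example):

    ./gradlew copyConnectorToConfluent -PconfluentHome=/Users/myusername/confluent-7.2.1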

gradle.properties

Lines changed: 4 additions & 1 deletion

```
@@ -5,5 +5,8 @@ version=1.7.0-SNAPSHOT
 componentOwner=marklogic
 componentName=kafka-marklogic-connector
 
-# Override this in gradle-local.properties
+# Override in gradle-local.properties if testing with regular Apache Kafka
 kafkaHome=
+
+# Override in gradle-local.properties if testing with Confluent Platform
+confluentHome=
```
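
For reference, a complete `gradle-local.properties` for local testing might look like the following; both paths are the illustrative examples used earlier in this commit, so adjust them to your machine:

```
kafkaHome=/Users/myusername/kafka_2.13-2.8.1
confluentHome=/Users/myusername/confluent-7.2.1
```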
src/test/resources/confluent/datagen-purchases-connector.json

Lines changed: 14 additions & 0 deletions

```
{
  "name": "datagen-purchases",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "purchases",
    "quickstart": "purchases",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "value.converter.decimal.format": "NUMERIC",
    "max.interval": 1000,
    "tasks.max": "1"
  }
}
```
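
Because this file is already in the `name`/`config` shape that the Kafka Connect REST API expects, it can also be loaded with a plain HTTP request instead of the Confluent CLI; a sketch, assuming Connect's REST API is on its default port of 8083 (the same approach works for the MarkLogic connector file below):

    curl -X POST -H "Content-Type: application/json" --data @src/test/resources/confluent/datagen-purchases-connector.json http://localhost:8083/connectors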
src/test/resources/confluent/marklogic-purchases-connector.json

Lines changed: 17 additions & 0 deletions

```
{
  "name": "marklogic-purchases",
  "config": {
    "topics": "purchases",
    "connector.class": "com.marklogic.kafka.connect.sink.MarkLogicSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "1",
    "ml.connection.host": "localhost",
    "ml.connection.port": 8000,
    "ml.connection.username": "admin",
    "ml.connection.password": "admin",
    "ml.connection.securityContextType": "DIGEST",
    "ml.document.format": "JSON",
    "ml.document.uriSuffix": ".json"
  }
}
```
