
Commit 8b4e678

Merge pull request #63 from marklogic-community/feature/contributing-guide
Added CONTRIBUTING guide with Confluent Platform instructions
2 parents da71ae9 + 6418b47 commit 8b4e678

File tree

6 files changed: +256 -45 lines changed

CONTRIBUTING.md

Lines changed: 134 additions & 0 deletions
@@ -0,0 +1,134 @@
This guide describes how to develop and contribute pull requests to this connector.

# Testing with Confluent Platform

[Confluent Platform](https://docs.confluent.io/platform/7.2.1/overview.html) provides an easy mechanism for running
Kafka locally via a single application. To try out the MarkLogic Kafka connector via the Confluent Platform, follow
the steps below.

## Install Confluent Platform with the MarkLogic Kafka connector

First, [install the Confluent Platform](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#cp-quickstart-step-1)
via the "Tar archive" option (the "Docker" option has not yet been tested).
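For reference, a typical "Tar archive" setup looks like the following - a sketch only; the archive name and install
path depend on the version you download (7.2.1 is assumed here to match the link above):

    tar -xzf confluent-7.2.1.tar.gz -C ~/
    export CONFLUENT_HOME=~/confluent-7.2.1
    export PATH="$PATH:$CONFLUENT_HOME/bin"

The `confluent` CLI used in the steps below should then be available on your PATH.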
**Important!** After step 6 (installing the Datagen source connector) and before step 7 (starting Confluent Platform),
you'll need to install the MarkLogic Kafka connector into your Confluent Platform distribution.

To do so, modify `confluentHome` in `gradle-local.properties` (create this file in the root of this project if it
does not already exist) to point to where you extracted the Confluent Platform distribution - e.g.:

    confluentHome=/Users/myusername/confluent-7.2.1

Then build and copy the connector to the Confluent Platform directory that you configured above:

    ./gradlew copyConnectorToConfluent

Note that any time you modify the MarkLogic Kafka connector code, you'll need to repeat the
`./gradlew copyConnectorToConfluent` step.

Next, start Confluent:

    confluent local services start

To verify that your Confluent installation is running properly, you can run `confluent local services status` and
see logging similar to this:

```
Using CONFLUENT_CURRENT: /var/folders/wn/l42pccj17rbfw6h_5b8bt2nnkpch_s/T/confluent.995873
Connect is [UP]
Control Center is [UP]
Kafka is [UP]
Kafka REST is [UP]
ksqlDB Server is [UP]
Schema Registry is [UP]
ZooKeeper is [UP]
```
You can now visit http://localhost:9021 to access the [Confluent Control Center](https://docs.confluent.io/platform/current/control-center/index.html)
application.

Within Control Center, click on "controlcenter.cluster" to access the configuration for the Kafka cluster.

## Load a Datagen connector instance

To test out the MarkLogic Kafka connector, you should first load an instance of the
[Kafka Datagen connector](https://github.com/confluentinc/kafka-connect-datagen). The Datagen connector is a Kafka
source connector that can generate test data which can then be fed to the MarkLogic Kafka connector. The following
Gradle command will automate loading an instance of the Datagen connector that will write JSON messages to a
`purchases` topic every second:

    ./gradlew loadDatagenPurchasesConnector

In the Control Center GUI, you can verify the Datagen connector instance:

1. Click on "Connect" in the left sidebar
2. Click on the "connect-default" cluster
3. Click on the "datagen-purchases" connector

Additionally, you can examine the data sent by the Datagen connector to the `purchases` topic (a command-line
alternative is shown after this list):

1. Click on "Topics" in the left sidebar
2. Click on the "purchases" topic
3. Click on "Messages" to see the JSON documents being sent by the connector
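You can also tail the topic from the command line using the console consumer that ships with Confluent Platform - a
sketch, assuming the broker is listening on the default `localhost:9092`:

    $CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic purchases --from-beginning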
## Load a MarkLogic Kafka connector instance

Next, load an instance of the MarkLogic Kafka connector that will read data from the `purchases` topic and write
it to MarkLogic. The `src/test/resources/confluent/marklogic-purchases-connector.json` file defines the connection
properties for MarkLogic, and it defaults to writing to the `Documents` database via port 8000. You can adjust this
file to suit your testing needs.

    ./gradlew loadMarkLogicPurchasesConnector

In the Control Center GUI, you can verify the MarkLogic Kafka connector instance:

1. Click on "Connect" in the left sidebar
2. Click on the "connect-default" cluster
3. Click on the "marklogic-purchases" connector

You can then verify that data is being written to MarkLogic by using MarkLogic's qconsole application to inspect the
contents of the `Documents` database.
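For a quick check without opening qconsole, you can also hit MarkLogic's REST search endpoint on port 8000 - a
sketch, assuming the `admin`/`admin` credentials used in `marklogic-purchases-connector.json`; the `total` value in
the response should grow as the connector writes documents:

    curl --digest -u admin:admin "http://localhost:8000/v1/search?format=json"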
You can also manually configure an instance of the MarkLogic Kafka connector:

1. Click on "Connect" in the left sidebar
2. Click on the "connect-default" cluster
3. Click on "Add connector"
4. Click on "MarkLogicSinkConnector"
5. Select the topic(s) you wish to read from
6. For "Key converter class" and "Value converter class", enter `org.apache.kafka.connect.storage.StringConverter`
7. Under "General", enter values for the required MarkLogic connection fields and for any optional fields you wish
   to configure
8. At the bottom of the page, click "Next"
9. Click "Launch"

In the list of connectors in Control Center, the connector will initially have a status of "Failed" while it starts up.
After it starts successfully, it will have a status of "Running".
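You can also check the connector's status outside of Control Center via the Kafka Connect REST API, assuming Connect
is listening on its default port of 8083:

    curl http://localhost:8083/connectors/marklogic-purchases/status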
## Destroying and setting up the Confluent Platform instance

While developing and testing the MarkLogic Kafka connector, it is common for the "local" instance of Confluent
Platform to become unstable and stop working. The [Confluent local docs](https://docs.confluent.io/confluent-cli/current/command-reference/local/confluent_local_current.html)
make reference to this - "The data that are produced are transient and are intended to be temporary".

It is thus advisable, after you copy a new instance of the MarkLogic Kafka connector into Confluent Platform (i.e.
by running `./gradlew copyConnectorToConfluent`), to destroy your local Confluent Platform instance:

    ./gradlew destroyLocalConfluent

After doing that, you can quickly automate starting Confluent Platform and loading the two connectors via the following:

    ./gradlew setupLocalConfluent

Remember that if you've modified the connector code, you'll first need to run `./gradlew copyConnectorToConfluent`.

This sequence is the most reliable way to get a new and working instance of Confluent Platform with the
MarkLogic Kafka connector installed.
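Putting the pieces together, the full reset cycle after a connector code change is:

    ./gradlew copyConnectorToConfluent
    ./gradlew destroyLocalConfluent
    ./gradlew setupLocalConfluent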
You may have luck with simply doing `confluent local services stop`, `./gradlew copyConnectorToConfluent`, and
`confluent local services start`, but this has so far not worked reliably - i.e. one of the Confluent Platform
services (sometimes Schema Registry, sometimes Control Center) usually stops working.

# Testing with Apache Kafka

TODO, will borrow a lot of content from the README.

README.md

Lines changed: 27 additions & 15 deletions
@@ -1,38 +1,50 @@
-# kafka-connect-marklogic
+# MarkLogic Kafka connector
 
-This is a connector for subscribing to Kafka queues and pushing messages to MarkLogic
+The MarkLogic Kafka connector is a [Kafka Connect](https://docs.confluent.io/platform/current/connect/index.html)
+sink connector for receiving messages from Kafka topics and writing them to a MarkLogic database.
 
 ## Requirements
+
 * MarkLogic 9
 
-## Quick Start
+## Quick Start with Confluent Platform
+
+TODO, will borrow some content from the CONTRIBUTING file
+
+## Quick Start with Apache Kafka
+
+These instructions assume that you already have an instance of Apache Kafka installed; the
+[Kafka Quickstart](https://kafka.apache.org/quickstart) instructions provide an easy way of accomplishing this.
+Perform step 1 of these instructions before proceeding.
+
+Next, if you are running Kafka locally, do the following:
 
-#### To try this out locally:
+1. Configure `kafkaHome` in gradle-local.properties - e.g. kafkaHome=/Users/myusername/kafka_2.13-2.8.1
+1. Run `./gradlew clean deploy` to build a jar and copy it and the below property files into the appropriate Kafka directories
 
-1. Configure kafkaHome in gradle-local.properties - e.g. kafkaHome=/Users/myusername/tools/kafka_2.11-2.1.0
-1. Run "./gradlew clean deploy" to build a jar and copy it and the below property files into the appropriate Kafka directories
+If you are running Kafka on a remote server, do the following:
 
-#### To try this out on a remote Kafka server
-1. Run "./gradlew clean jar" to build the jar.
-1. Copy the jar to the <kafkaHome>/libs on the remote server.
-1. Copy the two properties (config/marklogic-connect-distributed.properties config/marklogic-sink.properties) to <kafkaHome>/config on the remote server.
+1. Run `./gradlew clean shadowJar` to build the jar
+1. Copy the jar to `<kafkaHome>/libs` on the remote server
+1. Copy the two property files (config/marklogic-connect-distributed.properties and config/marklogic-sink.properties) to `<kafkaHome>/config` on the remote server
 
-See https://kafka.apache.org/quickstart for instructions on starting up Zookeeper and Kafka, which as of August 2022
-will instruct you to run the following commands (in separate terminal windows, both from the Kafka home directory):
+See step 2 in the [Kafka Quickstart guide](https://kafka.apache.org/quickstart) for instructions on starting
+Zookeeper and Kafka. As of August 2022, the guide will instruct you to run the following commands (in separate
+terminal windows, both from the Kafka home directory):
 
     bin/zookeeper-server-start.sh config/zookeeper.properties
 
 and:
 
     bin/kafka-server-start.sh config/server.properties
 
-To start the Kafka connector in standalone mode (from the Kafka home directory):
+Next, start the Kafka connector in standalone mode (also from the Kafka home directory):
 
     bin/connect-standalone.sh config/marklogic-connect-standalone.properties config/marklogic-sink.properties
 
 You'll see a fair amount of logging from Kafka itself; near the end of the logging, look for messages from
-MarkLogicSinkTask and the Data Movement classes such as WriteBatcherImpl to ensure that the connector has started up
-correctly.
+`MarkLogicSinkTask` and MarkLogic Java Client classes such as `WriteBatcherImpl` to ensure that the connector has
+started up correctly.
 
 You can also start the connector in distributed mode:
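In distributed mode, the worker is presumably started with the distributed properties file shipped in `config` - a
sketch, assuming the standard Kafka distribution; connectors are then registered via the Kafka Connect REST API
rather than via a properties file on the command line:

    bin/connect-distributed.sh config/marklogic-connect-distributed.properties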

build.gradle

Lines changed: 60 additions & 29 deletions
@@ -57,7 +57,6 @@ dependencies {
     assets files('apache_logo.png')
 }
 
-// Needed by Gradle 4.6+ - see https://www.petrikainulainen.net/programming/testing/junit-5-tutorial-running-unit-tests-with-gradle/
 test {
     useJUnitPlatform()
 }
@@ -88,73 +87,105 @@ task deploy {
     dependsOn = ["copyJarToKafka", "copyPropertyFilesToKafka"]
 }
 
+ext {
+    confluentArchiveGroup = "Confluent Connector Archive"
+    confluentTestingGroup = "Confluent Platform Local Testing"
+    baseArchiveBuildDir = "build/connectorArchive"
+    baseArchiveName = "${componentOwner}-${componentName}-${version}"
+}
+
 // Tasks for building the archive required for submitting to the Confluent Connector Hub
 
 import org.apache.tools.ant.filters.ReplaceTokens
 
-def baseArchiveBuildDir = "build/connectorArchive"
-def baseArchiveName = "${componentOwner}-${componentName}-${version}"
-task connectorArchive_CopyManifestToBuildDirectory(type: Copy) {
+task connectorArchive_CopyManifestToBuildDirectory(type: Copy, group: confluentArchiveGroup) {
     description = "Copy the project manifest into the root folder"
-    group = 'connector archive'
-
     from '.'
     include 'manifest.json'
     into "${baseArchiveBuildDir}/${baseArchiveName}"
     filter(ReplaceTokens, tokens: [CONFLUENT_USER: componentOwner, VERSION: version])
 }
 
-task connectorArchive_CopyAssetsToBuildDirectory(type: Copy) {
+task connectorArchive_CopyAssetsToBuildDirectory(type: Copy, group: confluentArchiveGroup) {
     description = "Copy the project assets into the assets folder"
-    group = 'connector archive'
-
     from configurations.assets
     into "${baseArchiveBuildDir}/${baseArchiveName}/assets"
 }
 
-task connectorArchive_CopyEtcToBuildDirectory(type: Copy) {
+task connectorArchive_CopyEtcToBuildDirectory(type: Copy, group: confluentArchiveGroup) {
     description = "Copy the project support files into the etc folder"
-    group = 'connector archive'
-
     from 'config'
     include '*'
     into "${baseArchiveBuildDir}/${baseArchiveName}/etc"
 }
 
-task connectorArchive_CopyDocumentationToBuildDirectory(type: Copy) {
+task connectorArchive_CopyDocumentationToBuildDirectory(type: Copy, group: confluentArchiveGroup) {
     description = "Copy the project documentation into the doc folder"
-    group = 'connector archive'
-
     from configurations.documentation
     into "${baseArchiveBuildDir}/${baseArchiveName}/doc"
 }
 
-task connectorArchive_CopyDependenciesToBuildDirectory(type: Copy) {
+task connectorArchive_CopyDependenciesToBuildDirectory(type: Copy, group: confluentArchiveGroup, dependsOn: jar) {
     description = "Copy the dependency jars into the lib folder"
-    group = 'connector archive'
-    dependsOn = [jar]
-
     from jar
     from configurations.runtimeClasspath.findAll { it.name.endsWith('jar') }
     into "${baseArchiveBuildDir}/${baseArchiveName}/lib"
 }
 
-task connectorArchive_BuildDirectory() {
+task connectorArchive_BuildDirectory(group: confluentArchiveGroup) {
     description = "Build the directory that will be used to create the Kafka Connector Archive"
-    dependsOn = [connectorArchive_CopyManifestToBuildDirectory,
-                 connectorArchive_CopyDependenciesToBuildDirectory,
-                 connectorArchive_CopyDocumentationToBuildDirectory,
-                 connectorArchive_CopyEtcToBuildDirectory,
-                 connectorArchive_CopyAssetsToBuildDirectory]
-    group = 'connector archive'
+    dependsOn = [
+        connectorArchive_CopyManifestToBuildDirectory,
+        connectorArchive_CopyDependenciesToBuildDirectory,
+        connectorArchive_CopyDocumentationToBuildDirectory,
+        connectorArchive_CopyEtcToBuildDirectory,
+        connectorArchive_CopyAssetsToBuildDirectory
+    ]
 }
 
-task connectorArchive(type: Zip, dependsOn: connectorArchive_BuildDirectory) {
+task connectorArchive(type: Zip, dependsOn: connectorArchive_BuildDirectory, group: confluentArchiveGroup) {
     description = 'Build a Connector Archive for the Confluent Connector Hub'
-    group = 'connector archive'
-
     from "${baseArchiveBuildDir}"
     include '**/*'
     archiveName "${baseArchiveName}.zip"
     destinationDir(file('build/distro'))
 }
+
+task copyConnectorToConfluent(type: Copy, group: confluentTestingGroup, dependsOn: connectorArchive_BuildDirectory) {
+    description = "Build the connector archive and copy it to your local Confluent Platform"
+    from baseArchiveBuildDir
+    into "${confluentHome}/share/confluent-hub-components"
+}
+
+// See https://docs.confluent.io/confluent-cli/current/command-reference/local/confluent_local_destroy.html
+task destroyLocalConfluent(type: Exec, group: confluentTestingGroup) {
+    description = "Destroy the local Confluent Platform instance"
+    commandLine "confluent", "local", "destroy"
+}
+
+// See https://docs.confluent.io/confluent-cli/current/command-reference/local/services/confluent_local_services_start.html
+task startLocalConfluent(type: Exec, group: confluentTestingGroup) {
+    description = "Convenience task for starting a local instance of Confluent Platform"
+    commandLine "confluent", "local", "services", "start"
+}
+
+task loadDatagenPurchasesConnector(type: Exec, group: confluentTestingGroup) {
+    description = "Load an instance of the Datagen connector into Confluent Platform for sending JSON documents to " +
+        "the 'purchases' topic"
+    commandLine "confluent", "local", "services", "connect", "connector", "load", "datagen-purchases", "-c",
+        "src/test/resources/confluent/datagen-purchases-connector.json"
+}
+
+task loadMarkLogicPurchasesConnector(type: Exec, group: confluentTestingGroup) {
+    description = "Load an instance of the MarkLogic Kafka connector into Confluent Platform for writing data to " +
+        "MarkLogic from the 'purchases' topic"
+    commandLine "confluent", "local", "services", "connect", "connector", "load", "marklogic-purchases", "-c",
+        "src/test/resources/confluent/marklogic-purchases-connector.json"
+}
+
+task setupLocalConfluent(group: confluentTestingGroup) {
+    description = "Start a local Confluent Platform instance and load the Datagen and MarkLogic connectors"
+}
+setupLocalConfluent.dependsOn startLocalConfluent, loadDatagenPurchasesConnector, loadMarkLogicPurchasesConnector
+loadDatagenPurchasesConnector.mustRunAfter startLocalConfluent
+loadMarkLogicPurchasesConnector.mustRunAfter startLocalConfluent

gradle.properties

Lines changed: 4 additions & 1 deletion
@@ -5,5 +5,8 @@ version=1.7.0-SNAPSHOT
 componentOwner=marklogic
 componentName=kafka-marklogic-connector
 
-# Override this in gradle-local.properties
+# Override in gradle-local.properties if testing with regular Apache Kafka
 kafkaHome=
+
+# Override in gradle-local.properties if testing with Confluent Platform
+confluentHome=
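A `gradle-local.properties` file at the root of the project that overrides both settings might look like this
(paths are the examples used in the docs above):

    kafkaHome=/Users/myusername/kafka_2.13-2.8.1
    confluentHome=/Users/myusername/confluent-7.2.1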
src/test/resources/confluent/datagen-purchases-connector.json

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
{
  "name": "datagen-purchases",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "purchases",
    "quickstart": "purchases",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "value.converter.decimal.format": "NUMERIC",
    "max.interval": 1000,
    "tasks.max": "1"
  }
}
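This is the connector definition that `./gradlew loadDatagenPurchasesConnector` passes to the Confluent CLI;
loading it by hand is equivalent to:

    confluent local services connect connector load datagen-purchases -c src/test/resources/confluent/datagen-purchases-connector.json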
src/test/resources/confluent/marklogic-purchases-connector.json

Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
{
  "name": "marklogic-purchases",
  "config": {
    "topics": "purchases",
    "connector.class": "com.marklogic.kafka.connect.sink.MarkLogicSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "1",
    "ml.connection.host": "localhost",
    "ml.connection.port": 8000,
    "ml.connection.username": "admin",
    "ml.connection.password": "admin",
    "ml.connection.securityContextType": "DIGEST",
    "ml.document.format": "JSON",
    "ml.document.uriSuffix": ".json"
  }
}
