
Commit d0c8844

Prepare for 1.9 release.
Update the User Documentation. Add examples of JSON Connector configuration. Clean up outdated example files. Update the version number. Convert the transform example to a JUnit test.
1 parent fc18a5e commit d0c8844

File tree

11 files changed: +179 -96 lines changed


examples/ScalingConsiderations.md renamed to docs/ScalingConsiderations.md

Lines changed: 11 additions & 2 deletions
@@ -1,3 +1,9 @@
+---
+layout: default
+title: Scaling Considerations
+nav_order: 7
+---
+
 # Scaling Considerations
 Each of the three parts of parts (Kafka, MarkLogic, and this connector) of this system maybe easily scaled to handle
 your throughput requirements. To use the connector in a clustered environment you only need to ensure a couple of

@@ -11,8 +17,8 @@ started or shutdown, this information is also relayed to the connectors so that
 ## MarkLogic
 MarkLogic is designed to be used in large clusters of servers. In order to spread the load of data I/O across the
 cluster, a load balancer is typically used. In this case, the connector should be configured to be aware of the use
-of a load balancer. This is accomplished by setting the "ml.connection.host" to point to the load balancer, and by setting "ml.connection.type" to "gateway" in the marklogic-sink.properties
-file.
+of a load balancer. This is accomplished by setting the "ml.connection.host" to point to the load balancer, and by
+setting "ml.connection.type" to "gateway" in the marklogic-sink.properties file.
 
 <pre><code># A MarkLogic host to connect to. The connector uses the Data Movement SDK, and thus it will connect to each of the
 # hosts in a cluster.

@@ -22,6 +28,9 @@ ml.connection.host=MarkLogic-LoadBalancer-1024238516.us-east-1.elb.amazonaws.com
 # See https://docs.marklogic.com/guide/java/data-movement#id_26583 for more information.
 ml.connection.type=gateway</code></pre>
 
+For additional information regarding scaling a MarkLogic cluster, please see the MarkLogic
+[Scalability, Availability, and Failover Guide](https://docs.marklogic.com/guide/cluster/scalability).
+
 ## Connector
 When configuring multiple instances of the connector to consume the same topic(s), the Kafka Connect framework
 automatically handles dividing up the connections by assigning specific topic partitions (spread across the Kafka

docs/writing-data.md

Lines changed: 8 additions & 1 deletion
@@ -229,7 +229,7 @@ endpointConstants = fn.head(xdmp.fromJSON(endpointConstants));
 
 for (let item of inputSequence) {
   item = fn.head(xdmp.fromJSON(item));
-  // TODO Determine what to do with each item
+  // Determine what to do with each item
 }
 ```
 

@@ -329,3 +329,10 @@ required to catch any error that occurs, an unexpected error in the sink connect
 and logged by Kafka. However, nothing will be sent to the user-configured DLQ topic in this scenario as the error will
 not be associated with a particular sink record. Kafka and MarkLogic server logs should be examined to determine the
 cause of the error.
+
+## JSON-based Connector Configuration
+
+Some Kafka environments permit REST-based instantiation of connectors. Confluent is one of those environments.
+[Please see the Confluent documentation](https://docs.confluent.io/kafka-connectors/maprdb/current/map_r_d_b_sink_connector_example.html)
+to read about this technique. Examples of JSON files to use with the REST service can be found in
+examples/ConfluentConnectorConfigs.
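As a hedged illustration of the REST-based technique described in the new section above, one of these JSON files could be submitted to the Kafka Connect REST API roughly as follows; the listener address (localhost:8083, the Connect default) and the local file name are assumptions, not part of this commit:

```
# Sketch only: register the sink connector defined in the JSON example below.
# Assumes the Kafka Connect REST listener is reachable at its default port 8083
# and that the JSON has been saved locally as marklogic-purchases-sink.json
# (hypothetical file name).
curl -X POST -H "Content-Type: application/json" \
  --data @marklogic-purchases-sink.json \
  http://localhost:8083/connectors
```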
Lines changed: 23 additions & 0 deletions
(New file: a sink connector configuration example; per the docs change above, these JSON examples live in examples/ConfluentConnectorConfigs.)

{
  "name": "marklogic-purchases-sink",
  "config": {
    "topics": "purchases",
    "connector.class": "com.marklogic.kafka.connect.sink.MarkLogicSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "1",
    "ml.connection.host": "marklogic",
    "ml.connection.port": 8011,
    "ml.connection.username": "kafka-test-user",
    "ml.connection.password": "kafkatest",
    "ml.connection.securityContextType": "DIGEST",
    "ml.document.format": "JSON",
    "ml.document.uriPrefix": "/purchase/",
    "ml.document.uriSuffix": ".json",
    "ml.document.collections": "purchases,kafka-data",
    "ml.document.permissions": "kafka-test-minimal-user,read,kafka-test-minimal-user,update",
    "ml.dmsdk.includeKafkaMetadata": "false",
    "ml.dmsdk.includeKafkaHeaders": "true",
    "ml.dmsdk.includeKafkaHeaders.prefix": ""
  }
}
Lines changed: 18 additions & 0 deletions
(New file: a source connector configuration example.)

{
  "name": "marklogic-purchases-source",
  "config": {
    "connector.class": "com.marklogic.kafka.connect.source.MarkLogicSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "1",
    "ml.connection.host": "marklogic",
    "ml.connection.port": 8011,
    "ml.connection.username": "kafka-test-user",
    "ml.connection.password": "kafkatest",
    "ml.connection.securityContextType": "DIGEST",
    "ml.source.optic.dsl": "op.fromView('demo', 'purchases')",
    "ml.source.topic": "marklogic-purchases",
    "ml.source.waitTime": "5000",
    "ml.source.optic.outputFormat": "JSON"
  }
}
-1.71 MB (binary file not shown)

examples/dmsdk-transform/installTransform.sh

Lines changed: 0 additions & 1 deletion
This file was deleted.

examples/dmsdk-transform/marklogic-sink-with-transform.properties

Lines changed: 0 additions & 74 deletions
This file was deleted.

examples/dmsdk-transform/trans-ex.sjs

Lines changed: 0 additions & 17 deletions
This file was deleted.

gradle.properties

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 group=com.marklogic
-version=1.9-SNAPSHOT
+version=1.9
 
 # For the Confluent Connector Archive
 componentOwner=marklogic
Lines changed: 105 additions & 0 deletions
(New file containing class WriteTransformDocumentTest: the transform example converted to a JUnit integration test.)

/*
 * Copyright (c) 2023 MarkLogic Corporation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.marklogic.kafka.connect.sink;

import com.marklogic.client.io.SearchHandle;
import com.marklogic.client.query.StructuredQueryBuilder;
import com.marklogic.client.query.StructuredQueryDefinition;
import kafka.server.KafkaConfig$;
import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import net.mguenther.kafka.junit.KeyValue;
import net.mguenther.kafka.junit.SendKeyValues;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

import static net.mguenther.kafka.junit.EmbeddedConnectConfig.kafkaConnect;
import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.newClusterConfig;
import static org.junit.jupiter.api.Assertions.assertEquals;

class WriteTransformDocumentTest extends AbstractIntegrationSinkTest {

    private final String ML_COLLECTION = "kafka-data";
    private final String TOPIC = "test-topic";
    private final String KEY = String.format("key-%s", UUID.randomUUID());

    private EmbeddedKafkaCluster kafka;

    @BeforeEach
    void setupKafka() {
        provisionKafkaWithConnectAndMarkLogicConnector();
        kafka.start();
    }

    @AfterEach
    void tearDownKafka() {
        kafka.stop();
    }

    @Test
    void shouldWaitForKeyedRecordsToBePublished() throws InterruptedException {
        Integer NUM_RECORDS = 2;
        sendSomeJsonMessages(NUM_RECORDS);
        retryIfNotSuccessful(() -> assertMarkLogicDocumentsExistInCollection(ML_COLLECTION, NUM_RECORDS,
            format("Expected to find %d records in the ML database", NUM_RECORDS)));
    }

    private void provisionKafkaWithConnectAndMarkLogicConnector() {
        kafka = provisionWith(
            newClusterConfig()
                .configure(
                    kafkaConnect()
                        .deployConnector(connectorConfig(TOPIC, KEY))
                        .with(KafkaConfig$.MODULE$.NumPartitionsProp(), "5")
                )
        );
    }

    private Properties connectorConfig(final String topic, final String key) {
        return MarkLogicSinkConnectorConfigBuilder.create()
            .withTopic(topic)
            .withKey(key)
            .with(MarkLogicSinkConfig.CONNECTION_HOST, testConfig.getHost())
            .with(MarkLogicSinkConfig.CONNECTION_PORT, testConfig.getRestPort())
            .with(MarkLogicSinkConfig.CONNECTION_USERNAME, testConfig.getUsername())
            .with(MarkLogicSinkConfig.CONNECTION_PASSWORD, testConfig.getPassword())
            .with(MarkLogicSinkConfig.DOCUMENT_COLLECTIONS, ML_COLLECTION)
            .with(MarkLogicSinkConfig.DMSDK_BATCH_SIZE, 1)
            .with(MarkLogicSinkConfig.DMSDK_TRANSFORM, "exampleTransform")
            .build();
    }

    private void sendSomeJsonMessages(Integer numberOfRecords) throws InterruptedException {
        List<KeyValue<String, String>> records = new ArrayList<>();
        for (int i = 0; i < numberOfRecords; i++) {
            records.add(new KeyValue<>("aggregate", "{\"A\": \"" + i + "\"}"));
        }
        kafka.send(SendKeyValues.to(TOPIC, records));
    }

    private void assertMarkLogicDocumentsExistInCollection(String collection, Integer numRecords, String message) {
        StructuredQueryBuilder qb = new StructuredQueryBuilder();
        StructuredQueryDefinition queryDefinition = qb.collection(collection).withCriteria("Chartreuse");
        SearchHandle results = getDatabaseClient().newQueryManager().search(queryDefinition, new SearchHandle());
        assertEquals(numRecords.longValue(), results.getTotalResults(), message);
    }
}
