Commit bb28f14

BillFarber and aclavio authored
Develop (#43)
* Feature/enhanced logging (#39)
  * added options for logging incoming record keys and headers
  * added simple batch write failure listener
  * cleaned up logging messages for headers, and added hashed log messages
* Updates to the AWS CloudFormation Quickstart. Also, added documentation for using the connector with AWS MSK (from Biju). (#42)

Co-authored-by: Anthony Clavio <[email protected]>
1 parent 5e1047b commit bb28f14

File tree

10 files changed, +150 -39 lines changed

AWS-CloudFormation/CloudFormation-QuickStart.md

Lines changed: 30 additions & 17 deletions

@@ -4,7 +4,7 @@ The three servers are the Kafka/Zookeeper server, the MarkLogic server, and a se

_This is not intended to be a description of setting up a production environment._

- _All the resource files have been configure and compiled specifically for this example.
+ _All the resource files have been configured and compiled specifically for this example.
In particular, IP addresses are in the config files._

## Requirements

@@ -31,40 +31,53 @@ Please note: if you are using the default values in the resource files, you will

## Build the AWS Resources
1. Navigate to the [AWS CloudFormation](https://console.aws.amazon.com/cloudformation/home?region=us-east-1) page.
- 1. Click "Create Stack"
+ 1. Click "Create Stack" and "With new resources (standard)"
1. Click the "Upload a template file" radio button
1. Click "Upload File" and upload "readyToGo.json"
1. Click "Next"
1. Give your stack a name, and click "Next"
1. Click "Next"
1. Click "Create Stack"
1. Go to the list of your [AWS Instances](https://console.aws.amazon.com/ec2/v2/home?region=us-east-1#Instances:sort=tag:Name)
- 1. Now wait until initialization is complete.
+ 1. <strong>Now wait until initialization is complete.</strong> (Instance Status Check says "2/2 checks passed")
+
+ ## View the empty Kafka database in MarkLogic
+
+ 1. On the list of your AWS instances, click on the instance named "instanceMarkLogicA".
+ 2. Copy the Public DNS (IPv4)
+ 3. In a browser, open the page http://\<public DNS>:8000
+    * Enter the username "admin"
+    * Enter the password "admin"
+    * (If this page does not open for you, wait 5 minutes to allow server initialization to complete)
+ 4. In the QConsole UI, change the *Database* to "Kafka"
+ 5. Click *Explore* to verify that the database has no documents.

## Start the Connector
- 1. On the list of your AWS Instances, click on the instance named, "instanceKafkaWorkerA".
+ 1. On the list of your AWS Instances, click on the instance named, "TemplateBased-Kafka-Worker-A".
1. Copy the Public DNS (IPv4).
1. Ssh to the TemplateBased-Kafka-Worker-A server.
-    `ssh -i kafka.pem bitnami@<Public DNS>`
+    ```
+    ssh -i kafka.pem bitnami@<Public DNS>
+    ```
1. Start the connector
-    `sudo /opt/bitnami/kafka/bin/connect-standalone.sh /opt/bitnami/kafka/config/marklogic-connect-standalone.properties /opt/bitnami/kafka/config/marklogic-sink.properties`
+    ```
+    sudo /opt/bitnami/kafka/bin/connect-standalone.sh /opt/bitnami/kafka/config/marklogic-connect-standalone.properties /opt/bitnami/kafka/config/marklogic-sink.properties
+    ```

## Generate some Messages
This step uses the JAR file from a small project for producing test messages. It can be found in [my GitHub account](https://github.com/BillFarber/KafkaProducer)
- 1. On the list of your AWS Instances, click on the instance named, "instanceKafkaWorkerA".
+ 1. On the list of your AWS Instances, click on the instance named, "TemplateBased-Kafka-Worker-A".
1. Copy the Public DNS (IPv4).
1. Ssh to the TemplateBased-Kafka-Worker-A server.
-    `ssh -i kafka.pem bitnami@<Public DNS>`
+    ```
+    ssh -i kafka.pem bitnami@<Public DNS>
+    ```
1. Send some messages to the Kafka topic
-    `java -jar /home/bitnami/kafka-producer-1.0-SNAPSHOT.jar -c 4 -m 5 -h ip-172-31-48-44.ec2.internal:9092 -t marklogic`
+    ```
+    java -jar /home/bitnami/kafka-producer-1.0-SNAPSHOT.jar -c 4 -m 5 -h ip-172-31-48-44.ec2.internal:9092 -t marklogic
+    ```

## View the Messages in MarkLogic

- 1. On the list of your AWS instances, click on the instance named "instanceMarkLogicA".
- 2. Copy the Public DNS (IPv4)
- 3. In a browser, open the page http://\<public DNS>:8000
-    * Enter the username "admin"
-    * Enter the password "admin"
-
- 4. In the QConsole UI, change the *Database* to "Kafka"
- 5. Click *Explore* to view the documents you created from "Kafka-Worker-A"
+ 1. Go back to the MarkLogic QConsole web page (see previous instructions)
+ 1. Click *Explore* to view the documents you created from "Kafka-Worker-A"
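The "Generate some Messages" step above drives a stock Kafka producer at the topic the connector watches. For readers who don't have the kafka-producer JAR handy, a minimal sketch of an equivalent producer follows; it assumes the standard kafka-clients API, reuses the broker DNS name and "marklogic" topic from the QuickStart, and the JSON payload shape is purely illustrative rather than the JAR's actual output.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class QuickstartProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The QuickStart points the producer at the worker's internal EC2 DNS name on port 9092.
        props.put("bootstrap.servers", "ip-172-31-48-44.ec2.internal:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 5; i++) {
                // Each value ends up in MarkLogic as a JSON document (ml.document.format=JSON).
                String value = String.format("{\"messageNumber\": %d}", i);
                producer.send(new ProducerRecord<>("marklogic", "key-" + i, value));
            }
            producer.flush();
        }
    }
}
```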

AWS-CloudFormation/cloudFormationTemplates/readyToGo.json

Lines changed: 9 additions & 6 deletions

@@ -111,7 +111,7 @@
"sudo wget https://cf-templates-3tc878fhthic-us-east-1.s3.amazonaws.com/Kafka-Initialization/kafka-producer-1.0-SNAPSHOT.jar -O /home/bitnami/kafka-producer-1.0-SNAPSHOT.jar\n",
"sudo chmod 644 /home/bitnami/kafka-producer-1.0-SNAPSHOT.jar\n",
"sudo mkdir -p /home/bitnami/kafka/config\n",
- "sudo wget https://cf-templates-3tc878fhthic-us-east-1.s3.amazonaws.com/Kafka-Initialization/kafka-connect-marklogic-0.9.0.jar -O /opt/bitnami/kafka/libs/kafka-connect-marklogic-0.9.0.jar\n",
+ "sudo wget https://cf-templates-3tc878fhthic-us-east-1.s3.amazonaws.com/Kafka-Initialization/kafka-connect-marklogic-1.4.0.jar -O /opt/bitnami/kafka/libs/kafka-connect-marklogic-1.4.0.jar\n",
"sudo wget https://cf-templates-3tc878fhthic-us-east-1.s3.amazonaws.com/Kafka-Initialization/marklogic-connect-standalone.properties -O /opt/bitnami/kafka/config/marklogic-connect-standalone.properties\n",
"sudo wget https://cf-templates-3tc878fhthic-us-east-1.s3.amazonaws.com/Kafka-Initialization/marklogic-sink.properties -O /opt/bitnami/kafka/config/marklogic-sink.properties\n"
]]}}

@@ -160,6 +160,7 @@
],
"UserData" : { "Fn::Base64" : { "Fn::Join" : ["", [
"#!/bin/bash -xe\n",
+ "sleep 180s\n",
"sudo apt-get update && sudo apt-get -y upgrade\n",
"sudo apt -y install sysstat\n",
"sudo wget https://cf-templates-3tc878fhthic-us-east-1.s3.amazonaws.com/Kafka-Initialization/server.properties -O /opt/bitnami/kafka/config/server.properties\n",

@@ -221,16 +222,18 @@
],
"UserData" : { "Fn::Base64" : { "Fn::Join" : ["", [
"#!/bin/bash -xe\n",
+ "sleep 180s\n",
"sudo yum -y update\n",
"sudo echo \"MARKLOGIC_EC2_HOST=0\" > /etc/marklogic.conf\n",
"sudo service MarkLogic start\n",
"sudo yum install sysstat\n",
+ "sleep 60s\n",
"curl -X POST -d \"\" http://localhost:8001/admin/v1/init\n",
- "sleep 30s\n",
+ "sleep 60s\n",
"curl -i -X POST -d \"admin-username=admin&admin-password=admin&realm=public\" http://localhost:8001/admin/v1/instance-admin\n",
- "sleep 30s\n",
- "sudo echo \"<rest-api xmlns=\\\"http://marklogic.com/rest-api\\\"><name>Kafka</name><database>Kafka</database><port>8003</port></rest-api>\" > /etc/kafkaServer.conf\n",
- "curl --anyauth --user admin:admin -X POST -d@'/etc/kafkaServer.conf' -i -H \"Content-type: application/xml\" http://localhost:8002/v1/rest-apis\n"
+ "sleep 60s\n",
+ "sudo echo \"<rest-api xmlns=\\\"http://marklogic.com/rest-api\\\"><name>Kafka</name><database>Kafka</database><port>8003</port></rest-api>\" > /tmp/kafkaServer.conf\n",
+ "curl --anyauth --user admin:admin -X POST -d@'/tmp/kafkaServer.conf' -i -H \"Content-type: application/xml\" http://localhost:8002/v1/rest-apis\n"
]]}}
},
"Metadata": {

@@ -672,4 +675,4 @@
}
}
}
- }
+ }
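The template change pads the MarkLogic bootstrap with fixed sleep calls between starting the service, calling /admin/v1/init, setting the admin credentials, and creating the REST API server. An alternative, sketched below purely as an illustration (it is not part of the template, and it assumes the Admin API's GET /admin/v1/timestamp endpoint is reachable once the server is responding), is to poll until MarkLogic answers instead of relying only on fixed sleeps.

```java
import java.net.HttpURLConnection;
import java.net.URL;

public class WaitForMarkLogicSketch {
    // Poll the local admin endpoint until it responds, rather than sleeping a fixed 60s.
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8001/admin/v1/timestamp");
        for (int attempt = 0; attempt < 30; attempt++) {
            try {
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.setConnectTimeout(2000);
                if (conn.getResponseCode() == 200) {
                    System.out.println("MarkLogic admin API is up");
                    return;
                }
            } catch (Exception e) {
                // Server not accepting connections yet; keep waiting.
            }
            Thread.sleep(10_000);
        }
        throw new IllegalStateException("MarkLogic did not come up in time");
    }
}
```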
Binary file (-4.48 MB): not shown.
Binary file (18.3 MB): not shown.

AWS-CloudFormation/s3Resources/marklogic-connect-standalone.properties

Lines changed: 29 additions & 6 deletions

@@ -15,16 +15,39 @@

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=172.31.48.44:9092
+ # (Optional) The group id provides a way to logically group connectors to distribute the load across multiple instances.
group.id=marklogic

- # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
- # need to configure these based on the format they want their data in when loaded from or stored into Kafka
+ # The next two sections are necessary to establish an SSL connection to the Kafka servers.
+ # To enable SSL, uncomment and customize the lines that start with "#* " in those two sections.
+
+ # SSL connection properties
+ # For more information, see https://docs.confluent.io/current/kafka/encryption.html#encryption-ssl-connect
+ # These top-level settings are used by the Connect worker for group coordination and to read and write to the internal
+ # topics which are used to track the cluster's state (e.g. configs and offsets).
+ #* security.protocol=SSL
+ # You must create a truststore that contains either the server certificate or a trusted CA that signed the server cert.
+ # This is how I did it with keytools:
+ # keytool -keystore kafka.truststore.jks -alias caroot -import -file ca-cert -storepass "XXXXX" -keypass "XXXXX" -noprompt
+ #* ssl.truststore.location=/secure/path/to/certs/kafka.client.truststore.jks
+ #* ssl.truststore.password=truststorePassphrase
+ # For now, turn off hostname verification since we're using self-signed certificates
+ # This might also be fixable by fixing the "Subject Alternative Name (SAN)", but I'm not a cert expert.
+ #* ssl.endpoint.identification.algorithm=

- # Set the converters to the JsonConverter for JSON
- #key.converter=org.apache.kafka.connect.json.JsonConverter
- #value.converter=org.apache.kafka.connect.json.JsonConverter
+ # Yes, both of these sections are required.
+ # Connect workers manage the producers used by source connectors and the consumers used by sink connectors.
+ # So, for the connectors to leverage security, you also have to override the default producer/consumer
+ # configuration that the worker uses.
+ #* consumer.bootstrap.servers=localhost:9093
+ #* consumer.security.protocol=SSL
+ #* consumer.ssl.truststore.location=/secure/path/to/certs/kafka.client.truststore.jks
+ #* consumer.ssl.truststore.password=truststorePassphrase
+ #* consumer.ssl.endpoint.identification.algorithm=

- # Set the converters to the StringConverter for XML
+
+ # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
+ # need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
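The "consumer."-prefixed overrides above exist because the Connect worker, not the sink connector, owns the consumer that actually reads from Kafka. For comparison, here is a rough sketch of the same truststore settings applied to a standalone consumer built directly with the kafka-clients API; the paths, passwords, and localhost:9093 listener are the placeholder values from the file, not a working setup.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SslConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same SSL settings as the "consumer."-prefixed lines, applied to a plain client.
        props.put("bootstrap.servers", "localhost:9093");
        props.put("group.id", "marklogic");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/secure/path/to/certs/kafka.client.truststore.jks");
        props.put("ssl.truststore.password", "truststorePassphrase");
        props.put("ssl.endpoint.identification.algorithm", ""); // disable hostname verification for self-signed certs
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("marklogic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}
```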

AWS-CloudFormation/s3Resources/marklogic-sink.properties

Lines changed: 40 additions & 4 deletions

@@ -23,7 +23,8 @@ ml.connection.port=8003
# one that you want to write documents to, you do not need to set this.
ml.connection.database=Kafka

- # Optional - set to "gateway" when using a load balancer, else leave blank. See https://docs.marklogic.com/guide/java/data-movement#id_26583 for more information.
+ # Optional - set to "gateway" when using a load balancer, else leave blank.
+ # See https://docs.marklogic.com/guide/java/data-movement#id_26583 for more information.
ml.connection.type=

# Either DIGEST, BASIC, CERTIFICATE, KERBEROS, or NONE

@@ -36,10 +37,15 @@ ml.connection.certFile=
ml.connection.certPassword=
ml.connection.externalName=

- # Set to "true" for a "simple" SSL strategy that uses the JVM's default SslContext and X509TrustManager and a
- # "trust everything" HostnameVerifier. Further customization of an SSL connection via properties is not supported. If
- # you need to do so, consider using the source code for this connector as a starting point.
+ # Set "ml.connection.simpleSsl" to "true" for a "simple" SSL strategy that uses the JVM's default SslContext and
+ # X509TrustManager and a "trust everything" HostnameVerifier. Further customization of an SSL connection via properties
+ # is not supported. If you need to do so, consider using the source code for this connector as a starting point.
ml.connection.simpleSsl=false
+ # You must also ensure that the server cert or the signing CA cert is imported in the JVMs cacerts file.
+ # These commands may be used to get the server cert and to import it into your cacerts file.
+ # Don't forget to customize the commands for your particular case.
+ # openssl x509 -in <(openssl s_client -connect <server>:8004 -prexit 2>/dev/null) -out ~/example.crt
+ # sudo keytool -importcert -file ~/example.crt -alias <server> -keystore /path/to/java/lib/security/cacerts -storepass <storepass-password>

# Sets the number of documents to be written in a batch to MarkLogic. This may not have any impact depending on the
# connector receives data from Kafka, as the connector calls flushAsync on the DMSDK WriteBatcher after processing every

@@ -54,6 +60,9 @@ ml.dmsdk.threadCount=8
# Optional - a comma-separated list of collections that each document should be written to
ml.document.collections=kafka-data

+ # Optional - set this to true so that the name of the topic that the connector reads from is added as a collection to each document inserted by the connector
+ ml.document.addTopicToCollections=false
+
# Optional - specify the format of each document; either JSON, XML, BINARY, TEXT, or UNKNOWN
ml.document.format=JSON

@@ -68,3 +77,30 @@ ml.document.uriPrefix=/kafka-data/

# Optional - a suffix to append to each URI
ml.document.uriSuffix=.json
+
+ # Optional - name of a REST transform to use when writing documents
+ # For Data Hub, can use mlRunIngest
+ ml.dmsdk.transform=
+
+ # Optional - delimited set of transform names and values
+ # Data Hub example = flow-name,ingestion_mapping_mastering-flow,step,1
+ ml.dmsdk.transformParams=
+
+ # Optional - delimiter for transform parameter names and values
+ ml.dmsdk.transformParamsDelimiter=,
+
+ # Properties for running a Data Hub flow
+ # Using examples/dh-5-example in the DH project, could use the following config:
+ # ml.datahub.flow.name=ingestion_mapping_mastering-flow
+ # ml.datahub.flow.steps=2,3,4
+ ml.datahub.flow.name=
+ ml.datahub.flow.steps=
+ # Whether or not the response data from running a flow should be logged at the info level
+ ml.datahub.flow.logResponse=true
+
+ ml.id.strategy=
+ ml.id.strategy.paths=
+ ml.connection.enableCustomSsl=false
+ ml.connection.customSsl.tlsVersion=
+ ml.connection.customSsl.hostNameVerifier=
+ ml.connection.customSsl.mutualAuth=false
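The new ml.document.addTopicToCollections option layers the topic name on top of the static ml.document.collections list. A small sketch of the intended effect, using the MarkLogic Java Client's DocumentMetadataHandle, follows; it illustrates the resulting document metadata and is not the connector's actual converter code.

```java
import com.marklogic.client.io.DocumentMetadataHandle;

public class CollectionsSketch {
    public static void main(String[] args) {
        String topic = "marklogic";              // topic the record was read from
        boolean addTopicToCollections = true;    // mirrors ml.document.addTopicToCollections

        // ml.document.collections=kafka-data, plus (optionally) the topic name.
        DocumentMetadataHandle metadata = new DocumentMetadataHandle().withCollections("kafka-data");
        if (addTopicToCollections) {
            metadata.withCollections(topic);
        }
        // Prints a set containing "kafka-data" and "marklogic".
        System.out.println(metadata.getCollections());
    }
}
```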

config/marklogic-sink.properties

Lines changed: 4 additions & 0 deletions

@@ -97,3 +97,7 @@ ml.datahub.flow.name=
ml.datahub.flow.steps=
# Whether or not the response data from running a flow should be logged at the info level
ml.datahub.flow.logResponse=true
+
+ # Logging configurations
+ ml.log.record.key=false
+ ml.log.record.headers=false
Binary file (1.71 MB): not shown.

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConfig.java

Lines changed: 10 additions & 2 deletions

@@ -46,7 +46,11 @@ public class MarkLogicSinkConfig extends AbstractConfig {
    public static final String TLS_VERSION = "ml.connection.customSsl.tlsVersion";
    public static final String SSL_HOST_VERIFIER = "ml.connection.customSsl.hostNameVerifier";
    public static final String SSL_MUTUAL_AUTH = "ml.connection.customSsl.mutualAuth";
-   public static final String ID_STRATEGY = "ml.id.strategy";
+
+   public static final String LOGGING_RECORD_KEY = "ml.log.record.key";
+   public static final String LOGGING_RECORD_HEADERS = "ml.log.record.headers";
+
+   public static final String ID_STRATEGY = "ml.id.strategy";
    public static final String ID_STRATEGY_PATH = "ml.id.strategy.paths";

    public static ConfigDef CONFIG_DEF = new ConfigDef()

@@ -80,7 +84,11 @@ public class MarkLogicSinkConfig extends AbstractConfig {
        .define(TLS_VERSION, Type.STRING, Importance.LOW, "Version of TLS to connect to MarkLogic SSL enabled App server. Ex. TLSv1.2")
        .define(SSL_HOST_VERIFIER, Type.STRING, Importance.LOW, "The strictness of Host Verifier - ANY, COMMON, STRICT")
        .define(SSL_MUTUAL_AUTH, Type.BOOLEAN, Importance.LOW, "Mutual Authentication for Basic or Digest : true or false")
-       .define(ID_STRATEGY, Type.STRING, Importance.LOW, "The ID Strategy for URI.")
+
+       .define(LOGGING_RECORD_KEY, Type.BOOLEAN, false, Importance.LOW, "Log incoming record keys")
+       .define(LOGGING_RECORD_HEADERS, Type.BOOLEAN, false, Importance.LOW, "Log incoming record headers")
+
+       .define(ID_STRATEGY, Type.STRING, Importance.LOW, "The ID Strategy for URI.")
        .define(ID_STRATEGY_PATH, Type.STRING, Importance.LOW, "The JSON path for ID Strategy")
    ;
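The two new boolean options are defined with an explicit default of false, so logging is opt-in. A standalone sketch of how such a definition behaves when parsed through Kafka's AbstractConfig follows; the class name and property map below are made up for illustration, and the connector itself reads the raw config map in MarkLogicSinkTask rather than going through getBoolean.

```java
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

import java.util.HashMap;
import java.util.Map;

public class LoggingConfigSketch {
    // Mirrors the two new options; defaultValue=false means they stay off unless enabled in marklogic-sink.properties.
    static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define("ml.log.record.key", Type.BOOLEAN, false, Importance.LOW, "Log incoming record keys")
        .define("ml.log.record.headers", Type.BOOLEAN, false, Importance.LOW, "Log incoming record headers");

    public static void main(String[] args) {
        // Simulate the properties Kafka Connect would hand to the connector.
        Map<String, String> props = new HashMap<>();
        props.put("ml.log.record.key", "true");

        AbstractConfig parsed = new AbstractConfig(CONFIG_DEF, props);
        System.out.println(parsed.getBoolean("ml.log.record.key"));      // true (from the map)
        System.out.println(parsed.getBoolean("ml.log.record.headers"));  // false (default applied)
    }
}
```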

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkTask.java

Lines changed: 28 additions & 4 deletions

@@ -6,16 +6,16 @@
import com.marklogic.client.document.ServerTransform;
import com.marklogic.client.ext.DatabaseClientConfig;
import com.marklogic.client.ext.DefaultConfiguredDatabaseClientFactory;
+ import com.marklogic.client.io.DocumentMetadataHandle;
+ import com.marklogic.client.io.Format;
+ import com.marklogic.client.io.StringHandle;
import com.marklogic.kafka.connect.DefaultDatabaseClientConfigBuilder;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

- import java.util.Arrays;
- import java.util.Collection;
- import java.util.List;
- import java.util.Map;
+ import java.util.*;
import java.io.IOException;

/**

@@ -30,11 +30,16 @@ public class MarkLogicSinkTask extends SinkTask {
    private DataMovementManager dataMovementManager;
    private WriteBatcher writeBatcher;
    private SinkRecordConverter sinkRecordConverter;
+   private boolean logKeys = false;
+   private boolean logHeaders = false;

    @Override
    public void start(final Map<String, String> config) {
        logger.info("Starting");

+       logKeys = Boolean.parseBoolean(config.get(MarkLogicSinkConfig.LOGGING_RECORD_KEY));
+       logHeaders = Boolean.parseBoolean(config.get(MarkLogicSinkConfig.LOGGING_RECORD_HEADERS));
+
        sinkRecordConverter = new DefaultSinkRecordConverter(config);

        DatabaseClientConfig databaseClientConfig = new DefaultDatabaseClientConfigBuilder().buildDatabaseClientConfig(config);

@@ -50,6 +55,12 @@ public void start(final Map<String, String> config) {
            writeBatcher.withTransform(transform);
        }

+       writeBatcher.onBatchFailure((writeBatch, throwable) -> {
+           int batchSize = writeBatch.getItems().length;
+           logger.error("#error failed to write {} records", batchSize);
+           logger.error("#error batch failure:", throwable);
+       });
+
        final String flowName = config.get(MarkLogicSinkConfig.DATAHUB_FLOW_NAME);
        if (flowName != null && flowName.trim().length() > 0) {
            writeBatcher.onBatchSuccess(buildSuccessListener(flowName, config, databaseClientConfig));

@@ -147,10 +158,23 @@ public void put(final Collection<SinkRecord> records) {
            return;
        }

+       final List<String> headers = new ArrayList<>();
+
        records.forEach(record -> {
+
            if (record == null) {
                logger.warn("Skipping null record object.");
            } else {
+               if (logKeys) {
+                   logger.info("#record key {}", record.key());
+               }
+               if (logHeaders) {
+                   headers.clear();
+                   record.headers().forEach(header -> {
+                       headers.add(String.format("%s:%s", header.key(), header.value().toString()));
+                   });
+                   logger.info("#record headers: {}", headers);
+               }
                if (logger.isDebugEnabled()) {
                    logger.debug("Processing record value {} in topic {}", record.value(), record.topic());
                }
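To see what the new key and header logging produces, the formatting loop can be exercised against a hand-built SinkRecord. The sketch below mirrors the task's String.format("%s:%s", ...) logic using the standard Kafka Connect record and header APIs; the topic, header names, and values are invented for the example.

```java
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.ArrayList;
import java.util.List;

public class HeaderLoggingSketch {
    public static void main(String[] args) {
        // Build a record the way Kafka Connect would deliver it to put(); schemas are null for plain strings.
        SinkRecord record = new SinkRecord("marklogic", 0, null, "key-1", null, "{\"hello\":\"world\"}", 42L);
        record.headers().addString("source", "quickstart");
        record.headers().addString("trace-id", "abc-123");

        // Same key:value formatting the task logs with "#record headers: {}".
        List<String> headers = new ArrayList<>();
        for (Header header : record.headers()) {
            headers.add(String.format("%s:%s", header.key(), header.value().toString()));
        }
        System.out.println(headers); // [source:quickstart, trace-id:abc-123]
    }
}
```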
