
Commit 0a7f067 (parent cf40b5c)

Added the ml.dmsdk.transform and ml.dmsdk.transformParams properties because they must be declared. Added an example config that uses a server-side transform from the connector, a basic SJS transform, and a shell command for installing the transform.

File tree: 4 files changed, +95 −0 lines

config/marklogic-sink.properties

Lines changed: 3 additions & 0 deletions

@@ -68,3 +68,6 @@ ml.document.uriPrefix=/kafka-data/
 
 # Optional - a suffix to append to each URI
 ml.document.uriSuffix=.json
+
+ml.dmsdk.transform=
+ml.dmsdk.transformParams=
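The transform named by ml.dmsdk.transform must already be installed on the REST API server (see the curl command below). As a sketch, a populated pair of these properties might look like the following; the comma-delimited name,value pairing for transformParams is an assumption about how the connector parses the list, not something this commit confirms:

ml.dmsdk.transform=js-example
# Assumed format: alternating parameter names and values, comma-delimited
ml.dmsdk.transformParams=param1,value1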
Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+curl --anyauth --user admin:admin -X PUT -i --data-binary @"./trans-ex.sjs" -H "Content-type: application/vnd.marklogic-javascript" 'http://ec2-54-173-231-36.compute-1.amazonaws.com:8000/LATEST/config/transforms/js-example'
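Once installed, the transform can be checked by reading it back from the same REST API resource; a GET against the transform's endpoint returns the installed source:

# Verify the installed transform by fetching its source back
curl --anyauth --user admin:admin -X GET 'http://ec2-54-173-231-36.compute-1.amazonaws.com:8000/LATEST/config/transforms/js-example'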
Lines changed: 74 additions & 0 deletions

@@ -0,0 +1,74 @@
+# Kafka-specific properties
+
+name=marklogic-sink
+connector.class=com.marklogic.kafka.connect.sink.MarkLogicSinkConnector
+
+# Should only need one task since the connector uses a WriteBatcher, which is multi-threaded
+tasks.max=1
+
+# Topics to consume from [comma-separated list for multiple topics]
+topics=marklogic
+
+
+# MarkLogic connector-specific properties
+
+# A MarkLogic host to connect to. The connector uses the Data Movement SDK, and thus it will connect to each of the
+# hosts in a cluster.
+ml.connection.host=localhost
+
+# The port of a REST API server to connect to.
+ml.connection.port=8000
+
+# Optional - the name of a database to connect to. If your REST API server's content database matches the one that
+# you want to write documents to, you do not need to set this.
+ml.connection.database=Documents
+
+# Optional - set to "gateway" when using a load balancer, else leave blank. See https://docs.marklogic.com/guide/java/data-movement#id_26583 for more information.
+ml.connection.type=
+
+# Either DIGEST, BASIC, CERTIFICATE, KERBEROS, or NONE
+ml.connection.securityContextType=DIGEST
+
+# Set these based on the security context type defined above
+ml.connection.username=admin
+ml.connection.password=admin
+ml.connection.certFile=
+ml.connection.certPassword=
+ml.connection.externalName=
+
+# Set to "true" for a "simple" SSL strategy that uses the JVM's default SslContext and X509TrustManager and a
+# "trust everything" HostnameVerifier. Further customization of an SSL connection via properties is not supported. If
+# you need to do so, consider using the source code for this connector as a starting point.
+ml.connection.simpleSsl=false
+
+# Sets the number of documents to be written in a batch to MarkLogic. This may not have any impact depending on how the
+# connector receives data from Kafka, as the connector calls flushAsync on the DMSDK WriteBatcher after processing every
+# collection of records. Thus, if the connector never receives more records at one time than the value of this property,
+# this property will have no impact.
+ml.dmsdk.batchSize=100
+
+# Sets the number of threads used by the Data Movement SDK for parallelizing writes to MarkLogic. Similar to the batch
+# size property above, this may never come into play depending on how many records the connector receives at once.
+ml.dmsdk.threadCount=8
+
+# Optional - a comma-separated list of collections that each document should be written to
+ml.document.collections=kafka-data
+
+# Optional - specify the format of each document; either JSON, XML, BINARY, TEXT, or UNKNOWN
+ml.document.format=JSON
+
+# Optional - specify a MIME type for each document; typically the format property above will be used instead of this
+ml.document.mimeType=
+
+# Optional - a comma-separated list of roles and capabilities that define the permissions for each document written to MarkLogic
+ml.document.permissions=rest-reader,read,rest-writer,update
+
+# Optional - a prefix to prepend to each URI; the URI itself is a UUID
+ml.document.uriPrefix=/kafka-data/
+
+# Optional - a suffix to append to each URI
+ml.document.uriSuffix=.json
+
+# Optional - specify a MarkLogic server-side transform and required parameters
+ml.dmsdk.transform=js-example
+ml.dmsdk.transformParams=
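Since these properties map onto Data Movement SDK calls, a minimal Java sketch of what the connector presumably does with them may help. This is an illustration against the MarkLogic Java Client API, not the connector's actual source; the class and variable names are invented for the example:

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.WriteBatcher;
import com.marklogic.client.document.ServerTransform;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.StringHandle;

public class SinkSketch {
    public static void main(String[] args) {
        // Connection settings from ml.connection.*
        DatabaseClient client = DatabaseClientFactory.newClient(
            "localhost", 8000,
            new DatabaseClientFactory.DigestAuthContext("admin", "admin"));

        DataMovementManager dmm = client.newDataMovementManager();

        // ml.dmsdk.transform (and ml.dmsdk.transformParams, if set)
        ServerTransform transform = new ServerTransform("js-example");
        // transform.add("param1", "value1"); // only when transformParams is populated

        // ml.dmsdk.batchSize and ml.dmsdk.threadCount
        WriteBatcher batcher = dmm.newWriteBatcher()
            .withBatchSize(100)
            .withThreadCount(8)
            .withTransform(transform);
        dmm.startJob(batcher);

        // For each Kafka record, the connector adds a document to the batcher...
        batcher.add("/kafka-data/example.json",
            new StringHandle("{\"hello\":\"world\"}").withFormat(Format.JSON));

        // ...and flushes asynchronously after each collection of records
        batcher.flushAsync();
        dmm.stopJob(batcher);
        client.release();
    }
}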
trans-ex.sjs (the transform referenced by the install command above)

Lines changed: 17 additions & 0 deletions

@@ -0,0 +1,17 @@
+function insertTimestamp(context, params, content)
+{
+  if (context.inputType.search('json') >= 0) {
+    var result = content.toObject();
+    if (context.acceptTypes) { /* read */
+      result.readTimestamp = fn.currentDateTime();
+    } else { /* write */
+      result.writeTimestamp = fn.currentDateTime();
+    }
+    return result;
+  } else {
+    /* Pass thru for non-JSON documents */
+    return content;
+  }
+}
+
+exports.transform = insertTimestamp;
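With ml.dmsdk.transform=js-example set, each JSON record the connector writes passes through insertTimestamp on the server, so the stored document picks up a writeTimestamp field. Roughly, a record of {"hello":"world"} would be persisted as something like the following (the timestamp value is illustrative):

{
  "hello": "world",
  "writeTimestamp": "2019-05-21T14:32:00.123456-04:00"
}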
