Commit 1d6d3ee

Tweaks to Bulk Data Services feature
Did some testing and made improvements to the docs, modified logging a bit, and added the properties to marklogic-sink.properties.
1 parent 8aaed31 commit 1d6d3ee

4 files changed: +25 -7 lines changed

README.md

Lines changed: 11 additions & 0 deletions
@@ -337,6 +337,17 @@ name starts with `ml.document.`. These properties are included to support an end
 dynamic endpoint that is driven by the values of these properties instead of them being hardcoded within the
 endpoint module. This can allow for reusing the same endpoint module across multiple connector instances.

+Note that when using this approach, none of the properties starting with the following prefixes will have any impact,
+as they are specific to the approach that uses DMSDK:
+
+- `ml.datahub`
+- `ml.dmsdk`
+- `ml.document`
+- `ml.id.strategy`
+
+As noted above, the `ml.document` properties will be included in `endpointConstants`, but they otherwise have no impact
+because the endpoint developer chooses whether to apply them or not in their endpoint module.
+
 #### Configuring Bulk Data Services performance

 MarkLogic's Bulk Data Services feature is designed to leverage the multi-threading and parallelization support provided
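
The README note in this hunk can be illustrated with a small check that lists which configured properties would have no effect once `ml.sink.bulkds.apiUri` is set. This is a minimal sketch and not code from the connector: the class name `BulkDsIgnoredProperties`, the method `ignoredKeys`, and the sample values are hypothetical; only the property names and the four prefixes come from the diff. Note that `ml.document` properties are still passed through in `endpointConstants`, so "ignored" here means only that the connector itself does not act on them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the connector.
class BulkDsIgnoredProperties {

    static final String BULK_DS_API_URI = "ml.sink.bulkds.apiUri";

    // Prefixes the README lists as specific to the DMSDK approach.
    static final List<String> DMSDK_ONLY_PREFIXES =
        List.of("ml.datahub", "ml.dmsdk", "ml.document", "ml.id.strategy");

    // Returns the configured keys that have no impact when Bulk Data Services is in use.
    // Returns an empty list when ml.sink.bulkds.apiUri is not set.
    static List<String> ignoredKeys(Map<String, String> config) {
        List<String> ignored = new ArrayList<>();
        String apiUri = config.get(BULK_DS_API_URI);
        if (apiUri == null || apiUri.trim().isEmpty()) {
            return ignored;
        }
        for (String key : config.keySet()) {
            for (String prefix : DMSDK_ONLY_PREFIXES) {
                if (key.startsWith(prefix)) {
                    ignored.add(key);
                    break;
                }
            }
        }
        return ignored;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put(BULK_DS_API_URI, "/example/bulk-endpoint.api");       // hypothetical URI
        config.put("ml.connection.modulesDatabase", "example-modules");  // hypothetical name
        config.put("ml.dmsdk.threadCount", "8");
        config.put("ml.document.uriSuffix", ".json");
        for (String key : ignoredKeys(config)) {
            System.out.println("No impact with Bulk Data Services: " + key);
        }
    }
}
```

A check like this could run at connector startup to log a warning, which may help users who switch an existing DMSDK configuration over to Bulk Data Services.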

config/marklogic-sink.properties

Lines changed: 8 additions & 0 deletions
@@ -127,3 +127,11 @@ ml.dmsdk.threadCount=8
 # ml.datahub.flow.steps=
 # Set to true to log at the info level the response data from running a flow
 # ml.datahub.flow.logResponse=true
+
+# Defines the URI of a Bulk Data Services API declaration. Requires that ml.connection.modulesDatabase be set. See the
+# user guide for more information on using Bulk Data Services instead of DMSDK for writing data to MarkLogic.
+# ml.sink.bulkds.apiUri=
+
+# Required when using Bulk Data Services so that the API declaration can be retrieved from MarkLogic.
+# ml.connection.modulesDatabase=
+
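
The dependency stated in these comments (`ml.sink.bulkds.apiUri` requires `ml.connection.modulesDatabase`) could be enforced with a check along the following lines. This is a sketch under assumptions, not the connector's validation logic: the class `BulkDsConfigCheck`, its methods, and the sample values are hypothetical, and the diff does not show whether or where the connector performs such a check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical validator, not part of the connector.
class BulkDsConfigCheck {

    static final String BULK_DS_API_URI = "ml.sink.bulkds.apiUri";
    static final String MODULES_DATABASE = "ml.connection.modulesDatabase";

    // A property counts as set when it is present and not blank.
    static boolean isSet(Map<String, String> config, String key) {
        String value = config.get(key);
        return value != null && !value.trim().isEmpty();
    }

    // Throws if an API declaration URI is configured without a modules database.
    static void validate(Map<String, String> config) {
        if (isSet(config, BULK_DS_API_URI) && !isSet(config, MODULES_DATABASE)) {
            throw new IllegalArgumentException(
                MODULES_DATABASE + " must be set when " + BULK_DS_API_URI + " is set");
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(BULK_DS_API_URI, "/example/bulk-endpoint.api"); // hypothetical URI
        try {
            validate(config);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        config.put(MODULES_DATABASE, "example-modules"); // hypothetical database name
        validate(config);
        System.out.println("Accepted");
    }
}
```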

src/main/java/com/marklogic/kafka/connect/sink/BulkDataServicesSinkTask.java

Lines changed: 2 additions & 2 deletions
@@ -67,9 +67,9 @@ protected void onStart(Map<String, Object> parsedConfig) {
     @Override
     public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
         if (bulkInputCaller != null) {
-            logger.info("Flushing BulkInputCaller");
+            logger.debug("Flushing BulkInputCaller");
             bulkInputCaller.awaitCompletion();
-            logger.info("Finished flushing BulkInputCaller");
+            logger.debug("Finished flushing BulkInputCaller");
         }
     }

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConfig.java

Lines changed: 4 additions & 5 deletions
@@ -94,6 +94,10 @@ public class MarkLogicSinkConfig extends AbstractConfig {
         .define(SSL_MUTUAL_AUTH, Type.BOOLEAN, null, Importance.LOW,
             "Set this to true for 2-way SSL; defaults to 1-way SSL")

+        .define(BULK_DS_API_URI, Type.STRING, null, Importance.LOW,
+            "Defines the URI of a Bulk Data Services API declaration. Requires that ml.connection.modulesDatabase be set. See the " +
+                "user guide for more information on using Bulk Data Services instead of DMSDK for writing data to MarkLogic.")
+
         .define(DOCUMENT_FORMAT, Type.STRING, null, Importance.MEDIUM,
             "Specify the format of each document; either 'JSON', 'XML', 'BINARY', 'TEXT', or 'UNKNOWN'. If not set, MarkLogic will determine the document type based on the ml.document.uriSuffix property.")
         .define(DOCUMENT_COLLECTIONS, Type.STRING, null, Importance.MEDIUM,
@@ -129,11 +133,6 @@ public class MarkLogicSinkConfig extends AbstractConfig {
         .define(DMSDK_INCLUDE_KAFKA_METADATA, Type.BOOLEAN, null, Importance.LOW,
             "Set to true so that Kafka record metadata is added to document metadata before it is written. If the document fails to be written, the Kafka record metadata will be logged as well.")

-        // TODO Need more info here on the API declaration itself?
-        .define(BULK_DS_API_URI, Type.STRING, null, Importance.LOW,
-            "Defines the URI of a Bulk Data Services API declaration. If set, all DMSDK properties will be ignored as Bulk Data Services will be used instead of DMSDK. " +
-                "Also, ml.connection.modulesDatabase must be defined so that the API declaration can be retrieved from the modules database.")
-
         .define(LOGGING_RECORD_KEY, Type.BOOLEAN, null, Importance.LOW,
             "Set to true to log at the info level the key of each record")
         .define(LOGGING_RECORD_HEADERS, Type.BOOLEAN, null, Importance.LOW,
