Commit b1cdb41

Merge pull request #78 from marklogic-community/feature/bulkds-improvements
Reworked Bulk DS approach to avoid reading from modules database
2 parents f8d2612 + e31153a

File tree

9 files changed: +96, -103 lines changed


README.md

Lines changed: 25 additions & 18 deletions

````diff
@@ -164,8 +164,12 @@ recommended only for users with experience in writing and deploying custom code
 ### Writing data via configuration (DMSDK)
 
 The intent behind using DMSDK with the MarkLogic REST API is that as many aspects of writing data to MarkLogic can be
-controlled via properties without the need to write any code. The following sections describe the various ways in
-which this can be achieved.
+controlled via properties without the need to write any code.
+
+#### Security requirements
+
+The user that the connector authenticates as must have the `rest-reader` and `rest-writer` privileges in order to
+write data via the MarkLogic REST API, which the connector depends upon.
 
 #### Configuring document URIs
 
@@ -268,13 +272,23 @@ typically a dataflow framework like Kafka that can support multiple workers writing
 MarkLogic Kafka connector utilizes Bulk Data Services to send Kafka record data to a custom endpoint in which a
 developer can write any code they like to control how the data is processed.
 
-Configuring the MarkLogic Kafka connector to use Bulk Data Services requires the following two properties:
+#### Security requirements
+
+Unlike when using the MarkLogic REST API, no specific privileges or roles are required in order for the connector to
+invoke a Bulk Data Services endpoint. Instead, the required privileges and/or roles for the MarkLogic user that the
+connector authenticates as will be entirely determined by the Bulk Data Services endpoint implementation.
 
-- `ml.sink.bulkds.apiUri` = the URI of the Bulk Data Services API declaration
-- `ml.connection.modulesDatabase` = the name of the modules database containing both the API declaration and the
-  endpoint module that it refers to
+#### Configuring Bulk Data Services usage
 
-The MarkLogic Kafka connector expects the API declaration to be:
+Configuring the MarkLogic Kafka connector to use Bulk Data Services involves the following properties:
+
+- `ml.sink.bulkds.endpointUri` = the URI of the Bulk Data Services endpoint module
+- `ml.sink.bulkds.batchSize` = an optional batch size; defaults to 100. Note that if you include `$bulk/inputBatchSize`
+  in your API declaration, it will be ignored in favor of this property.
+
+Bulk Data Services requires that your endpoint module have an API declaration. The URI of the API declaration must
+match that of your endpoint, but with `.api` as a suffix instead of `.sjs` or `.xqy`. The MarkLogic Kafka connector
+expects the API declaration to have the following configuration:
 
 ```
 {
@@ -292,15 +306,11 @@ The MarkLogic Kafka connector expects the API declaration to be:
       "multiple": true,
       "nullable": true
     }
-  ],
-  "$bulk": {
-    "inputBatchSize": 100
-  }
+  ]
 }
 ```
 
-The `endpoint` field must be the URI of the associated Bulk Data Services endpoint module. You are free to set the
-`inputBatchSize` to any numeric value you want based on the expected workload for your connector.
+The `endpoint` field should have the same value as the `ml.sink.bulkds.endpointUri` property.
 
 It is recommended to start your endpoint module implementation with the following code:
 
@@ -357,15 +367,12 @@ in parallel.
 
 A key design feature of Bulk Data Services to understand is that, unlike MarkLogic's Data Movement SDK, it does not
 support asynchronous flushing of data. Bulk Data Services will not write any data to MarkLogic until it has a number of
-documents equalling that of the batch size in the API declaration. Importantly, partial batches will not be written
-until either enough records are received to meet the batch size, or until Kafka invokes the "flush" operation on the
+documents equalling that of the `ml.sink.bulkds.batchSize` property. Importantly, partial batches will not be written
+until either enough records are received to meet the batch size, or until Kafka invokes the `flush` operation on the
 MarkLogic Kafka connector. You can use the Kafka connector property named `offset.flush.interval` to control how often
 the flush operation is invoked. This is a synchronous operation, but you may wish to have this occur fairly regularly,
 such as every 5 or 10 seconds, to ensure that partial batches of data are not waiting too long to be written to
 MarkLogic.
 
-You may also want to configure the `inputBatchSize` field in your API declaration to see if increasing or decreasing
-this value results in greater performance.
-
 As always with MarkLogic applications, use the [MarkLogic Monitoring dashboard](https://docs.marklogic.com/guide/monitoring/intro)
 to understand resource consumption and server performance while testing various connector settings.
````
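
To tie the README changes together, here is a minimal sketch of the relevant sink properties. The endpoint URI is illustrative (it reuses this commit's test fixture value), and connection settings are omitted:

```properties
# Minimal sketch; the endpoint URI below reuses this commit's test fixture value.
ml.sink.bulkds.endpointUri=/example/bulk-endpoint.sjs
# Optional; defaults to 100 and takes precedence over any $bulk/inputBatchSize
# in the endpoint's API declaration.
ml.sink.bulkds.batchSize=100
```

Note that setting `ml.sink.bulkds.endpointUri` is also what causes the connector to select `BulkDataServicesSinkTask` rather than the DMSDK-based `WriteBatcherSinkTask`, per the `MarkLogicSinkConnector` change below.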

config/marklogic-sink.properties

Lines changed: 6 additions & 6 deletions

```diff
@@ -128,10 +128,10 @@ ml.dmsdk.threadCount=8
 # Set to true to log at the info level the response data from running a flow
 # ml.datahub.flow.logResponse=true
 
-# Defines the URI of a Bulk Data Services API declaration. Requires that ml.connection.modulesDatabase be set. See the
-# user guide for more information on using Bulk Data Services instead of DMSDK for writing data to MarkLogic.
-# ml.sink.bulkds.apiUri=
-
-# Required when using Bulk Data Services so that the API declaration can be retrieved from MarkLogic.
-# ml.connection.modulesDatabase=
+# Defines the URI of a Bulk Data Services endpoint for writing data.
+# See the user guide for more information on using Bulk Data Services instead of DMSDK for writing data to MarkLogic.
+# ml.sink.bulkds.endpointUri=
 
+# Sets the number of documents to be sent in a batch to the Bulk Data Services endpoint. The connector will not send any
+# documents to MarkLogic until it has a number matching this property or until Kafka invokes the 'flush' operation on the connector.
+# ml.sink.bulkds.batchSize=100
```

gradle.properties

Lines changed: 0 additions & 1 deletion

```diff
@@ -22,4 +22,3 @@ mlAppName=kafka-test
 mlUsername=admin
 mlPassword=changeme-in-gradle-local.properties
 mlContentForestsPerHost=1
-mlModulePermissions=rest-extension-user,read,rest-extension-user,execute,rest-admin,update
```

src/main/java/com/marklogic/kafka/connect/sink/BulkDataServicesSinkTask.java

Lines changed: 26 additions & 25 deletions

```diff
@@ -2,6 +2,7 @@
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.marklogic.client.DatabaseClient;
 import com.marklogic.client.dataservices.IOEndpoint;
@@ -43,7 +44,7 @@ protected void onStart(Map<String, Object> parsedConfig) {
         DatabaseClientConfig databaseClientConfig = new DefaultDatabaseClientConfigBuilder().buildDatabaseClientConfig(parsedConfig);
         this.databaseClient = new DefaultConfiguredDatabaseClientFactory().newDatabaseClient(databaseClientConfig);
 
-        JacksonHandle modulesHandle = readApiDeclarationFromMarkLogic(parsedConfig, databaseClientConfig);
+        JacksonHandle modulesHandle = new JacksonHandle(buildApiDeclaration(parsedConfig));
         InputCaller<JsonNode> inputCaller = InputCaller.on(databaseClient, modulesHandle, new JacksonHandle().withFormat(Format.JSON));
 
         IOEndpoint.CallContext callContext = inputCaller.newCallContext()
@@ -95,35 +96,35 @@ protected void writeSinkRecord(SinkRecord sinkRecord) {
     }
 
     /**
-     * Bulk Data Services requires that the API declaration exist in the modules database associated with the app
-     * server that the connector will talk to.
+     * Build an API declaration based on the user inputs for an endpoint URI and optional batch size. It's feasible to
+     * do this because the connector knows what the {@code params} array must be, and the documentation instructs the
+     * endpoint developer to use the same array of parameters. Building the API here also avoids having to read it
+     * from a modules database which requires either the xdmp-eval-in or xdbc-eval privilege.
      *
      * @param parsedConfig
-     * @param databaseClientConfig
     * @return
     */
-    private JacksonHandle readApiDeclarationFromMarkLogic(Map<String, Object> parsedConfig, DatabaseClientConfig databaseClientConfig) {
-        final String bulkApiUri = (String) parsedConfig.get(MarkLogicSinkConfig.BULK_DS_API_URI);
-        final String modulesDatabase = (String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_MODULES_DATABASE);
-        if (!StringUtils.hasText(modulesDatabase)) {
-            throw new IllegalArgumentException("Cannot read Bulk Data Services API declaration at URI: " + bulkApiUri
-                + "; no modules database configured. Please set the "
-                + MarkLogicSinkConfig.CONNECTION_MODULES_DATABASE + " property.");
-        }
-
-        final String originalDatabase = databaseClientConfig.getDatabase();
-        try {
-            databaseClientConfig.setDatabase(modulesDatabase);
-            DatabaseClient modulesClient = new DefaultConfiguredDatabaseClientFactory().newDatabaseClient(databaseClientConfig);
-            return modulesClient.newJSONDocumentManager().read(bulkApiUri, new JacksonHandle().withFormat(Format.JSON));
-        } catch (Exception ex) {
-            // The stacktrace isn't of any value here for a user; the message below will provide sufficient information
-            // for debugging
-            throw new RuntimeException("Unable to read Bulk Data Services API declaration at URI: " + bulkApiUri +
-                "; modules database: " + modulesDatabase + "; cause: " + ex.getMessage());
-        } finally {
-            databaseClientConfig.setDatabase(originalDatabase);
+    private JsonNode buildApiDeclaration(Map<String, Object> parsedConfig) {
+        final String endpoint = parsedConfig.get(MarkLogicSinkConfig.BULK_DS_ENDPOINT_URI).toString();
+
+        ObjectNode api = this.objectMapper.createObjectNode();
+        api.put("endpoint", endpoint);
+        ArrayNode params = api.putArray("params");
+        ObjectNode param = params.addObject();
+        param.put("name", "endpointConstants");
+        param.put("datatype", "jsonDocument");
+        param.put("multiple", false);
+        param.put("nullable", false);
+        param = params.addObject();
+        param.put("name", "input");
+        param.put("datatype", "jsonDocument");
+        param.put("multiple", true);
+        param.put("nullable", true);
+        if (parsedConfig.containsKey(MarkLogicSinkConfig.BULK_DS_BATCH_SIZE)) {
+            int batchSize = Integer.parseInt(parsedConfig.get(MarkLogicSinkConfig.BULK_DS_BATCH_SIZE).toString());
+            api.putObject("$bulk").put("inputBatchSize", batchSize);
         }
+        return api;
     }
 
     /**
```
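
For orientation, here is a self-contained sketch of the Bulk Data Services calling pattern that `onStart` and `buildApiDeclaration` now implement: build the API declaration in memory, then feed records through the Java client's `InputCaller`. The host, port, credentials, and batch size are placeholder values, and the endpoint URI reuses this commit's test fixture; none of these are prescribed by the diff.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.dataservices.IOEndpoint;
import com.marklogic.client.dataservices.InputCaller;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.JacksonHandle;

public class BulkInputCallerSketch {

    public static void main(String[] args) {
        // Placeholder connection details for illustration only
        DatabaseClient client = DatabaseClientFactory.newClient("localhost", 8000,
            new DatabaseClientFactory.DigestAuthContext("kafka-test-user", "password"));

        // Build the API declaration in memory, mirroring buildApiDeclaration above
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode api = mapper.createObjectNode();
        api.put("endpoint", "/example/bulk-endpoint.sjs");
        ArrayNode params = api.putArray("params");
        ObjectNode constants = params.addObject();
        constants.put("name", "endpointConstants");
        constants.put("datatype", "jsonDocument");
        constants.put("multiple", false);
        constants.put("nullable", false);
        ObjectNode input = params.addObject();
        input.put("name", "input");
        input.put("datatype", "jsonDocument");
        input.put("multiple", true);
        input.put("nullable", true);
        api.putObject("$bulk").put("inputBatchSize", 3);

        // The bulk caller queues inputs and sends them to the endpoint in batches
        // of inputBatchSize; awaitCompletion() sends any remaining partial batch,
        // which is what the connector relies on when Kafka invokes 'flush'.
        InputCaller<JsonNode> caller = InputCaller.on(client,
            new JacksonHandle(api), new JacksonHandle().withFormat(Format.JSON));
        IOEndpoint.CallContext context = caller.newCallContext()
            .withEndpointConstants(new JacksonHandle(mapper.createObjectNode()));
        InputCaller.BulkInputCaller<JsonNode> bulkCaller = caller.bulkCaller(context);

        for (int i = 0; i < 4; i++) {
            bulkCaller.accept(mapper.createObjectNode().put("value", i));
        }
        bulkCaller.awaitCompletion(); // flushes the partial fourth record
        client.release();
    }
}
```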

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConfig.java

Lines changed: 8 additions & 7 deletions

```diff
@@ -15,7 +15,6 @@ public class MarkLogicSinkConfig extends AbstractConfig {
     public static final String CONNECTION_HOST = "ml.connection.host";
     public static final String CONNECTION_PORT = "ml.connection.port";
     public static final String CONNECTION_DATABASE = "ml.connection.database";
-    public static final String CONNECTION_MODULES_DATABASE = "ml.connection.modulesDatabase";
     public static final String CONNECTION_SECURITY_CONTEXT_TYPE = "ml.connection.securityContextType";
     public static final String CONNECTION_USERNAME = "ml.connection.username";
     public static final String CONNECTION_PASSWORD = "ml.connection.password";
@@ -36,7 +35,8 @@ public class MarkLogicSinkConfig extends AbstractConfig {
     public static final String DMSDK_TRANSFORM_PARAMS_DELIMITER = "ml.dmsdk.transformParamsDelimiter";
     public static final String DMSDK_INCLUDE_KAFKA_METADATA = "ml.dmsdk.includeKafkaMetadata";
 
-    public static final String BULK_DS_API_URI = "ml.sink.bulkds.apiUri";
+    public static final String BULK_DS_ENDPOINT_URI = "ml.sink.bulkds.endpointUri";
+    public static final String BULK_DS_BATCH_SIZE = "ml.sink.bulkds.batchSize";
 
     public static final String DOCUMENT_COLLECTIONS_ADD_TOPIC = "ml.document.addTopicToCollections";
     public static final String DOCUMENT_COLLECTIONS = "ml.document.collections";
@@ -77,8 +77,6 @@ public class MarkLogicSinkConfig extends AbstractConfig {
             "External name for 'KERBEROS' authentication")
         .define(CONNECTION_DATABASE, Type.STRING, null, Importance.LOW,
             "Name of a database to connect to. If your REST API server has a content database matching that of the one that you want to write documents to, you do not need to set this.")
-        .define(CONNECTION_MODULES_DATABASE, Type.STRING, null, Importance.MEDIUM,
-            "Name of the modules database associated with the app server; required if using Bulk Data Services so that the API module can be retrieved")
         .define(CONNECTION_TYPE, Type.STRING, null, Importance.MEDIUM,
             "Set to 'GATEWAY' when the host identified by ml.connection.host is a load balancer. See https://docs.marklogic.com/guide/java/data-movement#id_26583 for more information.")
         // Boolean fields must have a default value of null; otherwise, Confluent Platform, at least in version 7.2.1,
@@ -94,9 +92,12 @@ public class MarkLogicSinkConfig extends AbstractConfig {
         .define(SSL_MUTUAL_AUTH, Type.BOOLEAN, null, Importance.LOW,
             "Set this to true for 2-way SSL; defaults to 1-way SSL")
 
-        .define(BULK_DS_API_URI, Type.STRING, null, Importance.LOW,
-            "Defines the URI of a Bulk Data Services API declaration. Requires that ml.connection.modulesDatabase be set. See the " +
-                "user guide for more information on using Bulk Data Services instead of DMSDK for writing data to MarkLogic.")
+        .define(BULK_DS_ENDPOINT_URI, Type.STRING, null, Importance.LOW,
+            "Defines the URI of a Bulk Data Services endpoint for writing data. " +
+                "See the user guide for more information on using Bulk Data Services instead of DMSDK for writing data to MarkLogic.")
+        .define(BULK_DS_BATCH_SIZE, Type.INT, 100, Importance.LOW,
+            "Sets the number of documents to be sent in a batch to the Bulk Data Services endpoint. The connector will not send any " +
+                "documents to MarkLogic until it has a number matching this property or until Kafka invokes the 'flush' operation on the connector.")
 
         .define(DOCUMENT_FORMAT, Type.STRING, null, Importance.MEDIUM,
             "Specify the format of each document; either 'JSON', 'XML', 'BINARY', 'TEXT', or 'UNKNOWN'. If not set, MarkLogic will determine the document type based on the ml.document.uriSuffix property.")
```

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConnector.java

Lines changed: 1 addition & 1 deletion

```diff
@@ -36,7 +36,7 @@ public void stop() {
 
     @Override
     public Class<? extends Task> taskClass() {
-        Class clazz = StringUtils.hasText(this.config.get(MarkLogicSinkConfig.BULK_DS_API_URI)) ?
+        Class clazz = StringUtils.hasText(this.config.get(MarkLogicSinkConfig.BULK_DS_ENDPOINT_URI)) ?
             BulkDataServicesSinkTask.class :
             WriteBatcherSinkTask.class;
         LoggerFactory.getLogger(getClass()).info("Task class: " + clazz);
```

src/test/java/com/marklogic/kafka/connect/sink/WriteViaBulkDataServicesTest.java

Lines changed: 28 additions & 32 deletions

```diff
@@ -7,19 +7,20 @@
 
 import java.util.Arrays;
 
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 public class WriteViaBulkDataServicesTest extends AbstractIntegrationTest {
 
     private final static String TEST_COLLECTION = "bulk-ds-test";
-    private final static String TEST_BULK_API_URI = "/example/bulk-endpoint.api";
-    private final static String MODULES_DATABASE = "kafka-test-modules";
+    private final static String TEST_BULK_ENDPOINT_URI = "/example/bulk-endpoint.sjs";
 
     @Test
     void twoBatches() {
         AbstractSinkTask task = startSinkTask(
-            MarkLogicSinkConfig.BULK_DS_API_URI, TEST_BULK_API_URI,
-            MarkLogicSinkConfig.CONNECTION_MODULES_DATABASE, MODULES_DATABASE,
+            MarkLogicSinkConfig.BULK_DS_ENDPOINT_URI, TEST_BULK_ENDPOINT_URI,
+            MarkLogicSinkConfig.BULK_DS_BATCH_SIZE, "3",
+
             // Turning on logging here to get coverage of this code; not possible to verify the output via assertions
             MarkLogicSinkConfig.LOGGING_RECORD_HEADERS, "true",
             MarkLogicSinkConfig.LOGGING_RECORD_KEY, "true"
@@ -41,11 +42,29 @@ void twoBatches() {
         assertCollectionSize("All 4 records should have been written, as the flush call will TBD", TEST_COLLECTION, 4);
     }
 
+    @Test
+    void defaultBatchSize() {
+        AbstractSinkTask task = startSinkTask(
+            MarkLogicSinkConfig.BULK_DS_ENDPOINT_URI, TEST_BULK_ENDPOINT_URI
+        );
+
+        for (int i = 1; i < 100; i++) {
+            task.put(Arrays.asList(newSinkRecord("<anything/>")));
+        }
+        assertCollectionSize("The default batch size is expected to be 100, so no docs should have been written since " +
+            "we've only put 99", TEST_COLLECTION, 0);
+
+        task.put(Arrays.asList(newSinkRecord("<anything/>")));
+        assertCollectionSize(TEST_COLLECTION, 100);
+
+        task.put(Arrays.asList(newSinkRecord("<anything/>")));
+        assertCollectionSize(TEST_COLLECTION, 100);
+    }
+
     @Test
     void verifyEndpointConstants() {
         AbstractSinkTask task = startSinkTask(
-            MarkLogicSinkConfig.BULK_DS_API_URI, TEST_BULK_API_URI,
-            MarkLogicSinkConfig.CONNECTION_MODULES_DATABASE, MODULES_DATABASE,
+            MarkLogicSinkConfig.BULK_DS_ENDPOINT_URI, TEST_BULK_ENDPOINT_URI,
             MarkLogicSinkConfig.DOCUMENT_COLLECTIONS, "json-test",
             MarkLogicSinkConfig.DOCUMENT_URI_PREFIX, "/json/",
             MarkLogicSinkConfig.DOCUMENT_URI_SUFFIX, ".json",
@@ -92,8 +111,7 @@ void verifyEndpointConstants() {
     @Test
     void verifyKafkaMetadata() {
         AbstractSinkTask task = startSinkTask(
-            MarkLogicSinkConfig.BULK_DS_API_URI, TEST_BULK_API_URI,
-            MarkLogicSinkConfig.CONNECTION_MODULES_DATABASE, MODULES_DATABASE
+            MarkLogicSinkConfig.BULK_DS_ENDPOINT_URI, TEST_BULK_ENDPOINT_URI
         );
 
         final String topic = "topic1";
@@ -119,32 +137,10 @@ void verifyKafkaMetadata() {
         assertEquals(timestamp, metadata.get("timestamp").asLong());
     }
 
-    @Test
-    void missingModulesDatabase() {
-        IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> startSinkTask(
-            MarkLogicSinkConfig.BULK_DS_API_URI, TEST_BULK_API_URI
-        ));
-        assertTrue(ex.getMessage().contains("Cannot read Bulk Data Services API declaration"),
-            "Unexpected error: " + ex.getMessage());
-    }
-
-    @Test
-    void invalidApiUri() {
-        RuntimeException ex = assertThrows(RuntimeException.class, () -> startSinkTask(
-            MarkLogicSinkConfig.BULK_DS_API_URI, "/this-doesnt-exist.api",
-            MarkLogicSinkConfig.CONNECTION_MODULES_DATABASE, MODULES_DATABASE
-        ));
-        String message = ex.getMessage();
-        assertTrue(message.startsWith("Unable to read Bulk Data Services API declaration at URI: /this-doesnt-exist.api"),
-            "Unexpected message: " + message);
-        assertTrue(message.contains("Could not read non-existent document"), "Unexpected message: " + message);
-    }
-
     @Test
     void badBulkEndpoint() {
         AbstractSinkTask task = startSinkTask(
-            MarkLogicSinkConfig.BULK_DS_API_URI, "/example/bad-bulk-endpoint.api",
-            MarkLogicSinkConfig.CONNECTION_MODULES_DATABASE, MODULES_DATABASE
+            MarkLogicSinkConfig.BULK_DS_ENDPOINT_URI, "/example/bad-bulk-endpoint.sjs"
         );
 
         putAndFlushRecords(task, newSinkRecord("content-doesnt-matter"));
```

src/test/ml-config/security/roles/minimal-user-role.json

Lines changed: 1 addition & 9 deletions

```diff
@@ -1,9 +1,6 @@
 {
   "role-name": "kafka-test-minimal-user",
-  "description": "rest-reader/rest-writer privileges are needed to read forest info and write documents; rest-extension-user, any-uri, unprotected-collections, and xdmp-eval-in is needed for Bulk DS to read the API declaration",
-  "role": [
-    "rest-extension-user"
-  ],
+  "description": "rest-reader/rest-writer privileges are needed to read forest info and write documents; any-uri and unprotected-collections are needed by the test endpoint to insert a document",
   "privilege": [
     {
       "privilege-name": "rest-reader",
@@ -24,11 +21,6 @@
       "privilege-name": "unprotected-collections",
       "action": "http://marklogic.com/xdmp/privileges/unprotected-collections",
       "kind": "execute"
-    },
-    {
-      "privilege-name": "xdmp:eval-in",
-      "action": "http://marklogic.com/xdmp/privileges/xdmp-eval-in",
-      "kind": "execute"
     }
   ]
 }
```
