
Commit c1667f8

Added support for Bulk Data Services
Resolves #3. Did a lot of refactoring (described below) to support this, as my initial attempt to cram this into MarkLogicSinkTask resulted in a too-large, too-complicated class.

- Extracted AbstractSinkTask and WriteBatcherSinkTask to capture what MarkLogicSinkTask previously did
- Added BulkDataServicesSinkTask to capture the new functionality
- MarkLogicSinkConnector now looks at the user config to determine what type of task to create, and all of our integration tests now start with this class
- Moved WriteFailureHandler from its own class into WriteBatcherSinkTask since it is not intended to be used by any other class
- Renamed BuildSuccessListenerTest to BuildRunFlowListenerTest to make it more self-documenting
- Moved a couple of tests from ConvertSinkRecordTest into HandleInvalidSinkRecordTest because they really needed to be integration tests
1 parent 60d66fa commit c1667f8
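
The commit message above says that MarkLogicSinkConnector now inspects the user config to decide which task type to create, but that class is not among the diffs shown below. A minimal sketch of how such a selection could look, keyed off MarkLogicSinkConfig.BULK_DS_API_URI; the class name, the version constant, and the exact selection logic are illustrative assumptions, not necessarily what the commit contains:

package com.marklogic.kafka.connect.sink;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.springframework.util.StringUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; the committed MarkLogicSinkConnector may differ in its details.
public class ExampleSinkConnector extends SinkConnector {

    public static final String MARKLOGIC_SINK_CONNECTOR_VERSION = "1.8.0"; // hypothetical version

    private Map<String, String> config;

    @Override
    public void start(Map<String, String> config) {
        // Keep the raw config so taskClass() and taskConfigs() can use it
        this.config = config;
    }

    @Override
    public Class<? extends Task> taskClass() {
        // If a Bulk Data Services API URI is configured, use the Bulk Data Services task;
        // otherwise fall back to the existing DMSDK/WriteBatcher task.
        String apiUri = config.get(MarkLogicSinkConfig.BULK_DS_API_URI);
        return StringUtils.hasText(apiUri) ? BulkDataServicesSinkTask.class : WriteBatcherSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(config);
        }
        return configs;
    }

    @Override
    public void stop() {
    }

    @Override
    public ConfigDef config() {
        return MarkLogicSinkConfig.CONFIG_DEF;
    }

    @Override
    public String version() {
        return MARKLOGIC_SINK_CONNECTOR_VERSION;
    }
}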

21 files changed: +733 −194 lines

gradle.properties

Lines changed: 1 addition & 0 deletions
@@ -15,6 +15,7 @@ confluentHome=
 
 # Only used for testing
 mlConfigPaths=src/test/ml-config
+mlModulePaths=src/test/ml-modules
 mlRestPort=8018
 mlTestRestPort=8019
 mlAppName=kafka-test
src/main/java/com/marklogic/kafka/connect/sink/AbstractSinkTask.java

Lines changed: 107 additions & 0 deletions
@@ -0,0 +1,107 @@
package com.marklogic.kafka.connect.sink;

import com.marklogic.kafka.connect.ConfigUtil;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * Base class for concrete SinkTask implementations, providing some generic functionality.
 */
abstract class AbstractSinkTask extends SinkTask {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    private boolean logKeys = false;
    private boolean logHeaders = false;

    /**
     * Subclasses implement this to pull their necessary config from Kafka. Invoked by the {@code start} method.
     *
     * @param parsedConfig
     */
    protected abstract void onStart(Map<String, Object> parsedConfig);

    /**
     * Subclasses implement this to determine how to write each {@code SinkRecord}. This is invoked by the
     * {@code put} method, which subclasses can override if necessary - e.g. to provide their own behavior after all
     * records have been processed.
     *
     * @param sinkRecord
     */
    protected abstract void writeSinkRecord(SinkRecord sinkRecord);

    /**
     * Required for a Kafka task.
     *
     * @return
     */
    @Override
    public String version() {
        return MarkLogicSinkConnector.MARKLOGIC_SINK_CONNECTOR_VERSION;
    }

    /**
     * Invoked by Kafka when the connector is started by Kafka Connect.
     *
     * @param config initial configuration
     */
    @Override
    public final void start(Map<String, String> config) {
        logger.info("Starting");
        Map<String, Object> parsedConfig = MarkLogicSinkConfig.CONFIG_DEF.parse(config);
        logKeys = ConfigUtil.getBoolean(MarkLogicSinkConfig.LOGGING_RECORD_KEY, parsedConfig);
        logHeaders = ConfigUtil.getBoolean(MarkLogicSinkConfig.LOGGING_RECORD_HEADERS, parsedConfig);
        this.onStart(parsedConfig);
        logger.info("Started");
    }

    /**
     * Invoked by Kafka each time it determines that it has data to send to a connector.
     *
     * @param records the set of records to send
     */
    @Override
    public void put(Collection<SinkRecord> records) {
        records.forEach(record -> {
            // It is not known if either of these scenarios will ever occur; it would seem that Kafka would never pass
            // a null record nor a record with a null value to a connector.
            if (record == null) {
                logger.debug("Skipping null record");
            } else if (record.value() == null) {
                logger.debug("Skipping record with null value");
            } else {
                logRecordBeforeWriting(record);
                try {
                    this.writeSinkRecord(record);
                } catch (Exception ex) {
                    // Including the stacktrace here as this could happen for a variety of reasons
                    throw new RuntimeException("Unable to write sink record; record offset: " + record.kafkaOffset() +
                        "; cause: " + ex.getMessage(), ex);
                }
            }
        });
    }

    private void logRecordBeforeWriting(SinkRecord record) {
        if (logKeys && record.key() != null) {
            logger.info("Record key {}", record.key());
        }
        if (logHeaders) {
            List<String> headers = new ArrayList<>();
            record.headers().forEach(header -> {
                headers.add(String.format("%s:%s", header.key(), header.value().toString()));
            });
            logger.info("Record headers: {}", headers);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Processing record value {} in topic {}", record.value(), record.topic());
        }
    }
}
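
AbstractSinkTask is a template: start() parses the Kafka config and hands it to onStart(), while put() skips null records, optionally logs keys and headers, and hands each remaining record to writeSinkRecord(). A minimal hypothetical subclass showing what the contract requires - illustrative only, not part of this commit, and the "ml.example.logPrefix" property is made up:

package com.marklogic.kafka.connect.sink;

import org.apache.kafka.connect.sink.SinkRecord;

import java.util.Map;

// Hypothetical example: the smallest useful AbstractSinkTask subclass, which only logs each record value.
class LoggingOnlySinkTask extends AbstractSinkTask {

    private String prefix;

    @Override
    protected void onStart(Map<String, Object> parsedConfig) {
        // Pull whatever config the task needs; here just a made-up prefix with a default.
        Object value = parsedConfig.get("ml.example.logPrefix"); // hypothetical property
        this.prefix = value != null ? value.toString() : "record";
    }

    @Override
    protected void writeSinkRecord(SinkRecord sinkRecord) {
        // A real task would write to MarkLogic here; this example only logs the value.
        logger.info("{}: {}", prefix, sinkRecord.value());
    }

    @Override
    public void stop() {
        logger.info("Stopping");
    }
}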
src/main/java/com/marklogic/kafka/connect/sink/BulkDataServicesSinkTask.java

Lines changed: 204 additions & 0 deletions
@@ -0,0 +1,204 @@
package com.marklogic.kafka.connect.sink;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.dataservices.IOEndpoint;
import com.marklogic.client.dataservices.InputCaller;
import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.ext.DatabaseClientConfig;
import com.marklogic.client.ext.DefaultConfiguredDatabaseClientFactory;
import com.marklogic.client.io.BytesHandle;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.JacksonHandle;
import com.marklogic.client.io.StringHandle;
import com.marklogic.client.io.marker.AbstractWriteHandle;
import com.marklogic.kafka.connect.DefaultDatabaseClientConfigBuilder;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.springframework.util.StringUtils;

import java.util.Map;

/**
 * Uses Bulk Data Services - https://github.com/marklogic/java-client-api/wiki/Bulk-Data-Services - to allow the user
 * to provide their own endpoint implementation, thus giving the user full control over how data is written to
 * MarkLogic.
 */
class BulkDataServicesSinkTask extends AbstractSinkTask {

    private DatabaseClient databaseClient;
    private InputCaller.BulkInputCaller<JsonNode> bulkInputCaller;
    private ObjectMapper objectMapper;
    private SinkRecordConverter sinkRecordConverter;

    public BulkDataServicesSinkTask() {
        this.objectMapper = new ObjectMapper();
    }

    @Override
    protected void onStart(Map<String, Object> parsedConfig) {
        DatabaseClientConfig databaseClientConfig = new DefaultDatabaseClientConfigBuilder().buildDatabaseClientConfig(parsedConfig);
        this.databaseClient = new DefaultConfiguredDatabaseClientFactory().newDatabaseClient(databaseClientConfig);

        JacksonHandle modulesHandle = readApiDeclarationFromMarkLogic(parsedConfig, databaseClientConfig);
        InputCaller<JsonNode> inputCaller = InputCaller.on(databaseClient, modulesHandle, new JacksonHandle().withFormat(Format.JSON));

        IOEndpoint.CallContext callContext = inputCaller.newCallContext()
            .withEndpointConstants(new JacksonHandle(buildEndpointConstants(parsedConfig)));
        this.bulkInputCaller = inputCaller.bulkCaller(callContext);
        this.configureErrorListenerOnBulkInputCaller();

        this.sinkRecordConverter = new DefaultSinkRecordConverter(parsedConfig);
    }

    /**
     * When Kafka calls this method - the frequency of which can be controlled by the user - perform a synchronous
     * flush of any records waiting to be written to MarkLogic. {@code BulkInputCaller} does not yet have an
     * asynchronous flush like DMSDK does, but a synchronous flush seems appropriate here - i.e. Kafka appears to be
     * fine with a synchronous call in {@code flush}, while {@code put} is expected to be asynchronous.
     * <p>
     * For a good reference, see https://stackoverflow.com/questions/44871377/put-vs-flush-in-kafka-connector-sink-task
     *
     * @param currentOffsets
     */
    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        if (bulkInputCaller != null) {
            logger.info("Flushing BulkInputCaller");
            bulkInputCaller.awaitCompletion();
            logger.info("Finished flushing BulkInputCaller");
        }
    }

    @Override
    public void stop() {
        flush(null);
        if (databaseClient != null) {
            databaseClient.release();
        }
    }

    /**
     * Queues up the sink record for writing to MarkLogic. Once the batch size, as defined in the Bulk API declaration,
     * is reached, the {@code BulkInputCaller} will write the data to MarkLogic.
     *
     * @param sinkRecord
     */
    @Override
    protected void writeSinkRecord(SinkRecord sinkRecord) {
        DocumentWriteOperation writeOp = sinkRecordConverter.convert(sinkRecord);
        JsonNode input = buildBulkDataServiceInput(writeOp, sinkRecord);
        bulkInputCaller.accept(input);
    }

    /**
     * Bulk Data Services requires that the API declaration exist in the modules database associated with the app
     * server that the connector will talk to.
     *
     * @param parsedConfig
     * @param databaseClientConfig
     * @return
     */
    private JacksonHandle readApiDeclarationFromMarkLogic(Map<String, Object> parsedConfig, DatabaseClientConfig databaseClientConfig) {
        final String bulkApiUri = (String) parsedConfig.get(MarkLogicSinkConfig.BULK_DS_API_URI);
        final String modulesDatabase = (String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_MODULES_DATABASE);
        if (!StringUtils.hasText(modulesDatabase)) {
            throw new IllegalArgumentException("Cannot read Bulk Data Services API declaration at URI: " + bulkApiUri
                + "; no modules database configured. Please set the "
                + MarkLogicSinkConfig.CONNECTION_MODULES_DATABASE + " property.");
        }

        final String originalDatabase = databaseClientConfig.getDatabase();
        try {
            databaseClientConfig.setDatabase(modulesDatabase);
            DatabaseClient modulesClient = new DefaultConfiguredDatabaseClientFactory().newDatabaseClient(databaseClientConfig);
            return modulesClient.newJSONDocumentManager().read(bulkApiUri, new JacksonHandle().withFormat(Format.JSON));
        } catch (Exception ex) {
            // The stacktrace isn't of any value here for a user; the message below will provide sufficient information
            // for debugging
            throw new RuntimeException("Unable to read Bulk Data Services API declaration at URI: " + bulkApiUri +
                "; modules database: " + modulesDatabase + "; cause: " + ex.getMessage());
        } finally {
            databaseClientConfig.setDatabase(originalDatabase);
        }
    }

    /**
     * When using Bulk Data Services, include all "ml.document" config options in the endpoint constants in case the
     * endpoint developer wishes to use these.
     *
     * @param parsedConfig
     * @return
     */
    private ObjectNode buildEndpointConstants(Map<String, Object> parsedConfig) {
        ObjectNode endpointConstants = this.objectMapper.createObjectNode();
        for (String key : parsedConfig.keySet()) {
            if (key.startsWith("ml.document")) {
                Object value = parsedConfig.get(key);
                if (value != null) {
                    endpointConstants.put(key, value.toString());
                }
            }
        }
        return endpointConstants;
    }

    /**
     * An envelope structure is used so that both the content and Kafka metadata from the sink record can be sent to
     * the endpoint.
     *
     * @param writeOp
     * @param sinkRecord
     * @return
     */
    private JsonNode buildBulkDataServiceInput(DocumentWriteOperation writeOp, SinkRecord sinkRecord) {
        AbstractWriteHandle handle = writeOp.getContent();
        // This assumes that the SinkRecordConverter always constructs either a BytesHandle or StringHandle. This is an
        // implementation detail not exposed to the user, and sufficient testing should ensure that this assumption
        // holds up over time.
        String content;
        if (handle instanceof BytesHandle) {
            content = new String(((BytesHandle) handle).get());
        } else {
            content = ((StringHandle) handle).get();
        }
        ObjectNode input = new ObjectMapper().createObjectNode();
        input.put("content", content);

        ObjectNode kafkaMetadata = input.putObject("kafka-metadata");
        kafkaMetadata.put("topic", sinkRecord.topic());
        Object key = sinkRecord.key();
        if (key != null) {
            kafkaMetadata.put("key", key.toString());
        }
        kafkaMetadata.put("offset", sinkRecord.kafkaOffset());
        Integer partition = sinkRecord.kafkaPartition();
        if (partition != null) {
            kafkaMetadata.put("partition", partition);
        }
        Long timestamp = sinkRecord.timestamp();
        if (timestamp != null) {
            kafkaMetadata.put("timestamp", timestamp);
        }
        return input;
    }

    /**
     * For the initial release of this capability, the "skip" approach is applied, which behaves in the same manner as
     * the existing WriteBatcher approach - i.e. log the failure and keep processing other records/batches. This can
     * be made configurable in the future if a client wants "stop all calls" support.
     */
    private void configureErrorListenerOnBulkInputCaller() {
        this.bulkInputCaller.setErrorListener((retryCount, throwable, callContext, input) -> {
            // The stacktrace is not included here, as it will only contain references to Bulk Data Services code and
            // connector code, which won't help with debugging. The MarkLogic error log will be of much more value,
            // along with seeing the error message here.
            logger.error("Skipping failed write; cause: " + throwable.getMessage() + "; check the MarkLogic error " +
                "log file for additional information as to the cause of the failed write");
            return IOEndpoint.BulkIOEndpointCaller.ErrorDisposition.SKIP_CALL;
        });
    }
}
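
Each item passed to the BulkInputCaller is the envelope built by buildBulkDataServiceInput, so an endpoint module receives a JSON object with a "content" string and a "kafka-metadata" object. A small standalone sketch that mirrors that structure and prints it, which may help when developing or debugging an endpoint; the topic, key, offset, partition, and timestamp values are made up:

package com.marklogic.kafka.connect.sink;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Standalone illustration of the envelope shape built by buildBulkDataServiceInput above.
public class EnvelopeShapeExample {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode input = mapper.createObjectNode();
        // "content" holds the converted sink record content as a string
        input.put("content", "{\"hello\":\"world\"}");

        // "kafka-metadata" carries the Kafka record coordinates for use by the endpoint
        ObjectNode kafkaMetadata = input.putObject("kafka-metadata");
        kafkaMetadata.put("topic", "example-topic");
        kafkaMetadata.put("key", "example-key");
        kafkaMetadata.put("offset", 42L);
        kafkaMetadata.put("partition", 0);
        kafkaMetadata.put("timestamp", 1662000000000L);

        System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(input));
    }
}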

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConfig.java

Lines changed: 10 additions & 0 deletions
@@ -15,6 +15,7 @@ public class MarkLogicSinkConfig extends AbstractConfig {
     public static final String CONNECTION_HOST = "ml.connection.host";
     public static final String CONNECTION_PORT = "ml.connection.port";
     public static final String CONNECTION_DATABASE = "ml.connection.database";
+    public static final String CONNECTION_MODULES_DATABASE = "ml.connection.modulesDatabase";
     public static final String CONNECTION_SECURITY_CONTEXT_TYPE = "ml.connection.securityContextType";
     public static final String CONNECTION_USERNAME = "ml.connection.username";
     public static final String CONNECTION_PASSWORD = "ml.connection.password";
@@ -35,6 +36,8 @@ public class MarkLogicSinkConfig extends AbstractConfig {
     public static final String DMSDK_TRANSFORM_PARAMS_DELIMITER = "ml.dmsdk.transformParamsDelimiter";
     public static final String DMSDK_INCLUDE_KAFKA_METADATA = "ml.dmsdk.includeKafkaMetadata";
 
+    public static final String BULK_DS_API_URI = "ml.sink.bulkds.apiUri";
+
     public static final String DOCUMENT_COLLECTIONS_ADD_TOPIC = "ml.document.addTopicToCollections";
     public static final String DOCUMENT_COLLECTIONS = "ml.document.collections";
     public static final String DOCUMENT_TEMPORAL_COLLECTION = "ml.document.temporalCollection";
@@ -74,6 +77,8 @@ public class MarkLogicSinkConfig extends AbstractConfig {
             "External name for 'KERBEROS' authentication")
         .define(CONNECTION_DATABASE, Type.STRING, null, Importance.LOW,
             "Name of a database to connect to. If your REST API server has a content database matching that of the one that you want to write documents to, you do not need to set this.")
+        .define(CONNECTION_MODULES_DATABASE, Type.STRING, null, Importance.MEDIUM,
+            "Name of the modules database associated with the app server; required if using Bulk Data Services so that the API module can be retrieved")
         .define(CONNECTION_TYPE, Type.STRING, null, Importance.MEDIUM,
             "Set to 'GATEWAY' when the host identified by ml.connection.host is a load balancer. See https://docs.marklogic.com/guide/java/data-movement#id_26583 for more information.")
         // Boolean fields must have a default value of null; otherwise, Confluent Platform, at least in version 7.2.1,
@@ -124,6 +129,11 @@ public class MarkLogicSinkConfig extends AbstractConfig {
         .define(DMSDK_INCLUDE_KAFKA_METADATA, Type.BOOLEAN, null, Importance.LOW,
             "Set to true so that Kafka record metadata is added to document metadata before it is written. If the document fails to be written, the Kafka record metadata will be logged as well.")
 
+        // TODO Need more info here on the API declaration itself?
+        .define(BULK_DS_API_URI, Type.STRING, null, Importance.LOW,
+            "Defines the URI of a Bulk Data Services API declaration. If set, all DMSDK properties will be ignored as Bulk Data Services will be used instead of DMSDK. " +
+                "Also, ml.connection.modulesDatabase must be defined so that the API declaration can be retrieved from the modules database.")
+
         .define(LOGGING_RECORD_KEY, Type.BOOLEAN, null, Importance.LOW,
             "Set to true to log at the info level the key of each record")
         .define(LOGGING_RECORD_HEADERS, Type.BOOLEAN, null, Importance.LOW,
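
Taken together, switching the sink from DMSDK to Bulk Data Services only requires setting ml.sink.bulkds.apiUri and ml.connection.modulesDatabase alongside the usual connection properties. A hypothetical example follows; all values are placeholders, and only the property names come from MarkLogicSinkConfig:

package com.marklogic.kafka.connect.sink;

import java.util.HashMap;
import java.util.Map;

// Hypothetical configuration values for illustration only.
public class BulkDataServicesConfigExample {

    public static Map<String, String> exampleConfig() {
        Map<String, String> config = new HashMap<>();
        config.put(MarkLogicSinkConfig.CONNECTION_HOST, "localhost");
        config.put(MarkLogicSinkConfig.CONNECTION_PORT, "8018");
        config.put(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "DIGEST");
        config.put(MarkLogicSinkConfig.CONNECTION_USERNAME, "kafka-user");
        config.put(MarkLogicSinkConfig.CONNECTION_PASSWORD, "changeme");
        // Selecting Bulk Data Services: the API declaration URI plus the modules database it lives in.
        config.put(MarkLogicSinkConfig.BULK_DS_API_URI, "/example/bulk-endpoint.api");
        config.put(MarkLogicSinkConfig.CONNECTION_MODULES_DATABASE, "kafka-test-modules");
        return config;
    }
}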
