Commit 60d66fa

Merge pull request #67 from marklogic-community/feature/31-error-logging
Can now include Kafka metadata on doc and log it when a failure occurs
2 parents 3a032d1 + 20ee2bc commit 60d66fa

12 files changed: +332 additions, -60 deletions

README.md

Lines changed: 1 addition & 0 deletions
@@ -101,6 +101,7 @@ connector.
 | ml.dmsdk.transform | | Name of a REST transform to use when writing documents |
 | ml.dmsdk.transformParams | | Delimited set of transform parameter names and values; example = param1,value1,param2,value2 |
 | ml.dmsdk.transformParamsDelimiter | , | Delimiter for transform parameter names and values |
+| ml.dmsdk.includeKafkaMetadata | false | Set to true so that Kafka record metadata is added to document metadata before it is written. If the document fails to be written, the Kafka record metadata will be logged as well. |
 | ml.log.record.key | false | Set to true to log at the info level the key of each record |
 | ml.log.record.headers | false | Set to true to log at the info level the headers of each record |
 | ml.datahub.flow.name | | Name of a Data Hub Framework flow to run after writing documents |

config/marklogic-sink.properties

Lines changed: 4 additions & 0 deletions
@@ -116,6 +116,10 @@ ml.dmsdk.threadCount=8
 # Delimiter for transform parameter names and values
 # ml.dmsdk.transformParamsDelimiter=,
 
+# Set to true so that Kafka record metadata is added to document metadata before it is written. If the document fails to
+# be written, the Kafka record metadata will be logged as well.
+# ml.dmsdk.includeKafkaMetadata=true
+
 # Set to true to log at the info level the key of each record
 # ml.log.record.key=true
 
src/main/java/com/marklogic/kafka/connect/ConfigUtil.java

Lines changed: 21 additions & 0 deletions

@@ -0,0 +1,21 @@
+package com.marklogic.kafka.connect;
+
+import java.util.Map;
+
+public abstract class ConfigUtil {
+
+    /**
+     * Convenience method for getting a boolean value from the parsed Kafka config, returning false if the given key is
+     * not found. This accounts for the fact that boolean options are expected to have "null" as a default value, which
+     * ensures in Confluent Platform that the default value is shown as "false". Oddly, if the default value is "false",
+     * then Confluent Platform seems to erroneously show the default value as "true".
+     *
+     * @param key
+     * @param parsedConfig
+     * @return
+     */
+    public final static boolean getBoolean(String key, Map<String, Object> parsedConfig) {
+        Boolean val = (Boolean) parsedConfig.get(key);
+        return val != null ? val : false;
+    }
+}
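The other classes touched by this commit switch their boolean config reads to this helper. A small illustration of the behavior described in the Javadoc above (the map contents are hypothetical and imports are omitted): a key that is absent from the parsed config resolves to false rather than a null Boolean.

    Map<String, Object> parsedConfig = new HashMap<>();
    parsedConfig.put(MarkLogicSinkConfig.DMSDK_INCLUDE_KAFKA_METADATA, Boolean.TRUE);

    // Present key: the stored Boolean is returned as a primitive
    boolean includeMetadata = ConfigUtil.getBoolean(MarkLogicSinkConfig.DMSDK_INCLUDE_KAFKA_METADATA, parsedConfig); // true
    // Absent key: no NullPointerException from unboxing, just false
    boolean logKeys = ConfigUtil.getBoolean(MarkLogicSinkConfig.LOGGING_RECORD_KEY, parsedConfig); // false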

src/main/java/com/marklogic/kafka/connect/DefaultDatabaseClientConfigBuilder.java

Lines changed: 3 additions & 6 deletions
@@ -47,12 +47,10 @@ public DatabaseClientConfig buildDatabaseClientConfig(Map<String, Object> parsed
             clientConfig.setPassword(password.value());
         }
         clientConfig.setPort((Integer) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_PORT));
-        Boolean customSsl = (Boolean) parsedConfig.get(MarkLogicSinkConfig.ENABLE_CUSTOM_SSL);
-        if (customSsl != null && customSsl) {
+        if (ConfigUtil.getBoolean(MarkLogicSinkConfig.ENABLE_CUSTOM_SSL, parsedConfig)) {
             configureCustomSslConnection(clientConfig, parsedConfig);
         }
-        Boolean simpleSsl = (Boolean) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL);
-        if (simpleSsl != null && simpleSsl) {
+        if (ConfigUtil.getBoolean(MarkLogicSinkConfig.CONNECTION_SIMPLE_SSL, parsedConfig)) {
             configureSimpleSsl(clientConfig);
         }
         clientConfig.setUsername((String) parsedConfig.get(MarkLogicSinkConfig.CONNECTION_USERNAME));

@@ -90,12 +88,11 @@ else if ("STRICT".equals(sslHostNameVerifier))
 
     private void configureCustomSslConnection(DatabaseClientConfig clientConfig, Map<String, Object> parsedConfig) {
         String tlsVersion = (String) parsedConfig.get(MarkLogicSinkConfig.TLS_VERSION);
-        Boolean sslMutualAuth = (Boolean) parsedConfig.get(MarkLogicSinkConfig.SSL_MUTUAL_AUTH);
         SSLContext sslContext = null;
         SecurityContextType securityContextType = clientConfig.getSecurityContextType();
 
         if (SecurityContextType.BASIC.equals(securityContextType) || SecurityContextType.DIGEST.equals(securityContextType)) {
-            if (sslMutualAuth != null && sslMutualAuth) {
+            if (ConfigUtil.getBoolean(MarkLogicSinkConfig.SSL_MUTUAL_AUTH, parsedConfig)) {
                 /*2 way ssl changes*/
                 KeyStore clientKeyStore = null;
                 try {

src/main/java/com/marklogic/kafka/connect/sink/DefaultSinkRecordConverter.java

Lines changed: 37 additions & 22 deletions
@@ -10,11 +10,13 @@
 import com.marklogic.client.io.Format;
 import com.marklogic.client.io.StringHandle;
 import com.marklogic.client.io.marker.AbstractWriteHandle;
+import com.marklogic.kafka.connect.ConfigUtil;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.storage.Converter;
+import org.springframework.util.StringUtils;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;

@@ -35,15 +37,13 @@ public class DefaultSinkRecordConverter implements SinkRecordConverter {
     private DocumentWriteOperationBuilder documentWriteOperationBuilder;
     private Format format;
     private String mimeType;
-    private Boolean addTopicToCollections = false;
+    private boolean addTopicToCollections = false;
+    private boolean includeKafkaMetadata = false;
     private IdStrategy idStrategy;
 
     public DefaultSinkRecordConverter(Map<String, Object> parsedConfig) {
-
-        Boolean booleanVal = (Boolean) parsedConfig.get(MarkLogicSinkConfig.DOCUMENT_COLLECTIONS_ADD_TOPIC);
-        if (booleanVal != null) {
-            addTopicToCollections = booleanVal;
-        }
+        this.addTopicToCollections = ConfigUtil.getBoolean(MarkLogicSinkConfig.DOCUMENT_COLLECTIONS_ADD_TOPIC, parsedConfig);
+        this.includeKafkaMetadata = ConfigUtil.getBoolean(MarkLogicSinkConfig.DMSDK_INCLUDE_KAFKA_METADATA, parsedConfig);
 
         documentWriteOperationBuilder = new DocumentWriteOperationBuilder()
             .withCollections((String) parsedConfig.get(MarkLogicSinkConfig.DOCUMENT_COLLECTIONS))

@@ -52,35 +52,49 @@ public DefaultSinkRecordConverter(Map<String, Object> parsedConfig) {
             .withUriSuffix((String) parsedConfig.get(MarkLogicSinkConfig.DOCUMENT_URI_SUFFIX));
 
         String val = (String) parsedConfig.get(MarkLogicSinkConfig.DOCUMENT_FORMAT);
-        if (val != null && val.trim().length() > 0) {
-            format = Format.valueOf(val.toUpperCase());
+        if (StringUtils.hasText(val)) {
+            this.format = Format.valueOf(val.toUpperCase());
         }
+
         val = (String) parsedConfig.get(MarkLogicSinkConfig.DOCUMENT_MIMETYPE);
-        if (val != null && val.trim().length() > 0) {
-            mimeType = val;
+        if (StringUtils.hasText(val)) {
+            this.mimeType = val;
         }
 
-        idStrategy = IdStrategyFactory.getIdStrategy(parsedConfig);
+        this.idStrategy = IdStrategyFactory.getIdStrategy(parsedConfig);
     }
 
     @Override
     public DocumentWriteOperation convert(SinkRecord sinkRecord) {
         RecordContent recordContent = new RecordContent();
         AbstractWriteHandle content = toContent(sinkRecord);
         recordContent.setContent(content);
-        recordContent.setAdditionalMetadata(addTopicToCollections(sinkRecord.topic(), addTopicToCollections));
+        recordContent.setAdditionalMetadata(buildAdditionalMetadata(sinkRecord));
         recordContent.setId(idStrategy.generateId(content, sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset()));
         return documentWriteOperationBuilder.build(recordContent);
     }
 
-    /**
-     * @param topic, addTopicToCollections
-     * @return metadata
-     */
-    protected DocumentMetadataHandle addTopicToCollections(String topic, Boolean addTopicToCollections) {
+    private DocumentMetadataHandle buildAdditionalMetadata(SinkRecord sinkRecord) {
         DocumentMetadataHandle metadata = new DocumentMetadataHandle();
-        if (addTopicToCollections) {
-            metadata.getCollections().add(topic);
+        if (this.addTopicToCollections) {
+            metadata.getCollections().add(sinkRecord.topic());
+        }
+        if (this.includeKafkaMetadata) {
+            DocumentMetadataHandle.DocumentMetadataValues values = metadata.getMetadataValues();
+            Object key = sinkRecord.key();
+            if (key != null) {
+                values.add("kafka-key", key.toString());
+            }
+            values.add("kafka-offset", sinkRecord.kafkaOffset() + "");
+            Integer partition = sinkRecord.kafkaPartition();
+            if (partition != null) {
+                values.add("kafka-partition", partition + "");
+            }
+            Long timestamp = sinkRecord.timestamp();
+            if (timestamp != null) {
+                values.add("kafka-timestamp", timestamp + "");
+            }
+            values.add("kafka-topic", sinkRecord.topic());
         }
         return metadata;
     }

@@ -91,10 +105,11 @@ protected DocumentMetadataHandle addTopicToCollections(String topic, Boolean add
     * @param record
     * @return
     */
-    protected AbstractWriteHandle toContent(SinkRecord record) {
-        if ((record == null) || (record.value() == null)) {
-            throw new NullPointerException("'record' must not be null, and must have a value.");
+    private AbstractWriteHandle toContent(SinkRecord record) {
+        if (record == null || record.value() == null) {
+            throw new IllegalArgumentException("Sink record must not be null and must have a value");
        }
+
        Object value = record.value();
        Schema schema = record.valueSchema();
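When ml.dmsdk.includeKafkaMetadata is enabled, buildAdditionalMetadata above stores kafka-key, kafka-offset, kafka-partition, kafka-timestamp, and kafka-topic as MarkLogic metadata values on each written document. A rough sketch of reading those values back with the MarkLogic Java Client API; the URI, host, and credentials are placeholders, and this snippet is not part of the commit:

    DatabaseClient client = DatabaseClientFactory.newClient("localhost", 8000,
        new DatabaseClientFactory.DigestAuthContext("admin", "admin"));
    GenericDocumentManager docMgr = client.newDocumentManager();
    // The kafka-* entries live in the "metadata values" category
    docMgr.setMetadataCategories(DocumentManager.Metadata.METADATAVALUES);
    DocumentMetadataHandle metadata = docMgr.readMetadata("/example/written-doc.json", new DocumentMetadataHandle());
    String topic = metadata.getMetadataValues().get("kafka-topic");
    String offset = metadata.getMetadataValues().get("kafka-offset");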

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConfig.java

Lines changed: 3 additions & 0 deletions
@@ -33,6 +33,7 @@ public class MarkLogicSinkConfig extends AbstractConfig {
     public static final String DMSDK_TRANSFORM = "ml.dmsdk.transform";
     public static final String DMSDK_TRANSFORM_PARAMS = "ml.dmsdk.transformParams";
     public static final String DMSDK_TRANSFORM_PARAMS_DELIMITER = "ml.dmsdk.transformParamsDelimiter";
+    public static final String DMSDK_INCLUDE_KAFKA_METADATA = "ml.dmsdk.includeKafkaMetadata";
 
     public static final String DOCUMENT_COLLECTIONS_ADD_TOPIC = "ml.document.addTopicToCollections";
     public static final String DOCUMENT_COLLECTIONS = "ml.document.collections";

@@ -120,6 +121,8 @@ public class MarkLogicSinkConfig extends AbstractConfig {
             "Delimited set of transform parameter names and values; example = param1,value1,param2,value2")
         .define(DMSDK_TRANSFORM_PARAMS_DELIMITER, Type.STRING, ",", Importance.LOW,
             "Delimiter for transform parameter names and values")
+        .define(DMSDK_INCLUDE_KAFKA_METADATA, Type.BOOLEAN, null, Importance.LOW,
+            "Set to true so that Kafka record metadata is added to document metadata before it is written. If the document fails to be written, the Kafka record metadata will be logged as well.")
 
         .define(LOGGING_RECORD_KEY, Type.BOOLEAN, null, Importance.LOW,
             "Set to true to log at the info level the key of each record")

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkTask.java

Lines changed: 11 additions & 21 deletions
@@ -6,6 +6,7 @@
 import com.marklogic.client.document.ServerTransform;
 import com.marklogic.client.ext.DatabaseClientConfig;
 import com.marklogic.client.ext.DefaultConfiguredDatabaseClientFactory;
+import com.marklogic.kafka.connect.ConfigUtil;
 import com.marklogic.kafka.connect.DefaultDatabaseClientConfigBuilder;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;

@@ -37,29 +38,18 @@ public void start(final Map<String, String> config) {
 
         Map<String, Object> parsedConfig = MarkLogicSinkConfig.CONFIG_DEF.parse(config);
 
-        if (parsedConfig.get(MarkLogicSinkConfig.LOGGING_RECORD_KEY) != null) {
-            logKeys = (Boolean) parsedConfig.get(MarkLogicSinkConfig.LOGGING_RECORD_KEY);
-        }
-        if (parsedConfig.get(MarkLogicSinkConfig.LOGGING_RECORD_HEADERS) != null) {
-            logHeaders = (Boolean) parsedConfig.get(MarkLogicSinkConfig.LOGGING_RECORD_HEADERS);
-        }
+        logKeys = ConfigUtil.getBoolean(MarkLogicSinkConfig.LOGGING_RECORD_KEY, parsedConfig);
+        logHeaders = ConfigUtil.getBoolean(MarkLogicSinkConfig.LOGGING_RECORD_HEADERS, parsedConfig);
 
         sinkRecordConverter = new DefaultSinkRecordConverter(parsedConfig);
 
         DatabaseClientConfig databaseClientConfig = new DefaultDatabaseClientConfigBuilder().buildDatabaseClientConfig(parsedConfig);
         databaseClient = new DefaultConfiguredDatabaseClientFactory().newDatabaseClient(databaseClientConfig);
 
         dataMovementManager = databaseClient.newDataMovementManager();
-
         writeBatcher = dataMovementManager.newWriteBatcher();
         configureWriteBatcher(parsedConfig, writeBatcher);
 
-        writeBatcher.onBatchFailure((writeBatch, throwable) -> {
-            int batchSize = writeBatch.getItems().length;
-            logger.error("#error failed to write {} records", batchSize);
-            logger.error("#error batch failure:", throwable);
-        });
-
         final String flowName = (String) parsedConfig.get(MarkLogicSinkConfig.DATAHUB_FLOW_NAME);
         if (flowName != null && flowName.trim().length() > 0) {
             writeBatcher.onBatchSuccess(buildSuccessListener(flowName, parsedConfig, databaseClientConfig));

@@ -76,8 +66,8 @@ public void start(final Map<String, String> config) {
     * @param parsedConfig
     * @param writeBatcher
     */
-    protected void configureWriteBatcher(Map<String, Object> parsedConfig, WriteBatcher writeBatcher) {
-        Integer batchSize = (Integer)parsedConfig.get(MarkLogicSinkConfig.DMSDK_BATCH_SIZE);
+    private void configureWriteBatcher(Map<String, Object> parsedConfig, WriteBatcher writeBatcher) {
+        Integer batchSize = (Integer) parsedConfig.get(MarkLogicSinkConfig.DMSDK_BATCH_SIZE);
         if (batchSize != null) {
             logger.info("DMSDK batch size: " + batchSize);
             writeBatcher.withBatchSize(batchSize);

@@ -96,11 +86,14 @@ protected void configureWriteBatcher(Map<String, Object> parsedConfig, WriteBatc
             writeBatcher.withTransform(transform);
         }
 
-        String temporalCollection = (String)parsedConfig.get(MarkLogicSinkConfig.DOCUMENT_TEMPORAL_COLLECTION);
+        String temporalCollection = (String) parsedConfig.get(MarkLogicSinkConfig.DOCUMENT_TEMPORAL_COLLECTION);
         if (StringUtils.hasText(temporalCollection)) {
             logger.info("Will add documents to temporal collection: " + temporalCollection);
             writeBatcher.withTemporalCollection(temporalCollection);
         }
+
+        writeBatcher.onBatchFailure(new WriteFailureHandler(
+            ConfigUtil.getBoolean(MarkLogicSinkConfig.DMSDK_INCLUDE_KAFKA_METADATA, parsedConfig)));
     }
 
     /**

@@ -123,7 +116,7 @@ protected RunFlowWriteBatchListener buildSuccessListener(String flowName, Map<St
 
         RunFlowWriteBatchListener listener = new RunFlowWriteBatchListener(flowName, steps, databaseClientConfig);
         if (parsedConfig.get(MarkLogicSinkConfig.DATAHUB_FLOW_LOG_RESPONSE) != null) {
-            listener.setLogResponse((Boolean) parsedConfig.get(MarkLogicSinkConfig.DATAHUB_FLOW_LOG_RESPONSE));
+            listener.setLogResponse(ConfigUtil.getBoolean(MarkLogicSinkConfig.DATAHUB_FLOW_LOG_RESPONSE, parsedConfig));
         }
         return listener;
     }

@@ -190,18 +183,15 @@ public void put(final Collection<SinkRecord> records) {
             return;
         }
 
-        final List<String> headers = new ArrayList<>();
-
         records.forEach(record -> {
-
             if (record == null) {
                 logger.warn("Skipping null record object.");
             } else {
                 if (logKeys) {
                     logger.info("#record key {}", record.key());
                 }
                 if (logHeaders) {
-                    headers.clear();
+                    List<String> headers = new ArrayList<>();
                     record.headers().forEach(header -> {
                         headers.add(String.format("%s:%s", header.key(), header.value().toString()));
                     });
src/main/java/com/marklogic/kafka/connect/sink/WriteFailureHandler.java

Lines changed: 40 additions & 0 deletions

@@ -0,0 +1,40 @@
+package com.marklogic.kafka.connect.sink;
+
+import com.marklogic.client.datamovement.WriteBatch;
+import com.marklogic.client.datamovement.WriteEvent;
+import com.marklogic.client.datamovement.WriteFailureListener;
+import com.marklogic.client.ext.helper.LoggingObject;
+import com.marklogic.client.io.DocumentMetadataHandle;
+import com.marklogic.client.io.marker.DocumentMetadataWriteHandle;
+
+/**
+ * Handles a failed write batch, which currently just consists of logging information about the failed batch.
+ */
+public class WriteFailureHandler extends LoggingObject implements WriteFailureListener {
+
+    private boolean includeKafkaMetadata;
+
+    public WriteFailureHandler(boolean includeKafkaMetadata) {
+        this.includeKafkaMetadata = includeKafkaMetadata;
+    }
+
+    @Override
+    public void processFailure(WriteBatch batch, Throwable throwable) {
+        logger.error("Batch failed; size: {}; cause: {}", batch.getItems().length, throwable.getMessage());
+        if (this.includeKafkaMetadata) {
+            logger.error("Logging Kafka record metadata for each failed document");
+            for (WriteEvent event : batch.getItems()) {
+                DocumentMetadataWriteHandle writeHandle = event.getMetadata();
+                if (writeHandle instanceof DocumentMetadataHandle) {
+                    DocumentMetadataHandle metadata = (DocumentMetadataHandle) writeHandle;
+                    DocumentMetadataHandle.DocumentMetadataValues values = metadata.getMetadataValues();
+                    if (values != null) {
+                        logger.error("URI: {}; key: {}; partition: {}; offset: {}; timestamp: {}; topic: {}",
+                            event.getTargetUri(), values.get("kafka-key"), values.get("kafka-partition"),
+                            values.get("kafka-offset"), values.get("kafka-timestamp"), values.get("kafka-topic"));
+                    }
+                }
+            }
+        }
+    }
+}
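For reference, a minimal sketch of registering this listener on a WriteBatcher outside of the connector, assuming an existing DatabaseClient named client; MarkLogicSinkTask.configureWriteBatcher performs the equivalent registration in this commit:

    DataMovementManager dataMovementManager = client.newDataMovementManager();
    WriteBatcher writeBatcher = dataMovementManager.newWriteBatcher()
        // true = also log the kafka-* metadata values for each document in a failed batch
        .onBatchFailure(new WriteFailureHandler(true));
    dataMovementManager.startJob(writeBatcher);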

src/test/java/com/marklogic/kafka/connect/sink/AbstractIntegrationTest.java

Lines changed: 5 additions & 0 deletions
@@ -61,7 +61,12 @@ protected MarkLogicSinkTask startSinkTask(String... configParamNamesAndValues) {
     protected void putContent(MarkLogicSinkTask task, String content) {
         String topic = "topic-name-doesnt-matter";
         SinkRecord record = new SinkRecord(topic, 1, null, null, null, content, 0);
+        putSinkRecord(task, record);
+    }
+
+    protected void putSinkRecord(MarkLogicSinkTask task, SinkRecord record) {
         task.put(Stream.of(record).collect(Collectors.toList()));
         task.getWriteBatcher().flushAndWait();
     }
+
 }
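The new putSinkRecord hook lets a test pass a fully populated record rather than just a content string. A hypothetical usage, with the topic, key, offset, and timestamp made up for illustration and org.apache.kafka.common.record.TimestampType assumed to be imported:

    MarkLogicSinkTask task = startSinkTask(MarkLogicSinkConfig.DMSDK_INCLUDE_KAFKA_METADATA, "true");
    SinkRecord record = new SinkRecord("test-topic", 1, null, "key-123", null, "{\"hello\":\"world\"}",
        5, System.currentTimeMillis(), TimestampType.CREATE_TIME);
    putSinkRecord(task, record);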
