Commit fc18a5e

Merge pull request #179 from BillFarber/feature/storeKafkaHeaders
Can now include Kafka headers in the document metadata
2 parents c861673 + 092d433 commit fc18a5e

5 files changed, +139 -13 lines

build.gradle

Lines changed: 3 additions & 3 deletions
@@ -5,7 +5,7 @@ plugins {
     id "com.github.jk1.dependency-license-report" version "1.19"
 
     // Only used for testing
-    id 'com.marklogic.ml-gradle' version '4.7.0'
+    id 'com.marklogic.ml-gradle' version '4.8.0'
     id 'jacoco'
     id "org.sonarqube" version "4.4.1.3373"
 
@@ -42,9 +42,9 @@ dependencies {
     compileOnly "org.apache.kafka:connect-runtime:${kafkaVersion}"
     compileOnly "org.slf4j:slf4j-api:2.0.13"
 
-    implementation 'com.marklogic:ml-javaclient-util:4.7.0'
+    implementation 'com.marklogic:ml-javaclient-util:4.8.0'
     // Force DHF to use the latest version of ml-app-deployer, which minimizes security vulnerabilities
-    implementation "com.marklogic:ml-app-deployer:4.7.0"
+    implementation "com.marklogic:ml-app-deployer:4.8.0"
 
     implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.15.3"

docs/writing-data.md

Lines changed: 18 additions & 0 deletions
@@ -113,6 +113,24 @@ This will result in the following pieces of Kafka record metadata being in each
 - `kafka.partition` = the partition of the Kafka record
 - `kafka.timestamp` = the timestamp of the Kafka record
 
+### Including Kafka headers
+
+Each Kafka record passed to the MarkLogic connector also has headers that may contain useful information, which can
+be included in the metadata written with documents. This covers the headers added to Kafka records by default as
+well as any custom headers. Kafka headers can be included in each document by configuring the following
+property:
+
+- `ml.dmsdk.includeKafkaHeaders` = `true` to include Kafka headers
+
+When headers are added to the document metadata, each one is simply given the same name as its header key.
+However, you may also specify a prefix that will be prepended to each header key. To set that prefix, use the
+following property:
+
+- `ml.dmsdk.includeKafkaHeaders.prefix` = `<prefix>` to be prepended to header keys in the metadata.
+
+The headers present on a Kafka record will depend on the Kafka distribution you are using and on the message
+producer configuration.
+
 ### Configuring DMSDK performance
 
 The performance of how data is written to MarkLogic can be configured via the following properties:
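
As a worked example of how these two properties fit together (a minimal sketch, assuming the connector classes from this PR are on the classpath; the `kafka_` prefix value and the `HeaderConfigExample` class name are illustrative only, not part of the change):

    import com.marklogic.kafka.connect.sink.DefaultSinkRecordConverter;
    import com.marklogic.kafka.connect.sink.MarkLogicSinkConfig;

    import java.util.HashMap;
    import java.util.Map;

    public class HeaderConfigExample {
        public static void main(String[] args) {
            Map<String, Object> kafkaConfig = new HashMap<>();
            // Copy each Kafka record header into the document metadata values.
            kafkaConfig.put(MarkLogicSinkConfig.DMSDK_INCLUDE_KAFKA_HEADERS, true);
            // Optional prefix: a header keyed "traceId" (hypothetical) would then be
            // stored as the metadata value "kafka_traceId".
            kafkaConfig.put(MarkLogicSinkConfig.DMSDK_INCLUDE_KAFKA_HEADERS_PREFIX, "kafka_");
            // The converter will now prepend "kafka_" to every header key it writes.
            DefaultSinkRecordConverter converter = new DefaultSinkRecordConverter(kafkaConfig);
        }
    }

The unit tests added in this PR exercise exactly this path: with the prefix `kafkaHeader_`, record headers `A` and `B` surface as metadata values `kafkaHeader_A` and `kafkaHeader_B`; with no prefix configured, they surface as `A` and `B`.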

src/main/java/com/marklogic/kafka/connect/sink/DefaultSinkRecordConverter.java

Lines changed: 22 additions & 5 deletions
@@ -52,13 +52,19 @@ public class DefaultSinkRecordConverter implements SinkRecordConverter {
     private DocumentWriteOperationBuilder documentWriteOperationBuilder;
     private Format format;
     private String mimeType;
-    private boolean addTopicToCollections = false;
-    private boolean includeKafkaMetadata = false;
-    private IdStrategy idStrategy;
+    private final boolean addTopicToCollections;
+    private final boolean includeKafkaMetadata;
+    private final boolean includeKafkaHeaders;
+    private final String kafkaHeadersPrefix;
+
+    private final IdStrategy idStrategy;
 
     public DefaultSinkRecordConverter(Map<String, Object> parsedConfig) {
         this.addTopicToCollections = ConfigUtil.getBoolean(MarkLogicSinkConfig.DOCUMENT_COLLECTIONS_ADD_TOPIC, parsedConfig);
         this.includeKafkaMetadata = ConfigUtil.getBoolean(MarkLogicSinkConfig.DMSDK_INCLUDE_KAFKA_METADATA, parsedConfig);
+        this.includeKafkaHeaders = ConfigUtil.getBoolean(MarkLogicSinkConfig.DMSDK_INCLUDE_KAFKA_HEADERS, parsedConfig);
+        String configPrefixValue = (String) parsedConfig.get(MarkLogicSinkConfig.DMSDK_INCLUDE_KAFKA_HEADERS_PREFIX);
+        this.kafkaHeadersPrefix = (configPrefixValue == null) ? "" : configPrefixValue;
 
         documentWriteOperationBuilder = new DocumentWriteOperationBuilder()
             .withCollections((String) parsedConfig.get(MarkLogicSinkConfig.DOCUMENT_COLLECTIONS))
@@ -94,8 +100,20 @@ private DocumentMetadataHandle buildAdditionalMetadata(SinkRecord sinkRecord) {
         if (this.addTopicToCollections) {
             metadata.getCollections().add(sinkRecord.topic());
         }
-        if (this.includeKafkaMetadata) {
+        if (this.includeKafkaMetadata || this.includeKafkaHeaders) {
             DocumentMetadataHandle.DocumentMetadataValues values = metadata.getMetadataValues();
+            addKafkaMetadataToDocumentMetadata(sinkRecord, values);
+            if (this.includeKafkaHeaders) {
+                sinkRecord.headers().forEach(
+                    header -> values.add(kafkaHeadersPrefix + header.key(), header.value().toString())
+                );
+            }
+        }
+        return metadata;
+    }
+
+    private void addKafkaMetadataToDocumentMetadata(SinkRecord sinkRecord, DocumentMetadataHandle.DocumentMetadataValues values) {
+        if (this.includeKafkaMetadata) {
             Object key = sinkRecord.key();
             if (key != null) {
                 values.add("kafka-key", key.toString());
@@ -111,7 +129,6 @@ private DocumentMetadataHandle buildAdditionalMetadata(SinkRecord sinkRecord) {
             }
             values.add("kafka-topic", sinkRecord.topic());
         }
-        return metadata;
     }
 
     /**

src/main/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConfig.java

Lines changed: 8 additions & 0 deletions
@@ -37,6 +37,8 @@ public class MarkLogicSinkConfig extends MarkLogicConfig {
     public static final String DMSDK_TRANSFORM_PARAMS = "ml.dmsdk.transformParams";
     public static final String DMSDK_TRANSFORM_PARAMS_DELIMITER = "ml.dmsdk.transformParamsDelimiter";
     public static final String DMSDK_INCLUDE_KAFKA_METADATA = "ml.dmsdk.includeKafkaMetadata";
+    public static final String DMSDK_INCLUDE_KAFKA_HEADERS = "ml.dmsdk.includeKafkaHeaders";
+    public static final String DMSDK_INCLUDE_KAFKA_HEADERS_PREFIX = "ml.dmsdk.includeKafkaHeaders.prefix";
 
     public static final String BULK_DS_ENDPOINT_URI = "ml.sink.bulkds.endpointUri";
     public static final String BULK_DS_BATCH_SIZE = "ml.sink.bulkds.batchSize";
@@ -118,6 +120,12 @@ private static ConfigDef getConfigDef() {
             .define(DMSDK_INCLUDE_KAFKA_METADATA, Type.BOOLEAN, null, Importance.LOW,
                 "Set to true so that Kafka record metadata is added to document metadata before it is written. If the document fails to be written, the Kafka record metadata will be logged as well.",
                 GROUP, -1, ConfigDef.Width.MEDIUM, "DMSDK Include Kafka Metadata")
+            .define(DMSDK_INCLUDE_KAFKA_HEADERS, Type.BOOLEAN, null, Importance.LOW,
+                "Set to true so that Kafka record headers are added to document metadata before it is written. If the document fails to be written, the Kafka record headers will be logged as well.",
+                GROUP, -1, ConfigDef.Width.MEDIUM, "DMSDK Include Kafka Headers")
+            .define(DMSDK_INCLUDE_KAFKA_HEADERS_PREFIX, Type.STRING, null, Importance.LOW,
+                "Optional prefix to be prepended to Kafka header keys when they are included in document metadata.",
+                GROUP, -1, ConfigDef.Width.MEDIUM, "DMSDK Kafka Header Prefix")
 
             .define(BULK_DS_ENDPOINT_URI, Type.STRING, null, Importance.LOW,
                 "Defines the URI of a Bulk Data Services endpoint for writing data. " +

src/test/java/com/marklogic/kafka/connect/sink/ConvertSinkRecordTest.java

Lines changed: 88 additions & 5 deletions
@@ -24,13 +24,13 @@
 import com.marklogic.client.io.Format;
 import com.marklogic.client.io.StringHandle;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.*;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -78,7 +78,7 @@ void allPropertiesSet() {
 
     @Test
     void noPropertiesSet() {
-        Map<String, Object> kafkaConfig = new HashMap<String, Object>();
+        Map<String, Object> kafkaConfig = new HashMap<>();
         converter = new DefaultSinkRecordConverter(kafkaConfig);
 
         DocumentWriteOperation op = converter.convert(newSinkRecord("doesn't matter"));
@@ -221,7 +221,7 @@ void binaryContent() {
 
     @Test
     void includeKafkaMetadata() {
-        Map<String, Object> kafkaConfig = new HashMap<String, Object>();
+        Map<String, Object> kafkaConfig = new HashMap<>();
         kafkaConfig.put(MarkLogicSinkConfig.DMSDK_INCLUDE_KAFKA_METADATA, true);
         converter = new DefaultSinkRecordConverter(kafkaConfig);
 
@@ -260,6 +260,89 @@ void dontIncludeKafkaMetadata() {
         assertNull(values.get("kafka-topic"));
     }
 
+    @Test
+    void includeKafkaHeadersWithPrefix() {
+        Map<String, Object> kafkaConfig = new HashMap<>();
+        kafkaConfig.put(MarkLogicSinkConfig.DMSDK_INCLUDE_KAFKA_HEADERS, true);
+        kafkaConfig.put(MarkLogicSinkConfig.DMSDK_INCLUDE_KAFKA_HEADERS_PREFIX, "kafkaHeader_");
+        converter = new DefaultSinkRecordConverter(kafkaConfig);
+
+        final int partition = 5;
+        final long offset = 2;
+        final String key = "some-key";
+        final Long timestamp = System.currentTimeMillis();
+        List<Header> headers = new ArrayList<Header>() {{
+            add(new TestHeaders("A", "1"));
+            add(new TestHeaders("B", "2"));
+        }};
+        DocumentWriteOperation op = converter.convert(new SinkRecord("topic1", partition, null, key,
+            null, "some-value", offset, timestamp, TimestampType.CREATE_TIME, headers));
+
+        DocumentMetadataHandle metadata = (DocumentMetadataHandle) op.getMetadata();
+        DocumentMetadataHandle.DocumentMetadataValues values = metadata.getMetadataValues();
+        assertEquals("1", values.get("kafkaHeader_A"));
+        assertEquals("2", values.get("kafkaHeader_B"));
+        assertNull(values.get("kafka-offset"));
+    }
+
+    @Test
+    void includeKafkaHeadersWithoutPrefix() {
+        Map<String, Object> kafkaConfig = new HashMap<>();
+        kafkaConfig.put(MarkLogicSinkConfig.DMSDK_INCLUDE_KAFKA_HEADERS, true);
+        converter = new DefaultSinkRecordConverter(kafkaConfig);
+
+        final int partition = 5;
+        final long offset = 2;
+        final String key = "some-key";
+        final Long timestamp = System.currentTimeMillis();
+        List<Header> headers = new ArrayList<Header>() {{
+            add(new TestHeaders("A", "1"));
+            add(new TestHeaders("B", "2"));
+        }};
+        DocumentWriteOperation op = converter.convert(new SinkRecord("topic1", partition, null, key,
+            null, "some-value", offset, timestamp, TimestampType.CREATE_TIME, headers));
+
+        DocumentMetadataHandle metadata = (DocumentMetadataHandle) op.getMetadata();
+        DocumentMetadataHandle.DocumentMetadataValues values = metadata.getMetadataValues();
+        assertEquals("1", values.get("A"));
+        assertEquals("2", values.get("B"));
+        assertNull(values.get("kafka-offset"));
+    }
+
+    static class TestHeaders implements Header {
+        private final String key;
+        private final String value;
+
+        TestHeaders(String key, String value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public String key() {
+            return key;
+        }
+
+        @Override
+        public Schema schema() {
+            return null;
+        }
+
+        @Override
+        public Object value() {
+            return value;
+        }
+
+        @Override
+        public Header with(Schema schema, Object o) {
+            return null;
+        }
+
+        @Override
+        public Header rename(String s) {
+            return null;
+        }
+    }
+
     private SinkRecord newSinkRecord(Object value) {
         return new SinkRecord("test-topic", 1, null, null, null, value, 0);
     }
