Skip to content

Commit 23d6f30

Browse files
committed
feat: add ignore version configuration
fix: add logs fix: logs
1 parent b61af84 commit 23d6f30

File tree

4 files changed

+68
-2
lines changed

4 files changed

+68
-2
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@
327327

328328
Mapping is the process of defining how a document, and the fields it contains, are stored and indexed. Users can explicitly define mappings for types in indices. When a mapping is not explicitly defined, Elasticsearch can determine field names and types from data, however, some types such as timestamp and decimal, may not be correctly inferred. To ensure that the types are correctly inferred, the connector provides a feature to infer a mapping from the schemas of Kafka messages.
329329
</description>
330-
<logo>logos/elasticsearch.jpg</logo>
330+
<logo>logos/elasticlogo.png</logo>
331331

332332
<supportProviderName>Confluent, Inc.</supportProviderName>
333333
<supportSummary><![CDATA[This connector is <a href="https://www.confluent.io/subscription/">supported by Confluent</a> as part of a <a href="https://www.confluent.io/product/confluent-platform/">Confluent Platform</a> subscription.]]></supportSummary>

src/main/java/io/confluent/connect/elasticsearch/DataConverter.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,9 +247,15 @@ private DocWriteRequest<?> maybeAddExternalVersioning(
247247
DocWriteRequest<?> request,
248248
SinkRecord record
249249
) {
250-
if (!config.isDataStream() && !config.shouldIgnoreKey(record.topic())) {
250+
if (!config.shouldIgnoreVersion()
251+
&& !config.isDataStream()
252+
&& !config.shouldIgnoreKey(record.topic())) {
253+
log.debug("Adding external versioning to {} with offset {}, the ignore setting is {}",
254+
recordString(record), record.kafkaOffset(), config.shouldIgnoreVersion());
251255
request.versionType(VersionType.EXTERNAL);
252256
request.version(record.kafkaOffset());
257+
} else {
258+
log.trace("Not adding external versioning to {}", recordString(record));
253259
}
254260

255261
return request;

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,15 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
278278
+ "is acceptable and is necessary for troubleshooting.";
279279
private static final Boolean LOG_SENSITIVE_DATA_DEFAULT = Boolean.FALSE;
280280

281+
public static final String IGNORE_VERSION_CONFIG = "version.ignore";
282+
private static final String IGNORE_VERSION_DOC =
283+
"Whether to use kafka record offset as Elasticsearch document EXTERNAL version."
284+
+ " When this is set to ``true``, kafka records with offset lower than previously"
285+
+ " indexed will overwrite the document. When set to ``false``, the kafka record with"
286+
+ " offset lower than one previously indexed will be ignored.";
287+
private static final String IGNORE_VERSION_DISPLAY = "Ignore Version mode";
288+
private static final boolean IGNORE_VERSION_DEFAULT = Boolean.FALSE;
289+
281290
// Proxy group
282291
public static final String PROXY_HOST_CONFIG = "proxy.host";
283292
private static final String PROXY_HOST_DISPLAY = "Proxy Host";
@@ -702,6 +711,16 @@ private static void addConversionConfigs(ConfigDef configDef) {
702711
Width.SHORT,
703712
WRITE_METHOD_DISPLAY,
704713
new EnumRecommender<>(WriteMethod.class)
714+
).define(
715+
IGNORE_VERSION_CONFIG,
716+
Type.BOOLEAN,
717+
IGNORE_VERSION_DEFAULT,
718+
Importance.LOW,
719+
IGNORE_VERSION_DOC,
720+
DATA_CONVERSION_GROUP,
721+
++order,
722+
Width.SHORT,
723+
IGNORE_VERSION_DISPLAY
705724
);
706725
}
707726

@@ -1053,6 +1072,10 @@ public WriteMethod writeMethod() {
10531072
return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase());
10541073
}
10551074

1075+
public boolean shouldIgnoreVersion() {
1076+
return getBoolean(IGNORE_VERSION_CONFIG);
1077+
}
1078+
10561079
private static class DataStreamDatasetValidator implements Validator {
10571080

10581081
@Override

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchClientTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG;
3333
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX;
3434
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG;
35+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_VERSION_CONFIG;
3536
import static org.junit.Assert.assertEquals;
3637
import static org.junit.Assert.assertFalse;
3738
import static org.junit.Assert.assertThrows;
@@ -599,6 +600,42 @@ private List<SinkRecord> causeExternalVersionConflictError(ElasticsearchClient c
599600

600601
return conflict_list;
601602
}
603+
604+
/**
605+
* VERSION_IGNORE_CONFIG is set to true, then Version.EXTERNAL is not used
606+
* even if the IGNORE_KEY_CONFIG is said to false.
607+
*
608+
* In this case, even if records
609+
* are written with a kafka offset, less than the previous indexed record and no
610+
* exception should be thrown.
611+
*
612+
* @throws Exception will be thrown if the test fails
613+
*/
614+
@Test
615+
public void testInternalVersionUsedForVersionIgnore() throws Exception {
616+
props.put(IGNORE_VERSION_CONFIG, "true");
617+
props.put(IGNORE_KEY_CONFIG, "false");
618+
config = new ElasticsearchSinkConnectorConfig(props);
619+
converter = new DataConverter(config);
620+
621+
ErrantRecordReporter reporter = mock(ErrantRecordReporter.class);
622+
ElasticsearchClient client = new ElasticsearchClient(config, reporter, () -> offsetTracker.updateOffsets()) {
623+
protected boolean handleResponse(BulkItemResponse response, DocWriteRequest<?> request,
624+
long executionId) {
625+
// Check if request version type is INTERNAL
626+
assertEquals(request.versionType(), VersionType.INTERNAL);
627+
return super.handleResponse(response, request, executionId);
628+
}
629+
};
630+
631+
List<SinkRecord> duplicate_records = causeExternalVersionConflictError(client);
632+
633+
// Make sure that no error was reported for any record(s)
634+
for (SinkRecord duplicated_record : duplicate_records) {
635+
verify(reporter, never()).report(eq(duplicated_record), any(Throwable.class));
636+
}
637+
client.close();
638+
}
602639

603640
/**
604641
* If the record version is set to VersionType.EXTERNAL (normal case for non-streaming),

0 commit comments

Comments
 (0)