Skip to content

Commit e4860b8

Browse files
Add option to use auto-generated IDs on indexing (#803)
1 parent fddb620 commit e4860b8

File tree

3 files changed

+62
-6
lines changed

3 files changed

+62
-6
lines changed

src/main/java/io/confluent/connect/elasticsearch/DataConverter.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,10 +181,12 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
181181
.retryOnConflict(Math.min(config.maxInFlightRequests(), 5));
182182
case INSERT:
183183
OpType opType = config.isDataStream() ? OpType.CREATE : OpType.INDEX;
184-
return maybeAddExternalVersioning(
185-
new IndexRequest(index).id(id).source(payload, XContentType.JSON).opType(opType),
186-
record
187-
);
184+
IndexRequest req =
185+
new IndexRequest(index).source(payload, XContentType.JSON).opType(opType);
186+
if (config.useAutogeneratedIds()) {
187+
return req;
188+
}
189+
return maybeAddExternalVersioning(req.id(id), record);
188190
default:
189191
return null; // shouldn't happen
190192
}

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,9 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
232232
private static final String DROP_INVALID_MESSAGE_DISPLAY = "Drop invalid messages";
233233
private static final boolean DROP_INVALID_MESSAGE_DEFAULT = false;
234234

235+
private static final String USE_AUTOGENERATED_IDS_DISPLAY = "Elasticsearch Generated IDs";
236+
private static final boolean USE_AUTOGENERATED_IDS_DEFAULT = false;
237+
235238
public static final String BEHAVIOR_ON_NULL_VALUES_CONFIG = "behavior.on.null.values";
236239
private static final String BEHAVIOR_ON_NULL_VALUES_DOC = "How to handle records with a "
237240
+ "non-null key and a null value (i.e. Kafka tombstone records). Valid options are "
@@ -358,7 +361,17 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
358361
+ DATA_STREAM_NAMESPACE_CONFIG + "``}.";
359362
private static final String DATA_STREAM_DATASET_DISPLAY = "Data Stream Dataset";
360363
private static final String DATA_STREAM_DATASET_DEFAULT = "";
361-
364+
public static final String USE_AUTO_GENERATED_IDS_CONFIG = "use.autogenerated.ids";
365+
private static final String USE_AUTO_GENERATED_IDS_DOC = String.format(
366+
"Whether to use auto-generated Elasticsearch document IDs for insertion requests. "
367+
+ "Note that this setting removes exactly once guarantees and message "
368+
+ "delivery will be at least once. Only applies if %s is set to %s."
369+
+ "When this is set to ``true``, ``%s`` option will also be ignored when "
370+
+ "sending data to Elasticsearch",
371+
WRITE_METHOD_CONFIG,
372+
WriteMethod.INSERT,
373+
IGNORE_KEY_CONFIG
374+
);
362375
public static final String DATA_STREAM_TYPE_CONFIG = "data.stream.type";
363376
private static final String DATA_STREAM_TYPE_DOC = String.format(
364377
"Generic type describing the data to be written to data stream. "
@@ -736,6 +749,16 @@ private static void addConversionConfigs(ConfigDef configDef) {
736749
Width.SHORT,
737750
WRITE_METHOD_DISPLAY,
738751
new EnumRecommender<>(WriteMethod.class)
752+
).define(
753+
USE_AUTO_GENERATED_IDS_CONFIG,
754+
Type.BOOLEAN,
755+
USE_AUTOGENERATED_IDS_DEFAULT,
756+
Importance.MEDIUM,
757+
USE_AUTO_GENERATED_IDS_DOC,
758+
CONNECTOR_GROUP,
759+
++order,
760+
Width.SHORT,
761+
USE_AUTOGENERATED_IDS_DISPLAY
739762
);
740763
}
741764

@@ -923,7 +946,8 @@ public boolean shouldDisableHostnameVerification() {
923946
}
924947

925948
public boolean shouldIgnoreKey(String topic) {
926-
return ignoreKey() || ignoreKeyTopics().contains(topic);
949+
return ignoreKey() || ignoreKeyTopics().contains(topic)
950+
|| (writeMethod().equals(WriteMethod.INSERT) && useAutogeneratedIds());
927951
}
928952

929953
public boolean shouldIgnoreSchema(String topic) {
@@ -1010,6 +1034,10 @@ public Set<String> ignoreSchemaTopics() {
10101034
return new HashSet<>(getList(IGNORE_SCHEMA_TOPICS_CONFIG));
10111035
}
10121036

1037+
public boolean useAutogeneratedIds() {
1038+
return getBoolean(USE_AUTO_GENERATED_IDS_CONFIG);
1039+
}
1040+
10131041
public String kerberosUserPrincipal() {
10141042
return getString(KERBEROS_PRINCIPAL_CONFIG);
10151043
}

src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.kafka.connect.data.*;
2121
import org.apache.kafka.connect.errors.DataException;
2222
import org.apache.kafka.connect.sink.SinkRecord;
23+
import org.elasticsearch.action.DocWriteRequest;
2324
import org.elasticsearch.action.delete.DeleteRequest;
2425
import org.elasticsearch.action.index.IndexRequest;
2526
import org.elasticsearch.index.VersionType;
@@ -39,6 +40,7 @@
3940
import static io.confluent.connect.elasticsearch.DataConverter.TIMESTAMP_FIELD;
4041
import static org.junit.Assert.assertEquals;
4142
import static org.junit.Assert.assertFalse;
43+
import static org.junit.Assert.assertNotNull;
4244
import static org.junit.Assert.assertNull;
4345
import static org.junit.Assert.assertThrows;
4446
import static org.junit.Assert.fail;
@@ -435,6 +437,30 @@ public void failOnNullValue() {
435437
}
436438
}
437439

440+
@Test
441+
public void useAutogeneratedIds() {
442+
props.put(ElasticsearchSinkConnectorConfig.USE_AUTO_GENERATED_IDS_CONFIG, "true");
443+
props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG, "INSERT");
444+
445+
converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
446+
447+
SinkRecord sinkRecord = createSinkRecordWithValue(new Struct(schema).put("string","test"));
448+
DocWriteRequest<?> req = converter.convertRecord(sinkRecord, index);
449+
assertNull(req.id());
450+
}
451+
452+
@Test
453+
public void ignoreAutogeneratedIdsForUpsert() {
454+
props.put(ElasticsearchSinkConnectorConfig.USE_AUTO_GENERATED_IDS_CONFIG, "true");
455+
props.put(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG, "UPSERT");
456+
457+
converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
458+
459+
SinkRecord sinkRecord = createSinkRecordWithValue(new Struct(schema).put("string","test"));
460+
DocWriteRequest<?> req = converter.convertRecord(sinkRecord, index);
461+
assertNotNull(req.id());
462+
}
463+
438464
public SinkRecord createSinkRecordWithValue(Object value) {
439465
return new SinkRecord(
440466
topic,

0 commit comments

Comments
 (0)