Skip to content

Commit 5071f81

Browse files
allow custom datastream type (#804)
1 parent e4860b8 commit 5071f81

File tree

4 files changed

+11
-9
lines changed

4 files changed

+11
-9
lines changed

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,8 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
378378
+ "The default is %s which indicates the connector will write "
379379
+ "to regular indices instead. If set, this configuration will "
380380
+ "be used alongside %s and %s to construct the data stream name in the form of "
381-
+ "{``%s``}-{``%s``}-{``%s``}.",
381+
+ "{``%s``}-{``%s``}-{``%s``}. Custom index templates defined in the destination"
382+
+ " cluster are supported.",
382383
DataStreamType.NONE.name(),
383384
DATA_STREAM_DATASET_CONFIG,
384385
DATA_STREAM_NAMESPACE_CONFIG,
@@ -885,7 +886,6 @@ private static void addDataStreamConfigs(ConfigDef configDef) {
885886
DATA_STREAM_TYPE_CONFIG,
886887
Type.STRING,
887888
DATA_STREAM_TYPE_DEFAULT.name(),
888-
new EnumRecommender<>(DataStreamType.class),
889889
Importance.LOW,
890890
DATA_STREAM_TYPE_DOC,
891891
DATA_STREAM_GROUP,
@@ -921,7 +921,8 @@ public boolean isBasicProxyConfigured() {
921921
}
922922

923923
public boolean isDataStream() {
924-
return dataStreamType() != DataStreamType.NONE && !dataStreamDataset().isEmpty();
924+
return !dataStreamType().toUpperCase().equals(DataStreamType.NONE.name())
925+
&& !dataStreamDataset().isEmpty();
925926
}
926927

927928
public boolean isProxyWithAuthenticationConfigured() {
@@ -1002,8 +1003,8 @@ public String dataStreamNamespace() {
10021003
return getString(DATA_STREAM_NAMESPACE_CONFIG);
10031004
}
10041005

1005-
public DataStreamType dataStreamType() {
1006-
return DataStreamType.valueOf(getString(DATA_STREAM_TYPE_CONFIG).toUpperCase());
1006+
public String dataStreamType() {
1007+
return getString(DATA_STREAM_TYPE_CONFIG);
10071008
}
10081009

10091010
public List<String> dataStreamTimestampField() {

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ private String convertTopicToDataStreamName(String topic) {
199199
}
200200
String dataStream = String.format(
201201
"%s-%s-%s",
202-
config.dataStreamType().name().toLowerCase(),
202+
config.dataStreamType().toLowerCase(),
203203
config.dataStreamDataset(),
204204
namespace
205205
);

src/main/java/io/confluent/connect/elasticsearch/Validator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,8 @@ private void validateCredentials() {
138138
}
139139

140140
private void validateDataStreamConfigs() {
141-
if (config.dataStreamType() == DataStreamType.NONE ^ config.dataStreamDataset().isEmpty()) {
141+
if (config.dataStreamType().toUpperCase().equals(DataStreamType.NONE.name())
142+
^ config.dataStreamDataset().isEmpty()) {
142143
String errorMessage = String.format(
143144
"Either both or neither '%s' and '%s' must be set.",
144145
DATA_STREAM_DATASET_CONFIG,

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ public void shouldNotAllowInvalidCharactersDataStreamDataset() {
9999
new ElasticsearchSinkConnectorConfig(props);
100100
}
101101

102-
@Test(expected = ConfigException.class)
103-
public void shouldNotAllowInvalidDataStreamType() {
102+
@Test
103+
public void shouldAllowCustomDataStreamType() {
104104
props.put(DATA_STREAM_TYPE_CONFIG, "notLogOrMetrics");
105105
new ElasticsearchSinkConnectorConfig(props);
106106
}

0 commit comments

Comments
 (0)