Skip to content

Commit fddb620

Browse files
add 'data.stream.namespace' configuration property (#802)
1 parent 9bdef0a commit fddb620

File tree

5 files changed

+131
-15
lines changed

5 files changed

+131
-15
lines changed

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ private void closeResources() {
521521
/**
522522
* Creates a data stream. Will not recreate the data stream if it already exists.
523523
*
524-
* @param dataStream the data stream to create given in the form {type}-{dataset}-{topic}
524+
* @param dataStream the data stream to create given in the form {type}-{dataset}-{namespace}
525525
* @return true if the data stream was created, false if it already exists
526526
*/
527527
private boolean createDataStream(String dataStream) {

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

Lines changed: 78 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -332,29 +332,46 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
332332
private static final String KERBEROS_KEYTAB_PATH_DEFAULT = null;
333333

334334
// Data stream configs
335+
public static final String DATA_STREAM_NAMESPACE_CONFIG = "data.stream.namespace";
336+
private static final String DATA_STREAM_NAMESPACE_DOC =
337+
"Generic name describing user-configurable arbitrary grouping to be written to a data "
338+
+ "stream. Can be any arbitrary string that is no longer than 100 characters, "
339+
+ "is in all lowercase, and does not contain spaces or any of these special characters "
340+
+ "``/\\*\"<>|,#:-``. Otherwise, no value indicates the connector will write to regular "
341+
+ "indices instead. If set, this configuration will be used alongside "
342+
+ "``data.stream.type`` and ``data.stream.dataset`` to construct the data stream name in"
343+
+ "the form of {``data.stream.type``}-{``data.stream.dataset``}-{``"
344+
+ DATA_STREAM_NAMESPACE_CONFIG + "``}. "
345+
+ "Default value is ``${topic}``, that is to say the topic name.";
346+
private static final String DATA_STREAM_NAMESPACE_DISPLAY = "Data Stream Namespace";
347+
private static final String DATA_STREAM_NAMESPACE_DEFAULT = "${topic}";
348+
335349
public static final String DATA_STREAM_DATASET_CONFIG = "data.stream.dataset";
336350
private static final String DATA_STREAM_DATASET_DOC =
337351
"Generic name describing data ingested and its structure to be written to a data stream. Can "
338352
+ "be any arbitrary string that is no longer than 100 characters, is in all lowercase, "
339353
+ "and does not contain spaces or any of these special characters ``/\\*\"<>|,#:-``. "
340354
+ "Otherwise, no value indicates the connector will write to regular indices instead. "
341-
+ "If set, this configuration will be used alongside ``data.stream.type`` to "
342-
+ "construct the data stream name in the form of {``data.stream.type``"
343-
+ "}-{``" + DATA_STREAM_DATASET_CONFIG + "``}-{topic}.";
355+
+ "If set, this configuration will be used alongside ``data.stream.type`` and "
356+
+ "``" + DATA_STREAM_NAMESPACE_CONFIG + "`` to construct the data stream name in "
357+
+ "the form of {``data.stream.type``}-{``" + DATA_STREAM_DATASET_CONFIG + "``}-{``"
358+
+ DATA_STREAM_NAMESPACE_CONFIG + "``}.";
344359
private static final String DATA_STREAM_DATASET_DISPLAY = "Data Stream Dataset";
345360
private static final String DATA_STREAM_DATASET_DEFAULT = "";
346-
361+
347362
public static final String DATA_STREAM_TYPE_CONFIG = "data.stream.type";
348363
private static final String DATA_STREAM_TYPE_DOC = String.format(
349364
"Generic type describing the data to be written to data stream. "
350365
+ "The default is %s which indicates the connector will write "
351366
+ "to regular indices instead. If set, this configuration will "
352-
+ "be used alongside %s to construct the data stream name in the form of "
353-
+ "{``%s``}-{``%s``}-{topic}.",
367+
+ "be used alongside %s and %s to construct the data stream name in the form of "
368+
+ "{``%s``}-{``%s``}-{``%s``}.",
354369
DataStreamType.NONE.name(),
355370
DATA_STREAM_DATASET_CONFIG,
371+
DATA_STREAM_NAMESPACE_CONFIG,
356372
DATA_STREAM_TYPE_CONFIG,
357-
DATA_STREAM_DATASET_CONFIG
373+
DATA_STREAM_DATASET_CONFIG,
374+
DATA_STREAM_NAMESPACE_CONFIG
358375
);
359376
private static final String DATA_STREAM_TYPE_DISPLAY = "Data Stream Type";
360377
private static final DataStreamType DATA_STREAM_TYPE_DEFAULT = DataStreamType.NONE;
@@ -820,6 +837,17 @@ private static void addDataStreamConfigs(ConfigDef configDef) {
820837
int order = 0;
821838
configDef
822839
.define(
840+
DATA_STREAM_NAMESPACE_CONFIG,
841+
Type.STRING,
842+
DATA_STREAM_NAMESPACE_DEFAULT,
843+
new DataStreamNamespaceValidator(),
844+
Importance.LOW,
845+
DATA_STREAM_NAMESPACE_DOC,
846+
DATA_STREAM_GROUP,
847+
++order,
848+
Width.MEDIUM,
849+
DATA_STREAM_NAMESPACE_DISPLAY
850+
).define(
823851
DATA_STREAM_DATASET_CONFIG,
824852
Type.STRING,
825853
DATA_STREAM_DATASET_DEFAULT,
@@ -946,6 +974,10 @@ public String dataStreamDataset() {
946974
return getString(DATA_STREAM_DATASET_CONFIG);
947975
}
948976

977+
public String dataStreamNamespace() {
978+
return getString(DATA_STREAM_NAMESPACE_CONFIG);
979+
}
980+
949981
public DataStreamType dataStreamType() {
950982
return DataStreamType.valueOf(getString(DATA_STREAM_TYPE_CONFIG).toUpperCase());
951983
}
@@ -1078,6 +1110,45 @@ public WriteMethod writeMethod() {
10781110
return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase());
10791111
}
10801112

1113+
private static class DataStreamNamespaceValidator implements Validator {
1114+
1115+
@Override
1116+
@SuppressWarnings("unchecked")
1117+
public void ensureValid(String name, Object value) {
1118+
if (value == null) {
1119+
return;
1120+
}
1121+
1122+
String namespace = (String) value;
1123+
1124+
if (namespace.length() > 100) {
1125+
throw new ConfigException(
1126+
name, namespace, "The specified namespace must be no longer than 100 characters."
1127+
);
1128+
}
1129+
1130+
if (!namespace.equals(namespace.toLowerCase())) {
1131+
throw new ConfigException(
1132+
name, namespace, "The specified namespace must be in all lowercase."
1133+
);
1134+
}
1135+
1136+
if (namespace.matches(".*[\\\\\\/\\*\\?\\\"<>| ,#\\-:]+.*")) {
1137+
throw new ConfigException(
1138+
name, namespace,
1139+
"The specified namespace must not contain any spaces or "
1140+
+ "invalid characters \\/*?\"<>|,#-:"
1141+
);
1142+
}
1143+
}
1144+
1145+
@Override
1146+
public String toString() {
1147+
return "A valid namespace name that is all lowercase, no longer than 100 characters, and "
1148+
+ "does not contain any spaces or invalid characters \\/*?\"<>|,#-:";
1149+
}
1150+
}
1151+
10811152
private static class DataStreamDatasetValidator implements Validator {
10821153

10831154
@Override

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -182,31 +182,33 @@ private String convertTopicToIndexName(String topic) {
182182
}
183183

184184
/**
185-
* Returns the converted index name from a given topic name in the form {type}-{dataset}-{topic}.
186-
* For the <code>topic</code>, Elasticsearch accepts:
185+
* Returns the converted datastream name from a given topic name in the form:
186+
* {type}-{dataset}-{namespace}
187+
* For the <code>namespace</code> (which may include the topic name), Elasticsearch accepts:
187188
* <ul>
188189
* <li>all lowercase</li>
189190
* <li>no longer than 100 bytes</li>
190191
* </ul>
191192
* (<a href="https://github.com/elastic/ecs/blob/master/rfcs/text/0009-data_stream-fields.md#restrictions-on-values">ref</a>.)
192193
*/
193194
private String convertTopicToDataStreamName(String topic) {
194-
topic = topic.toLowerCase();
195-
if (topic.length() > 100) {
196-
topic = topic.substring(0, 100);
195+
String namespace = config.dataStreamNamespace();
196+
namespace = namespace.replace("${topic}", topic.toLowerCase());
197+
if (namespace.length() > 100) {
198+
namespace = namespace.substring(0, 100);
197199
}
198200
String dataStream = String.format(
199201
"%s-%s-%s",
200202
config.dataStreamType().name().toLowerCase(),
201203
config.dataStreamDataset(),
202-
topic
204+
namespace
203205
);
204206
return dataStream;
205207
}
206208

207209
/**
208210
* Returns the converted index name from a given topic name. If writing to a data stream,
209-
* returns the index name in the form {type}-{dataset}-{topic}. For both cases, Elasticsearch
211+
* returns the index name in the form {type}-{dataset}-{namespace}. For both cases, Elasticsearch
210212
* accepts:
211213
* <ul>
212214
* <li>all lowercase</li>

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ public void testSetHttpTimeoutsConfig() {
5151
assertEquals(config.connectionTimeoutMs(), 15000);
5252
}
5353

54+
@Test
55+
public void shouldAllowValidChractersDataStreamNamespace() {
56+
props.put(DATA_STREAM_NAMESPACE_CONFIG, "a_valid.namespace123");
57+
new ElasticsearchSinkConnectorConfig(props);
58+
}
59+
5460
@Test
5561
public void shouldAllowValidChractersDataStreamDataset() {
5662
props.put(DATA_STREAM_DATASET_CONFIG, "a_valid.dataset123");
@@ -69,12 +75,24 @@ public void shouldAllowValidDataStreamTypeCaseInsensitive() {
6975
new ElasticsearchSinkConnectorConfig(props);
7076
}
7177

78+
@Test(expected = ConfigException.class)
79+
public void shouldNotAllowInvalidCaseDataStreamNamespace() {
80+
props.put(DATA_STREAM_NAMESPACE_CONFIG, "AN_INVALID.namespace123");
81+
new ElasticsearchSinkConnectorConfig(props);
82+
}
83+
7284
@Test(expected = ConfigException.class)
7385
public void shouldNotAllowInvalidCaseDataStreamDataset() {
7486
props.put(DATA_STREAM_DATASET_CONFIG, "AN_INVALID.dataset123");
7587
new ElasticsearchSinkConnectorConfig(props);
7688
}
7789

90+
@Test(expected = ConfigException.class)
91+
public void shouldNotAllowInvalidCharactersDataStreamNamespace() {
92+
props.put(DATA_STREAM_NAMESPACE_CONFIG, "not-valid?");
93+
new ElasticsearchSinkConnectorConfig(props);
94+
}
95+
7896
@Test(expected = ConfigException.class)
7997
public void shouldNotAllowInvalidCharactersDataStreamDataset() {
8098
props.put(DATA_STREAM_DATASET_CONFIG, "not-valid?");
@@ -87,6 +105,12 @@ public void shouldNotAllowInvalidDataStreamType() {
87105
new ElasticsearchSinkConnectorConfig(props);
88106
}
89107

108+
@Test(expected = ConfigException.class)
109+
public void shouldNotAllowLongDataStreamNamespace() {
110+
props.put(DATA_STREAM_NAMESPACE_CONFIG, String.format("%d%100d", 1, 1));
111+
new ElasticsearchSinkConnectorConfig(props);
112+
}
113+
90114
@Test(expected = ConfigException.class)
91115
public void shouldNotAllowLongDataStreamDataset() {
92116
props.put(DATA_STREAM_DATASET_CONFIG, String.format("%d%100d", 1, 1));

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;
4343

4444
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG;
45+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_NAMESPACE_CONFIG;
4546
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_DATASET_CONFIG;
4647
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_TYPE_CONFIG;
4748
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG;
@@ -376,6 +377,24 @@ public void testConvertTopicToDataStreamAllowUnderscores() {
376377
verify(client, times(1)).createIndexOrDataStream(eq(indexName));
377378
}
378379

380+
@Test
381+
public void testConvertTopicToDataStreamWithCustomNamespace() {
382+
String type = "logs";
383+
String dataset = "a_valid_dataset";
384+
String namespaceTemplate = "a_valid_prefix_${topic}";
385+
props.put(DATA_STREAM_TYPE_CONFIG, type);
386+
props.put(DATA_STREAM_DATASET_CONFIG, dataset);
387+
props.put(DATA_STREAM_NAMESPACE_CONFIG, namespaceTemplate);
388+
setUpTask();
389+
390+
String topic = "a_valid_topic";
391+
String namespace = namespaceTemplate.replace("${topic}", topic);
392+
when(assignment.contains(eq(new TopicPartition(topic, 1)))).thenReturn(true);
393+
task.put(Collections.singletonList(record(topic, true, false, 0)));
394+
String indexName = dataStreamName(type, dataset, namespace);
395+
verify(client, times(1)).createIndexOrDataStream(eq(indexName));
396+
}
397+
379398
@Test
380399
public void testConvertTopicToDataStreamTooLong() {
381400
String type = "logs";

0 commit comments

Comments
 (0)