Skip to content

Commit 874f09e

Browse files
authored
Merge pull request #7 from aiven/loosen-string-restriction
ExtractTopic: support other primitive types apart from STRING #7
2 parents 50cd066 + 9545c27 commit 874f09e

File tree

3 files changed

+84
-23
lines changed

3 files changed

+84
-23
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ transforms.ExtractTimestampFromValueField.field.name=inner_field_name
3535

3636
This transformation extracts a string value from the record and use it as the topic name.
3737

38-
The transformation can use either the whole key or value (in this case, it must have `STRING` type) or a field in them (in this case, it must have `STRUCT` type and the field's value must be `STRING`).
38+
The transformation can use either the whole key or value (in this case, it must have `INT8`, `INT16`, `INT32`, `INT64`, `FLOAT32`, `FLOAT32`, `BOOLEAN`, or `STRING` type) or a field in them (in this case, it must have `STRUCT` type and the field's value must be `INT8`, `INT16`, `INT32`, `INT64`, `FLOAT32`, `FLOAT32`, `BOOLEAN`, or `STRING`).
3939

4040
Exists in two variants:
4141
- `io.aiven.kafka.connect.transforms.ExtractTopic$Key` - works on keys;

src/main/java/io/aiven/kafka/connect/transforms/ExtractTopic.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.aiven.kafka.connect.transforms;
1818

19+
import java.util.Arrays;
20+
import java.util.List;
1921
import java.util.Map;
2022
import java.util.Optional;
2123

@@ -34,6 +36,17 @@
3436
public abstract class ExtractTopic<R extends ConnectRecord<R>> implements Transformation<R> {
3537
private static final Logger log = LoggerFactory.getLogger(ExtractTopic.class);
3638

39+
private static final List<Schema.Type> SUPPORTED_TYPES_TO_CONVERT_FROM = Arrays.asList(
40+
Schema.Type.INT8,
41+
Schema.Type.INT16,
42+
Schema.Type.INT32,
43+
Schema.Type.INT64,
44+
Schema.Type.FLOAT32,
45+
Schema.Type.FLOAT64,
46+
Schema.Type.BOOLEAN,
47+
Schema.Type.STRING
48+
);
49+
3750
private ExtractTopicConfig config;
3851

3952
@Override
@@ -104,29 +117,34 @@ private Optional<String> getNewTopicForNamedField(final String recordStr,
104117
}
105118
}
106119

107-
if (Schema.Type.STRING != field.schema().type()) {
108-
throw new DataException(fieldName + " schema type in " + dataPlace() + " must be STRING: " + recordStr);
120+
if (!SUPPORTED_TYPES_TO_CONVERT_FROM.contains(field.schema().type())) {
121+
throw new DataException(fieldName + " schema type in " + dataPlace()
122+
+ " must be " + SUPPORTED_TYPES_TO_CONVERT_FROM
123+
+ ": " + recordStr);
109124
}
110125

111126
final Struct struct = (Struct) value;
112-
final String fieldValue = struct.getString(fieldName);
113127

114-
if (fieldValue == null || "".equals(fieldValue)) {
128+
final Optional<String> result = Optional.ofNullable(struct.get(fieldName))
129+
.map(Object::toString);
130+
if (result.isPresent() && !result.get().equals("")) {
131+
return result;
132+
} else {
115133
if (config.skipMissingOrNull()) {
116134
return Optional.empty();
117135
} else {
118136
throw new DataException(fieldName + " in " + dataPlace() + " can't be null or empty: " + recordStr);
119137
}
120-
} else {
121-
return Optional.of((String) fieldValue);
122138
}
123139
}
124140

125141
private Optional<String> getNewTopicWithoutFieldName(final String recordStr,
126142
final Schema schema,
127143
final Object value) {
128-
if (Schema.Type.STRING != schema.type()) {
129-
throw new DataException(dataPlace() + " schema type must be STRING if field name is not specified: "
144+
if (!SUPPORTED_TYPES_TO_CONVERT_FROM.contains(schema.type())) {
145+
throw new DataException(dataPlace() + " schema type must be "
146+
+ SUPPORTED_TYPES_TO_CONVERT_FROM
147+
+ " if field name is not specified: "
130148
+ recordStr);
131149
}
132150

@@ -138,7 +156,7 @@ private Optional<String> getNewTopicWithoutFieldName(final String recordStr,
138156
}
139157
}
140158

141-
return Optional.of((String) value);
159+
return Optional.of(value.toString());
142160
}
143161

144162
public static class Key<R extends ConnectRecord<R>> extends ExtractTopic<R> {

src/test/java/io/aiven/kafka/connect/transforms/ExtractTopicTest.java

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,15 @@ void nullSchema(final boolean skipMissingOrNull) {
5050

5151
@ParameterizedTest
5252
@ValueSource(booleans = { true, false })
53-
void noFieldName_NonString(final boolean skipMissingOrNull) {
54-
final SinkRecord originalRecord = record(SchemaBuilder.INT8_SCHEMA, "some");
53+
void noFieldName_UnsupportedType(final boolean skipMissingOrNull) {
54+
final Schema schema = SchemaBuilder.struct().build();
55+
final SinkRecord originalRecord = record(schema, new Struct(schema));
5556
final Throwable e = assertThrows(DataException.class,
5657
() -> transformation(null, skipMissingOrNull).apply(originalRecord));
57-
assertEquals(dataPlace() + " schema type must be STRING if field name is not specified: "
58+
assertEquals(dataPlace()
59+
+ " schema type must be "
60+
+ "[INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING]"
61+
+ " if field name is not specified: "
5862
+ originalRecord,
5963
e.getMessage());
6064
}
@@ -80,11 +84,27 @@ void noFieldName_NullOrEmptyValue_Skip(final String value) {
8084
}
8185

8286
@Test
83-
void noFieldName_NormalValue() {
87+
void noFieldName_NormalInt64Value() {
88+
final Schema schema = SchemaBuilder.INT64_SCHEMA;
89+
final SinkRecord originalRecord = record(schema, 123L);
90+
final SinkRecord result = transformation(null, false).apply(originalRecord);
91+
assertEquals(setNewTopic(originalRecord, "123"), result);
92+
}
93+
94+
@Test
95+
void noFieldName_NormalBooleanValue() {
96+
final Schema schema = SchemaBuilder.BOOLEAN_SCHEMA;
97+
final SinkRecord originalRecord = record(schema, false);
98+
final SinkRecord result = transformation(null, false).apply(originalRecord);
99+
assertEquals(setNewTopic(originalRecord, "false"), result);
100+
}
101+
102+
@Test
103+
void noFieldName_NormalStringValue() {
84104
final Schema schema = SchemaBuilder.STRING_SCHEMA;
85105
final SinkRecord originalRecord = record(schema, NEW_TOPIC);
86106
final SinkRecord result = transformation(null, false).apply(originalRecord);
87-
assertEquals(setNewTopic(originalRecord), result);
107+
assertEquals(setNewTopic(originalRecord, NEW_TOPIC), result);
88108
}
89109

90110
@ParameterizedTest
@@ -113,15 +133,18 @@ void fieldName_NullStruct(final boolean skipMissingOrNull) {
113133

114134
@ParameterizedTest
115135
@ValueSource(booleans = { true, false })
116-
void fieldName_NonStringInField(final boolean skipMissingOrNull) {
136+
void fieldName_UnsupportedTypeInField(final boolean skipMissingOrNull) {
137+
final Schema innerSchema = SchemaBuilder.struct().build();
117138
final Schema schema = SchemaBuilder.struct()
118-
.field(FIELD, SchemaBuilder.INT8_SCHEMA)
139+
.field(FIELD, innerSchema)
119140
.schema();
120141
final SinkRecord originalRecord = record(
121-
schema, new Struct(schema).put(FIELD, (byte) 0));
142+
schema, new Struct(schema).put(FIELD, new Struct(innerSchema)));
122143
final Throwable e = assertThrows(DataException.class,
123144
() -> transformation(FIELD, skipMissingOrNull).apply(originalRecord));
124-
assertEquals(FIELD + " schema type in " + dataPlace() + " must be STRING: " + originalRecord,
145+
assertEquals(FIELD + " schema type in " + dataPlace() + " must be "
146+
+ "[INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING]"
147+
+ ": " + originalRecord,
125148
e.getMessage());
126149
}
127150

@@ -178,13 +201,33 @@ void fieldName_MissingFieldInSchema_Skip() {
178201
}
179202

180203
@Test
181-
void fieldName_NormalValue() {
204+
void fieldName_NormalIntValue() {
205+
final Schema schema = SchemaBuilder.struct()
206+
.field(FIELD, SchemaBuilder.INT64_SCHEMA)
207+
.schema();
208+
final SinkRecord originalRecord = record(schema, new Struct(schema).put(FIELD, 123L));
209+
final SinkRecord result = transformation(FIELD, true).apply(originalRecord);
210+
assertEquals(setNewTopic(originalRecord, "123"), result);
211+
}
212+
213+
@Test
214+
void fieldName_NormalBooleanValue() {
215+
final Schema schema = SchemaBuilder.struct()
216+
.field(FIELD, SchemaBuilder.BOOLEAN_SCHEMA)
217+
.schema();
218+
final SinkRecord originalRecord = record(schema, new Struct(schema).put(FIELD, false));
219+
final SinkRecord result = transformation(FIELD, true).apply(originalRecord);
220+
assertEquals(setNewTopic(originalRecord, "false"), result);
221+
}
222+
223+
@Test
224+
void fieldName_NormalStringValue() {
182225
final Schema schema = SchemaBuilder.struct()
183226
.field(FIELD, SchemaBuilder.STRING_SCHEMA)
184227
.schema();
185228
final SinkRecord originalRecord = record(schema, new Struct(schema).put(FIELD, NEW_TOPIC));
186229
final SinkRecord result = transformation(FIELD, true).apply(originalRecord);
187-
assertEquals(setNewTopic(originalRecord), result);
230+
assertEquals(setNewTopic(originalRecord, NEW_TOPIC), result);
188231
}
189232

190233
private ExtractTopic<SinkRecord> transformation(final String fieldName, final boolean skipMissingOrNull) {
@@ -215,8 +258,8 @@ protected SinkRecord record(final Schema keySchema,
215258
456L, TimestampType.CREATE_TIME);
216259
}
217260

218-
private SinkRecord setNewTopic(final SinkRecord record) {
219-
return record.newRecord(NEW_TOPIC,
261+
private SinkRecord setNewTopic(final SinkRecord record, final String newTopic) {
262+
return record.newRecord(newTopic,
220263
record.kafkaPartition(),
221264
record.keySchema(),
222265
record.key(),

0 commit comments

Comments
 (0)