Skip to content

Commit d991fa3

Browse files
authored
Merge pull request #21 from brbrown25/feat/concat-transformer
Add concatenation transformer
2 parents 0da033e + 6d7cf7b commit d991fa3

File tree

7 files changed

+724
-0
lines changed

7 files changed

+724
-0
lines changed

README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,33 @@ transforms.TombstoneHandler.type=io.aiven.kafka.connect.transforms.TombstoneHand
104104
transforms.TombstoneHandler.behavior=drop_silent
105105
```
106106

107+
### `ConcatFields`
108+
109+
This transformation adds a new field to the message with a key of type string and a value of string which is the
110+
concatenation of the requested fields.
111+
112+
Exists in two variants:
113+
114+
- `io.aiven.kafka.connect.transforms.ConcatFields$Key` - works on keys;
115+
- `io.aiven.kafka.connect.transforms.ConcatFields$Value` - works on values.
116+
117+
The transformation defines the following configurations:
118+
- `field.names` - A comma-separated list of fields to concatenate.
119+
- `output.field.name` - The name of field the concatenated value should be placed into.
120+
- `delimiter` - The string which should be used to join the extracted fields.
121+
- `field.replace.missing` - The string which should be used when a field is not found or its value is null.
122+
123+
Here is an example of this transformation configuration:
124+
125+
```
126+
transforms=ConcatFields
127+
transforms.ConcatFields.type=io.aiven.kafka.connect.transforms.ConcatFields$Value
128+
transforms.ConcatFields.field.names=test,foo,bar,age
129+
transforms.ConcatFields.output.field.name=combined
130+
transforms.ConcatFields.delimiter=-
131+
transforms.ConcatFields.field.replace.missing=*
132+
```
133+
107134
## License
108135

109136
This project is licensed under the [Apache License, Version 2.0](LICENSE).
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
* Copyright 2021 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
import java.util.StringJoiner;
22+
23+
import org.apache.kafka.common.config.ConfigDef;
24+
import org.apache.kafka.connect.connector.ConnectRecord;
25+
import org.apache.kafka.connect.data.Schema;
26+
import org.apache.kafka.connect.data.SchemaAndValue;
27+
import org.apache.kafka.connect.data.SchemaBuilder;
28+
import org.apache.kafka.connect.data.Struct;
29+
import org.apache.kafka.connect.errors.DataException;
30+
import org.apache.kafka.connect.transforms.Transformation;
31+
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
public abstract class ConcatFields<R extends ConnectRecord<R>> implements Transformation<R> {
36+
private ConcatFieldsConfig config;
37+
private static final Logger log = LoggerFactory.getLogger(ConcatFields.class);
38+
39+
protected abstract String dataPlace();
40+
41+
protected abstract SchemaAndValue getSchemaAndValue(final R record);
42+
43+
protected abstract R createNewRecord(final R record, final Schema newSchema, final Object newValue);
44+
45+
@Override
46+
public ConfigDef config() {
47+
return ConcatFieldsConfig.config();
48+
}
49+
50+
@Override
51+
public void configure(final Map<String, ?> configs) {
52+
this.config = new ConcatFieldsConfig(configs);
53+
}
54+
55+
@Override
56+
public R apply(final R record) {
57+
final SchemaAndValue schemaAndValue = getSchemaAndValue(record);
58+
final SchemaBuilder newSchema = SchemaBuilder.struct();
59+
60+
if (schemaAndValue.value() == null) {
61+
throw new DataException(dataPlace() + " Value can't be null: " + record);
62+
}
63+
64+
final R newRecord;
65+
66+
if (schemaAndValue.value() instanceof Struct) {
67+
final Struct struct = (Struct) schemaAndValue.value();
68+
final StringJoiner outputValue = new StringJoiner(config.delimiter());
69+
70+
if (schemaAndValue.schema() != null) {
71+
schemaAndValue.schema().fields().forEach(field -> newSchema.field(field.name(), field.schema()));
72+
} else {
73+
struct.schema().fields().forEach(field -> newSchema.field(field.name(), field.schema()));
74+
}
75+
newSchema.field(config.outputFieldName(), Schema.OPTIONAL_STRING_SCHEMA);
76+
final Struct newStruct = new Struct(newSchema.build());
77+
struct.schema().fields().forEach(field -> {
78+
newStruct.put(field.name(), struct.get(field));
79+
});
80+
config.fieldNames().forEach(field -> {
81+
try {
82+
if (struct.get(field) == null) {
83+
outputValue.add(config.fieldReplaceMissing());
84+
} else {
85+
outputValue.add(struct.get(field).toString());
86+
}
87+
} catch (final DataException e) {
88+
log.debug("{} is missing, concat will use {}", field, config.fieldReplaceMissing());
89+
outputValue.add(config.fieldReplaceMissing());
90+
}
91+
});
92+
newStruct.put(config.outputFieldName(), outputValue.toString());
93+
newRecord = createNewRecord(record, newSchema.build(), newStruct);
94+
} else if (schemaAndValue.value() instanceof Map) {
95+
final Map newValue = new HashMap<>((Map<?, ?>) schemaAndValue.value());
96+
final StringJoiner outputValue = new StringJoiner(config.delimiter());
97+
config.fieldNames().forEach(field -> {
98+
if (newValue.get(field) == null) {
99+
outputValue.add(config.fieldReplaceMissing());
100+
} else {
101+
outputValue.add(newValue.get(field).toString());
102+
}
103+
});
104+
newValue.put(config.outputFieldName(), outputValue.toString());
105+
106+
//if we have a schema, we can add the new field to it, otherwise just keep the schema null
107+
if (schemaAndValue.schema() != null) {
108+
schemaAndValue.schema().fields().forEach(field -> newSchema.field(field.name(), field.schema()));
109+
newSchema.field(config.outputFieldName(), Schema.OPTIONAL_STRING_SCHEMA);
110+
newRecord = createNewRecord(record, newSchema.build(), newValue);
111+
} else {
112+
newRecord = createNewRecord(record, null, newValue);
113+
}
114+
} else {
115+
throw new DataException("Value type must be STRUCT or MAP: " + record);
116+
}
117+
118+
return newRecord;
119+
}
120+
121+
public static class Key<R extends ConnectRecord<R>> extends ConcatFields<R> {
122+
@Override
123+
protected SchemaAndValue getSchemaAndValue(final R record) {
124+
return new SchemaAndValue(record.keySchema(), record.key());
125+
}
126+
127+
@Override
128+
protected R createNewRecord(final R record, final Schema newSchema, final Object newValue) {
129+
return record.newRecord(
130+
record.topic(),
131+
record.kafkaPartition(),
132+
newSchema,
133+
newValue,
134+
record.valueSchema(),
135+
record.value(),
136+
record.timestamp(),
137+
record.headers()
138+
);
139+
}
140+
141+
@Override
142+
protected String dataPlace() {
143+
return "key";
144+
}
145+
}
146+
147+
public static class Value<R extends ConnectRecord<R>> extends ConcatFields<R> {
148+
@Override
149+
protected SchemaAndValue getSchemaAndValue(final R record) {
150+
return new SchemaAndValue(record.valueSchema(), record.value());
151+
}
152+
153+
@Override
154+
protected R createNewRecord(final R record, final Schema newSchema, final Object newValue) {
155+
return record.newRecord(
156+
record.topic(),
157+
record.kafkaPartition(),
158+
record.keySchema(),
159+
record.key(),
160+
newSchema,
161+
newValue,
162+
record.timestamp(),
163+
record.headers()
164+
);
165+
}
166+
167+
@Override
168+
protected String dataPlace() {
169+
return "value";
170+
}
171+
}
172+
173+
@Override
174+
public void close() {
175+
}
176+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2021 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.List;
20+
import java.util.Map;
21+
22+
import org.apache.kafka.common.config.AbstractConfig;
23+
import org.apache.kafka.common.config.ConfigDef;
24+
25+
final class ConcatFieldsConfig extends AbstractConfig {
26+
public static final String FIELD_NAMES_CONFIG = "field.names";
27+
private static final String FIELD_NAMES_DOC =
28+
"A comma-separated list of fields to concatenate.";
29+
public static final String OUTPUT_FIELD_NAME_CONFIG = "output.field.name";
30+
private static final String OUTPUT_FIELD_NAME_DOC =
31+
"The name of field the concatenated value should be placed into.";
32+
public static final String DELIMITER_CONFIG = "delimiter";
33+
private static final String DELIMITER_DOC =
34+
"The string which should be used to join the extracted fields.";
35+
public static final String FIELD_REPLACE_MISSING_CONFIG = "field.replace.missing";
36+
private static final String FIELD_REPLACE_MISSING_DOC =
37+
"The string which should be used when a field is not found or its value is null.";
38+
39+
ConcatFieldsConfig(final Map<?, ?> originals) {
40+
super(config(), originals);
41+
}
42+
43+
static ConfigDef config() {
44+
return new ConfigDef()
45+
.define(
46+
FIELD_NAMES_CONFIG,
47+
ConfigDef.Type.LIST,
48+
ConfigDef.NO_DEFAULT_VALUE,
49+
ConfigDef.Importance.HIGH,
50+
FIELD_NAMES_DOC)
51+
.define(
52+
OUTPUT_FIELD_NAME_CONFIG,
53+
ConfigDef.Type.STRING,
54+
ConfigDef.NO_DEFAULT_VALUE,
55+
new ConfigDef.NonEmptyString(),
56+
ConfigDef.Importance.HIGH,
57+
OUTPUT_FIELD_NAME_DOC)
58+
.define(
59+
FIELD_REPLACE_MISSING_CONFIG,
60+
ConfigDef.Type.STRING,
61+
"",
62+
ConfigDef.Importance.HIGH,
63+
FIELD_REPLACE_MISSING_DOC)
64+
.define(
65+
DELIMITER_CONFIG,
66+
ConfigDef.Type.STRING,
67+
"",
68+
ConfigDef.Importance.HIGH,
69+
DELIMITER_DOC);
70+
}
71+
72+
final List<String> fieldNames() {
73+
return getList(FIELD_NAMES_CONFIG);
74+
}
75+
76+
final String outputFieldName() {
77+
return getString(OUTPUT_FIELD_NAME_CONFIG);
78+
}
79+
80+
final String fieldReplaceMissing() {
81+
return getString(FIELD_REPLACE_MISSING_CONFIG);
82+
}
83+
84+
final String delimiter() {
85+
return getString(DELIMITER_CONFIG);
86+
}
87+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2019 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Arrays;
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
23+
import org.apache.kafka.common.config.ConfigException;
24+
25+
import org.junit.jupiter.api.Test;
26+
27+
import static io.aiven.kafka.connect.transforms.ConcatFieldsConfig.DELIMITER_CONFIG;
28+
import static io.aiven.kafka.connect.transforms.ConcatFieldsConfig.FIELD_NAMES_CONFIG;
29+
import static io.aiven.kafka.connect.transforms.ConcatFieldsConfig.FIELD_REPLACE_MISSING_CONFIG;
30+
import static io.aiven.kafka.connect.transforms.ConcatFieldsConfig.OUTPUT_FIELD_NAME_CONFIG;
31+
import static org.junit.jupiter.api.Assertions.assertEquals;
32+
import static org.junit.jupiter.api.Assertions.assertThrows;
33+
34+
class ConcatFieldsConfigTest {
35+
@Test
36+
void emptyConfig() {
37+
final Map<String, String> props = new HashMap<>();
38+
final Throwable e = assertThrows(ConfigException.class, () -> new ConcatFieldsConfig(props));
39+
assertEquals("Missing required configuration \"field.names\" which has no default value.",
40+
e.getMessage());
41+
}
42+
43+
@Test
44+
void emptyFieldName() {
45+
final Map<String, String> props = new HashMap<>();
46+
props.put(FIELD_NAMES_CONFIG, "");
47+
final Throwable e = assertThrows(ConfigException.class, () -> new ConcatFieldsConfig(props));
48+
assertEquals("Missing required configuration \"output.field.name\" which has no default value.",
49+
e.getMessage());
50+
}
51+
52+
@Test
53+
void definedFieldName() {
54+
final Map props = new HashMap<>();
55+
props.put(FIELD_NAMES_CONFIG, Arrays.asList("test", "foo", "bar"));
56+
props.put(OUTPUT_FIELD_NAME_CONFIG, "combined");
57+
props.put(DELIMITER_CONFIG, "-");
58+
props.put(FIELD_REPLACE_MISSING_CONFIG, "*");
59+
final ConcatFieldsConfig config = new ConcatFieldsConfig(props);
60+
assertEquals(Arrays.asList("test", "foo", "bar"), config.fieldNames());
61+
assertEquals("combined", config.outputFieldName());
62+
assertEquals("-", config.delimiter());
63+
assertEquals("*", config.fieldReplaceMissing());
64+
}
65+
}

0 commit comments

Comments
 (0)