
Commit d8aa4cb

Brandon Brown committed:

feat(HashField): Adding a HashField transformer that can use a supplied hashing algorithm to transform the targeted value. ISSUE-9.

1 parent 951c754 commit d8aa4cb

File tree

7 files changed: +631 -0 lines changed
HashField.java
Lines changed: 192 additions & 0 deletions
/*
 * Copyright 2019 Aiven Oy
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.aiven.kafka.connect.transforms;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public abstract class HashField<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final Logger log = LoggerFactory.getLogger(HashField.class);

    private HashFieldConfig config;
    private static final List<Schema.Type> SUPPORTED_TYPES_TO_CONVERT_FROM = Arrays.asList(
        Schema.Type.STRING
    );

    @Override
    public ConfigDef config() {
        return HashFieldConfig.config();
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        this.config = new HashFieldConfig(configs);
    }

    @Override
    public R apply(final R record) {
        final SchemaAndValue schemaAndValue = getSchemaAndValue(record);
        if (schemaAndValue.schema() == null) {
            throw new DataException(dataPlace() + " schema can't be null: " + record);
        }

        final Optional<String> newValue;
        if (config.fieldName().isPresent()) {
            newValue = getNewValueForNamedField(
                record.toString(), schemaAndValue.schema(), schemaAndValue.value(), config.fieldName().get());
        } else {
            newValue = getNewValueWithoutFieldName(
                record.toString(), schemaAndValue.schema(), schemaAndValue.value());
        }

        if (newValue.isPresent()) {
            return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                record.keySchema(),
                record.key(),
                record.valueSchema(),
                newValue.get(),
                record.timestamp(),
                record.headers()
            );
        } else {
            return record;
        }
    }

    @Override
    public void close() {
    }

    protected abstract String dataPlace();

    protected abstract SchemaAndValue getSchemaAndValue(final R record);

    private Optional<String> getNewValueForNamedField(final String recordStr,
                                                      final Schema schema,
                                                      final Object value,
                                                      final String fieldName) {
        if (Schema.Type.STRUCT != schema.type()) {
            throw new DataException(dataPlace() + " schema type must be STRUCT if field name is specified: "
                + recordStr);
        }

        if (value == null) {
            throw new DataException(dataPlace() + " can't be null if field name is specified: " + recordStr);
        }

        final Field field = schema.field(fieldName);
        if (field == null) {
            // The named field is absent from the schema: pass the record through unchanged.
            return Optional.empty();
        }

        if (!SUPPORTED_TYPES_TO_CONVERT_FROM.contains(field.schema().type())) {
            throw new DataException(fieldName + " schema type in " + dataPlace()
                + " must be " + SUPPORTED_TYPES_TO_CONVERT_FROM
                + ": " + recordStr);
        }

        final Struct struct = (Struct) value;

        final Optional<String> result = Optional.ofNullable(struct.get(fieldName))
            .map(Object::toString)
            .flatMap(s -> config.hashFunction().map(alg -> hashString(alg, s)));
        if (result.isPresent() && !result.get().equals("")) {
            return result;
        } else {
            return Optional.empty();
        }
    }

    static String hashString(final String hashAlg, final String input) {
        try {
            final MessageDigest md = MessageDigest.getInstance(hashAlg);
            // Use an explicit charset so the digest does not depend on the platform default.
            final byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(digest);
        } catch (final NoSuchAlgorithmException e) {
            log.error("Unknown hash function: {}", hashAlg, e);
            throw new DataException("Hash function " + hashAlg
                + " must be "
                + HashFieldConfig.HashFunction.stringValues
                    .stream()
                    .collect(Collectors.joining("|", "[", "]")));
        }
    }

    private Optional<String> getNewValueWithoutFieldName(final String recordStr,
                                                         final Schema schema,
                                                         final Object value) {
        if (!SUPPORTED_TYPES_TO_CONVERT_FROM.contains(schema.type())) {
            throw new DataException(dataPlace() + " schema type must be "
                + SUPPORTED_TYPES_TO_CONVERT_FROM
                + " if field name is not specified: "
                + recordStr);
        }

        final Optional<String> result = Optional.ofNullable(value)
            .map(Object::toString)
            .flatMap(s -> config.hashFunction().map(alg -> hashString(alg, s)));

        if (result.isPresent() && !result.get().equals("")) {
            return result;
        } else {
            return Optional.empty();
        }
    }

    public static class Key<R extends ConnectRecord<R>> extends HashField<R> {
        @Override
        protected SchemaAndValue getSchemaAndValue(final R record) {
            return new SchemaAndValue(record.keySchema(), record.key());
        }

        @Override
        protected String dataPlace() {
            return "key";
        }
    }

    public static class Value<R extends ConnectRecord<R>> extends HashField<R> {
        @Override
        protected SchemaAndValue getSchemaAndValue(final R record) {
            return new SchemaAndValue(record.valueSchema(), record.value());
        }

        @Override
        protected String dataPlace() {
            return "value";
        }
    }
}
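For orientation, here is a minimal usage sketch (not part of this commit): it applies HashField.Value to a plain string record with no field.name set, so the entire value is hashed. The topic name and input string are made up; the SourceRecord constructor and the "function" key are the standard Kafka Connect API and the config key defined in HashFieldConfig below.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

import io.aiven.kafka.connect.transforms.HashField;

public class HashFieldUsageSketch {
    public static void main(final String[] args) {
        final HashField.Value<SourceRecord> transform = new HashField.Value<>();

        // No "field.name" is set, so the whole STRING value is hashed.
        final Map<String, String> props = new HashMap<>();
        props.put("function", "SHA-256");
        transform.configure(props);

        final SourceRecord record = new SourceRecord(
            null, null, "users", Schema.STRING_SCHEMA, "jane.doe@example.com");
        final SourceRecord hashed = transform.apply(record);

        // Prints the Base64-encoded SHA-256 digest of the original value.
        System.out.println(hashed.value());
    }
}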
HashFieldConfig.java
Lines changed: 91 additions & 0 deletions
/*
 * Copyright 2019 Aiven Oy
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.aiven.kafka.connect.transforms;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

class HashFieldConfig extends AbstractConfig {
    public static final String FIELD_NAME_CONFIG = "field.name";
    private static final String FIELD_NAME_DOC =
        "The name of the field whose value should be hashed. If null or empty, "
            + "the entire key or value is used (and assumed to be a string). By default is null.";
    public static final String FUNCTION_CONFIG = "function";
    private static final String FUNCTION_DOC =
        "The name of the hash function to use. The supported values are: MD5, SHA-1, SHA-256.";

    HashFieldConfig(final Map<?, ?> originals) {
        super(config(), originals);
    }

    static ConfigDef config() {
        return new ConfigDef()
            .define(
                FIELD_NAME_CONFIG,
                ConfigDef.Type.STRING,
                null,
                ConfigDef.Importance.HIGH,
                FIELD_NAME_DOC)
            .define(
                FUNCTION_CONFIG,
                ConfigDef.Type.STRING,
                null,
                HashFunctionValidator.INSTANCE,
                ConfigDef.Importance.HIGH,
                FUNCTION_DOC);
    }

    public enum HashFunction {
        MD5,
        SHA1 {
            @Override
            public String toString() {
                return "SHA-1";
            }
        },
        SHA256 {
            @Override
            public String toString() {
                return "SHA-256";
            }
        };

        // The string forms double as the algorithm names passed to MessageDigest.getInstance().
        static List<String> stringValues = Stream.of(HashFunction.values())
            .map(Enum::toString)
            .collect(Collectors.toList());
    }

    Optional<String> fieldName() {
        final String rawFieldName = getString(FIELD_NAME_CONFIG);
        if (null == rawFieldName || "".equals(rawFieldName)) {
            return Optional.empty();
        }
        return Optional.of(rawFieldName);
    }

    Optional<String> hashFunction() {
        final String rawHashFunction = getString(FUNCTION_CONFIG);
        if (null == rawHashFunction || "".equals(rawHashFunction)) {
            return Optional.empty();
        }
        return Optional.of(rawHashFunction);
    }
}
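A quick way to see why the enum's toString overrides matter: the accepted "function" strings are handed directly to MessageDigest.getInstance by hashString, so they must be the JCA standard algorithm names. A small sketch (not part of this commit), assuming only the JDK:

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class HashFunctionNamesSketch {
    public static void main(final String[] args) throws NoSuchAlgorithmException {
        // The three accepted values resolve to JDK digests of 16, 20 and 32 bytes.
        for (final String alg : new String[] {"MD5", "SHA-1", "SHA-256"}) {
            System.out.println(alg + ": "
                + MessageDigest.getInstance(alg).getDigestLength() + "-byte digest");
        }
    }
}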
HashFunctionValidator.java
Lines changed: 44 additions & 0 deletions
/*
 * Copyright 2019 Aiven Oy
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.aiven.kafka.connect.transforms;

import java.util.stream.Collectors;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public class HashFunctionValidator implements ConfigDef.Validator {
    public static final HashFunctionValidator INSTANCE = new HashFunctionValidator();

    @Override
    public void ensureValid(final String name, final Object value) {
        // null is allowed here: the function setting is optional at definition time.
        if (value != null && !HashFieldConfig.HashFunction.stringValues.contains(value.toString())) {
            throw new ConfigException(name, value, "Invalid Hash Function");
        }
    }

    @Override
    public String toString() {
        final String validFunctions = HashFieldConfig.HashFunction.stringValues
            .stream()
            .collect(Collectors.joining("|"));
        return "Must be one of " + validFunctions;
    }
}
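To make the validator's contract concrete, a short sketch (not part of this commit): null passes, because the setting is optional, while non-null values are matched case-sensitively against the enum's toString forms.

import org.apache.kafka.common.config.ConfigException;

import io.aiven.kafka.connect.transforms.HashFunctionValidator;

public class HashFunctionValidatorSketch {
    public static void main(final String[] args) {
        HashFunctionValidator.INSTANCE.ensureValid("function", "SHA-256"); // passes
        HashFunctionValidator.INSTANCE.ensureValid("function", null);      // passes: optional setting
        try {
            HashFunctionValidator.INSTANCE.ensureValid("function", "sha256");
        } catch (final ConfigException e) {
            System.out.println(e.getMessage()); // rejected: must be one of MD5|SHA-1|SHA-256
        }
    }
}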
HashFieldConfigTest.java
Lines changed: 62 additions & 0 deletions
/*
 * Copyright 2019 Aiven Oy
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.aiven.kafka.connect.transforms;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertEquals;

class HashFieldConfigTest {
    @Test
    void defaults() {
        final Map<String, String> props = new HashMap<>();
        final HashFieldConfig config = new HashFieldConfig(props);
        assertEquals(Optional.empty(), config.fieldName());
        assertEquals(Optional.empty(), config.hashFunction());
    }

    @ParameterizedTest
    @ValueSource(strings = {"MD5", "SHA-1", "SHA-256"})
    void hashFunction(final String hashFunction) {
        final Map<String, String> props = new HashMap<>();
        props.put(HashFieldConfig.FUNCTION_CONFIG, hashFunction);
        final HashFieldConfig config = new HashFieldConfig(props);
        assertEquals(Optional.of(hashFunction), config.hashFunction());
    }

    @Test
    void emptyFieldName() {
        final Map<String, String> props = new HashMap<>();
        props.put(HashFieldConfig.FIELD_NAME_CONFIG, "");
        final HashFieldConfig config = new HashFieldConfig(props);
        assertEquals(Optional.empty(), config.fieldName());
    }

    @Test
    void definedFieldName() {
        final Map<String, String> props = new HashMap<>();
        props.put(HashFieldConfig.FIELD_NAME_CONFIG, "test");
        final HashFieldConfig config = new HashFieldConfig(props);
        assertEquals(Optional.of("test"), config.fieldName());
    }
}
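The tests above cover only the config class. As a hedged sketch (not part of this commit), a transform-level test for the named-field path could look like the following; it would live in the same package as the test above, since HashFieldConfig is package-private. Note that, as written in this commit, the hash of the named field becomes the record's entire new value.

package io.aiven.kafka.connect.transforms;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class HashFieldStructSketchTest {
    @Test
    void hashesNamedStructField() throws NoSuchAlgorithmException {
        final Schema schema = SchemaBuilder.struct()
            .field("email", Schema.STRING_SCHEMA)
            .build();
        final Struct struct = new Struct(schema).put("email", "jane@example.com");

        final HashField.Value<SourceRecord> transform = new HashField.Value<>();
        final Map<String, String> props = new HashMap<>();
        props.put(HashFieldConfig.FIELD_NAME_CONFIG, "email");
        props.put(HashFieldConfig.FUNCTION_CONFIG, "MD5");
        transform.configure(props);

        final SourceRecord result = transform.apply(
            new SourceRecord(null, null, "topic", schema, struct));

        // The transform Base64-encodes the raw digest of the field's string value.
        final byte[] digest = MessageDigest.getInstance("MD5")
            .digest("jane@example.com".getBytes(StandardCharsets.UTF_8));
        assertEquals(Base64.getEncoder().encodeToString(digest), result.value());
    }
}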
