
Commit 1b7ebd6

Merge pull request #12 from brbrown25/feature/ISSUE-9

Adding HashField transformer

2 parents 951c754 + 2e308f0

File tree: 6 files changed, +653 -0 lines changed
HashField.java
Lines changed: 197 additions & 0 deletions
@@ -0,0 +1,197 @@
/*
 * Copyright 2019 Aiven Oy
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.aiven.kafka.connect.transforms;

import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public abstract class HashField<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final Logger log = LoggerFactory.getLogger(HashField.class);

    private HashFieldConfig config;
    private static final List<Schema.Type> SUPPORTED_TYPES_TO_CONVERT_FROM = Arrays.asList(
        Schema.Type.STRING
    );

    @Override
    public ConfigDef config() {
        return HashFieldConfig.config();
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        this.config = new HashFieldConfig(configs);
    }

    @Override
    public R apply(final R record) {
        final SchemaAndValue schemaAndValue = getSchemaAndValue(record);
        if (schemaAndValue.schema() == null) {
            throw new DataException(dataPlace() + " schema can't be null: " + record);
        }

        final Optional<String> newValue;
        if (config.fieldName().isPresent()) {
            newValue = getNewValueForNamedField(
                record.toString(), schemaAndValue.schema(), schemaAndValue.value(), config.fieldName().get());
        } else {
            if (config.skipMissingOrNull()) {
                newValue = getNewValueWithoutFieldName(
                    record.toString(), schemaAndValue.schema(), schemaAndValue.value());
            } else {
                throw new DataException(dataPlace() + " can't be null or empty: " + record);
            }
        }

        if (newValue.isPresent()) {
            return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                record.keySchema(),
                record.key(),
                record.valueSchema(),
                newValue.get(),
                record.timestamp(),
                record.headers()
            );
        } else {
            return record;
        }
    }

    @Override
    public void close() {
    }

    protected abstract String dataPlace();

    protected abstract SchemaAndValue getSchemaAndValue(final R record);

    private Optional<String> getNewValueForNamedField(final String recordStr,
                                                      final Schema schema,
                                                      final Object value,
                                                      final String fieldName) {
        if (Schema.Type.STRUCT != schema.type()) {
            throw new DataException(dataPlace() + " schema type must be STRUCT if field name is specified: "
                + recordStr);
        }

        if (value == null) {
            throw new DataException(dataPlace() + " can't be null if field name is specified: " + recordStr);
        }

        final Field field = schema.field(fieldName);
        if (field == null) {
            if (config.skipMissingOrNull()) {
                return Optional.empty();
            } else {
                throw new DataException(fieldName + " in " + dataPlace() + " schema can't be missing: " + recordStr);
            }
        }

        if (!SUPPORTED_TYPES_TO_CONVERT_FROM.contains(field.schema().type())) {
            throw new DataException(fieldName + " schema type in " + dataPlace()
                + " must be " + SUPPORTED_TYPES_TO_CONVERT_FROM
                + ": " + recordStr);
        }

        final Struct struct = (Struct) value;

        final Optional<String> result = Optional.ofNullable(struct.get(fieldName))
            .map(Object::toString).map(s -> hashString(config.hashFunction(), s));

        if (result.isPresent() && !result.get().equals("")) {
            return result;
        } else {
            if (config.skipMissingOrNull()) {
                return Optional.empty();
            } else {
                throw new DataException(fieldName + " in " + dataPlace() + " can't be null or empty: " + recordStr);
            }
        }
    }

    static final String hashString(final MessageDigest md, final String input) {
        final byte[] digest = md.digest(input.getBytes());
        return Base64.getEncoder().encodeToString(digest);
    }

    private Optional<String> getNewValueWithoutFieldName(final String recordStr,
                                                         final Schema schema,
                                                         final Object value) {
        if (!SUPPORTED_TYPES_TO_CONVERT_FROM.contains(schema.type())) {
            throw new DataException(dataPlace() + " schema type must be "
                + SUPPORTED_TYPES_TO_CONVERT_FROM
                + " if field name is not specified: "
                + recordStr);
        }

        final Optional<String> result = Optional.ofNullable(value)
            .map(Object::toString).map(s -> hashString(config.hashFunction(), s));

        if (result.isPresent() && !result.get().equals("")) {
            return result;
        } else {
            return Optional.empty();
        }
    }

    public static class Key<R extends ConnectRecord<R>> extends HashField<R> {
        @Override
        protected SchemaAndValue getSchemaAndValue(final R record) {
            return new SchemaAndValue(record.keySchema(), record.key());
        }

        @Override
        protected String dataPlace() {
            return "key";
        }
    }

    public static class Value<R extends ConnectRecord<R>> extends HashField<R> {
        @Override
        protected SchemaAndValue getSchemaAndValue(final R record) {
            return new SchemaAndValue(record.valueSchema(), record.value());
        }

        @Override
        protected String dataPlace() {
            return "value";
        }
    }

    protected HashFieldConfig getConfig() {
        return this.config;
    }
}
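
A minimal sketch of how the transformer behaves standalone (the driver class HashFieldValueExample is hypothetical and placed in the same package so the package-private config class is visible; note that in this revision, hashing the whole value with no field.name set requires skip.missing.or.null=true, per the else branch in apply()):

package io.aiven.kafka.connect.transforms;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

public class HashFieldValueExample {
    public static void main(final String[] args) {
        // Configure the Value variant to hash the entire string value with SHA-256.
        final HashField.Value<SinkRecord> transform = new HashField.Value<>();
        final Map<String, String> props = new HashMap<>();
        props.put("function", "SHA-256");
        // Without field.name, this revision only hashes when skipping is enabled.
        props.put("skip.missing.or.null", "true");
        transform.configure(props);

        final SinkRecord original = new SinkRecord(
            "some-topic", 0,
            Schema.STRING_SCHEMA, "some-key",
            Schema.STRING_SCHEMA, "jerry@big.corp",
            42L);
        final SinkRecord hashed = transform.apply(original);

        // Prints the Base64-encoded SHA-256 digest of "jerry@big.corp".
        System.out.println(hashed.value());
        transform.close();
    }
}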
HashFieldConfig.java
Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
/*
 * Copyright 2019 Aiven Oy
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.aiven.kafka.connect.transforms;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.errors.DataException;

class HashFieldConfig extends AbstractConfig {
    public static final String FIELD_NAME_CONFIG = "field.name";
    private static final String FIELD_NAME_DOC =
        "The name of the field which value should be hashed. If null or empty, "
            + "the entire key or value is used (and assumed to be a string). By default is null.";
    public static final String SKIP_MISSING_OR_NULL_CONFIG = "skip.missing.or.null";
    private static final String SKIP_MISSING_OR_NULL_DOC =
        "In case the source of the new topic name is null or missing, "
            + "should a record be silently passed without transformation.";
    public static final String FUNCTION_CONFIG = "function";
    private static final String FUNCTION_DOC =
        "The name of the hash function to use. The supported values are: md5, sha1, sha256.";
    private MessageDigest md = null;

    HashFieldConfig(final Map<?, ?> originals) {
        super(config(), originals);
    }

    static ConfigDef config() {
        return new ConfigDef()
            .define(
                FIELD_NAME_CONFIG,
                ConfigDef.Type.STRING,
                null,
                ConfigDef.Importance.HIGH,
                FIELD_NAME_DOC)
            .define(
                SKIP_MISSING_OR_NULL_CONFIG,
                ConfigDef.Type.BOOLEAN,
                false,
                ConfigDef.Importance.LOW,
                SKIP_MISSING_OR_NULL_DOC)
            .define(
                FUNCTION_CONFIG,
                ConfigDef.Type.STRING,
                ConfigDef.NO_DEFAULT_VALUE,
                ConfigDef.ValidString.in(
                    HashFunction.MD5.toString(),
                    HashFunction.SHA1.toString(),
                    HashFunction.SHA256.toString()),
                ConfigDef.Importance.HIGH,
                FUNCTION_DOC);
    }

    public enum HashFunction {
        MD5,
        SHA1 {
            public String toString() {
                return "SHA-1";
            }
        },
        SHA256 {
            public String toString() {
                return "SHA-256";
            }
        };
        static String stringValues = Stream.of(HashFunction.values())
            .map(Enum::toString)
            .collect(Collectors.joining("|", "[", "]"));
    }

    Optional<String> fieldName() {
        final String rawFieldName = getString(FIELD_NAME_CONFIG);
        if (null == rawFieldName || "".equals(rawFieldName)) {
            return Optional.empty();
        }
        return Optional.of(rawFieldName);
    }

    boolean skipMissingOrNull() {
        return getBoolean(SKIP_MISSING_OR_NULL_CONFIG);
    }

    MessageDigest hashFunction() {
        final String hashAlg = getString(FUNCTION_CONFIG);
        try {
            if (md == null) {
                md = MessageDigest.getInstance(hashAlg);
            }
        } catch (final NoSuchAlgorithmException e) {
            throw new DataException("Hash function " + hashAlg
                + " must be "
                + HashFieldConfig.HashFunction.stringValues);
        }
        return md;
    }
}
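
The HashFunction enum overrides toString so that the configured strings double as JCA algorithm names, which is why hashFunction() can pass them straight to MessageDigest.getInstance. A quick check of that mapping (a sketch; the class HashFunctionNamesCheck is hypothetical and assumed to sit in the same package):

package io.aiven.kafka.connect.transforms;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class HashFunctionNamesCheck {
    public static void main(final String[] args) throws NoSuchAlgorithmException {
        // Each enum constant's toString is a valid JCA digest algorithm name.
        for (final HashFieldConfig.HashFunction f : HashFieldConfig.HashFunction.values()) {
            final MessageDigest md = MessageDigest.getInstance(f.toString());
            System.out.println(f.name() + " -> " + md.getAlgorithm());
            // MD5 -> MD5, SHA1 -> SHA-1, SHA256 -> SHA-256
        }
    }
}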
HashFieldConfigTest.java
Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
/*
 * Copyright 2019 Aiven Oy
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.aiven.kafka.connect.transforms;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.common.config.ConfigException;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

class HashFieldConfigTest {
    @Test
    void defaults() {
        final Map<String, String> props = new HashMap<>();
        final Throwable e = assertThrows(ConfigException.class,
            () -> new HashFieldConfig(props));
        assertEquals("Missing required configuration \"function\" which has no default value.",
            e.getMessage());
    }

    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void skipMissingOrNull(final boolean skipMissingOrNull) {
        final Map<String, String> props = new HashMap<>();
        props.put(HashFieldConfig.SKIP_MISSING_OR_NULL_CONFIG, Boolean.toString(skipMissingOrNull));
        props.put(HashFieldConfig.FUNCTION_CONFIG, HashFieldConfig.HashFunction.SHA256.toString());
        final HashFieldConfig config = new HashFieldConfig(props);
        assertEquals(skipMissingOrNull, config.skipMissingOrNull());
    }

    @ParameterizedTest
    @ValueSource(strings = {"MD5", "SHA-1", "SHA-256"})
    void hashFunction(final String hashFunction) {
        final Map<String, String> props = new HashMap<>();
        props.put(HashFieldConfig.FUNCTION_CONFIG, hashFunction);
        final HashFieldConfig config = new HashFieldConfig(props);
        assertEquals(hashFunction, config.hashFunction().getAlgorithm());
    }

    @Test
    void emptyFieldName() {
        final Map<String, String> props = new HashMap<>();
        props.put(HashFieldConfig.FIELD_NAME_CONFIG, "");
        props.put(HashFieldConfig.FUNCTION_CONFIG, HashFieldConfig.HashFunction.SHA256.toString());
        final HashFieldConfig config = new HashFieldConfig(props);
        assertEquals(Optional.empty(), config.fieldName());
    }

    @Test
    void definedFieldName() {
        final Map<String, String> props = new HashMap<>();
        props.put(HashFieldConfig.FIELD_NAME_CONFIG, "test");
        props.put(HashFieldConfig.FUNCTION_CONFIG, HashFieldConfig.HashFunction.SHA256.toString());
        final HashFieldConfig config = new HashFieldConfig(props);
        assertEquals(Optional.of("test"), config.fieldName());
        assertEquals(HashFieldConfig.HashFunction.SHA256.toString(), config.hashFunction().getAlgorithm());
    }
}
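
The tests above cover configuration only. To sanity-check the actual digests, hashString's output for a known input can be reproduced with plain JDK classes (a sketch; the class HashStringCheck is hypothetical):

package io.aiven.kafka.connect.transforms;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class HashStringCheck {
    public static void main(final String[] args) throws NoSuchAlgorithmException {
        final MessageDigest md = MessageDigest.getInstance("SHA-256");
        // Same digest-then-Base64 steps as HashField.hashString(md, "test");
        // prints: n4bQgYhMfWWaL+qgxVrQFaO/TxsrC4Is0V1sFbDwCgg=
        System.out.println(Base64.getEncoder().encodeToString(md.digest("test".getBytes())));
    }
}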
